alamb commented on code in PR #10711:
URL: https://github.com/apache/datafusion/pull/10711#discussion_r1621096433


##########
datafusion/core/src/datasource/physical_plan/parquet/statistics.rs:
##########
@@ -556,8 +885,8 @@ mod test {
                 None,
                 None,
             ]),
-            expected_min: timestamp_array([Some(1), Some(5), None]),
-            expected_max: timestamp_array([Some(3), Some(9), None]),
+            expected_min: i64_array([Some(1), Some(5), None]),

Review Comment:
   I see this is consistent with the existing code 👍 



##########
datafusion/core/src/datasource/physical_plan/parquet/statistics.rs:
##########
@@ -211,19 +323,239 @@ pub(crate) fn min_statistics<'a, I: Iterator<Item = 
Option<&'a ParquetStatistics
     data_type: &DataType,
     iterator: I,
 ) -> Result<ArrayRef> {
-    let scalars = iterator
-        .map(|x| x.and_then(|s| get_statistic!(s, min, min_bytes, 
Some(data_type))));
-    collect_scalars(data_type, scalars)
+    match data_type {
+        DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
+            MinBooleanStatsIterator::new(iterator).map(|x| x.copied()),
+        ))),
+        DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(
+            MinInt32StatsIterator::new(iterator).map(|x| {
+                x.and_then(|x| {
+                    if let Ok(v) = i8::try_from(*x) {
+                        Some(v)
+                    } else {
+                        None
+                    }
+                })
+            }),
+        ))),
+        DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(
+            MinInt32StatsIterator::new(iterator).map(|x| {
+                x.and_then(|x| {
+                    if let Ok(v) = i16::try_from(*x) {
+                        Some(v)
+                    } else {
+                        None
+                    }
+                })
+            }),
+        ))),
+        DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(
+            MinInt32StatsIterator::new(iterator).map(|x| x.copied()),
+        ))),
+        DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(
+            MinInt64StatsIterator::new(iterator).map(|x| x.copied()),
+        ))),
+        DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
+            MinInt32StatsIterator::new(iterator).map(|x| {
+                x.and_then(|x| {
+                    if let Ok(v) = u8::try_from(*x) {
+                        Some(v)
+                    } else {
+                        None
+                    }
+                })
+            }),
+        ))),
+        DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
+            MinInt32StatsIterator::new(iterator).map(|x| {
+                x.and_then(|x| {
+                    if let Ok(v) = u16::try_from(*x) {
+                        Some(v)
+                    } else {
+                        None
+                    }
+                })
+            }),
+        ))),
+        DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
+            MinInt32StatsIterator::new(iterator).map(|x| x.map(|x| *x as u32)),
+        ))),
+        DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(
+            MinInt64StatsIterator::new(iterator).map(|x| x.map(|x| *x as u64)),
+        ))),
+        DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(
+            MinFloatStatsIterator::new(iterator).map(|x| x.copied()),
+        ))),
+        DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(
+            MinDoubleStatsIterator::new(iterator).map(|x| x.copied()),
+        ))),
+        DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
+            MinInt32StatsIterator::new(iterator).map(|x| x.copied()),
+        ))),
+        DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(
+            MinInt32StatsIterator::new(iterator)
+                .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000)),
+        ))),
+        DataType::Timestamp(_, _) => Ok(Arc::new(Int64Array::from_iter(
+            MinInt64StatsIterator::new(iterator).map(|x| x.copied()),
+        ))),
+        DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(
+            MinByteArrayStatsIterator::new(iterator).map(|x| x.map(|x| 
x.to_vec())),
+        ))),
+        DataType::Utf8 => Ok(Arc::new(StringArray::from_iter(
+            MinByteArrayStatsIterator::new(iterator).map(|x| {
+                x.and_then(|x| {
+                    let res = std::str::from_utf8(x).map(|s| 
s.to_string()).ok();
+                    if res.is_none() {
+                        log::debug!("Utf8 statistics is a non-UTF8 value, 
ignoring it.");
+                    }
+                    res
+                })
+            }),
+        ))),
+        DataType::FixedSizeBinary(size) => 
Ok(Arc::new(FixedSizeBinaryArray::from(
+            MinFixedLenByteArrayStatsIterator::new(iterator).map(|x| {
+                x.and_then(|x| {
+                    if x.len().try_into() == Ok(*size) {
+                        Some(x)
+                    } else {
+                        log::debug!(
+                            "FixedSizeBinary({}) statistics is a binary of 
size {}, ignoring it.",
+                            size,
+                            x.len(),
+                        );
+                        None
+                    }
+                })
+            }).collect::<Vec<_>>(),
+        ))),
+        _ => {
+            let scalars = iterator.map(|x| {
+                x.and_then(|s| get_statistic!(s, min, min_bytes, 
Some(data_type)))
+            });
+            collect_scalars(data_type, scalars)
+        }
+    }
 }
 
 /// Extracts the max statistics from an iterator of [`ParquetStatistics`] to 
an [`ArrayRef`]
 pub(crate) fn max_statistics<'a, I: Iterator<Item = Option<&'a 
ParquetStatistics>>>(
     data_type: &DataType,
     iterator: I,
 ) -> Result<ArrayRef> {
-    let scalars = iterator
-        .map(|x| x.and_then(|s| get_statistic!(s, max, max_bytes, 
Some(data_type))));
-    collect_scalars(data_type, scalars)
+    match data_type {
+        DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
+            MaxBooleanStatsIterator::new(iterator).map(|x| x.copied()),
+        ))),
+        DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(
+            MaxInt32StatsIterator::new(iterator).map(|x| {
+                x.and_then(|x| {
+                    if let Ok(v) = i8::try_from(*x) {
+                        Some(v)
+                    } else {
+                        None
+                    }
+                })
+            }),
+        ))),
+        DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(
+            MaxInt32StatsIterator::new(iterator).map(|x| {
+                x.and_then(|x| {
+                    if let Ok(v) = i16::try_from(*x) {
+                        Some(v)
+                    } else {
+                        None
+                    }
+                })
+            }),
+        ))),
+        DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(
+            MaxInt32StatsIterator::new(iterator).map(|x| x.copied()),
+        ))),
+        DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(
+            MaxInt64StatsIterator::new(iterator).map(|x| x.copied()),
+        ))),
+        DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
+            MaxInt32StatsIterator::new(iterator).map(|x| {
+                x.and_then(|x| {
+                    if let Ok(v) = u8::try_from(*x) {
+                        Some(v)
+                    } else {
+                        None
+                    }
+                })
+            }),
+        ))),
+        DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
+            MaxInt32StatsIterator::new(iterator).map(|x| {
+                x.and_then(|x| {
+                    if let Ok(v) = u16::try_from(*x) {
+                        Some(v)
+                    } else {
+                        None
+                    }
+                })
+            }),
+        ))),
+        DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
+            MaxInt32StatsIterator::new(iterator).map(|x| x.map(|x| *x as u32)),
+        ))),
+        DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(
+            MaxInt64StatsIterator::new(iterator).map(|x| x.map(|x| *x as u64)),
+        ))),
+        DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(
+            MaxFloatStatsIterator::new(iterator).map(|x| x.copied()),
+        ))),
+        DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(
+            MaxDoubleStatsIterator::new(iterator).map(|x| x.copied()),
+        ))),
+        DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
+            MaxInt32StatsIterator::new(iterator).map(|x| x.copied()),
+        ))),
+        DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(
+            MaxInt32StatsIterator::new(iterator)
+                .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000)),
+        ))),
+        DataType::Timestamp(_, _) => Ok(Arc::new(Int64Array::from_iter(

Review Comment:
   This is consistent with the existing code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to