This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 06bbe1298f Extract parquet statistics to its own module, add tests 
(#8294)
06bbe1298f is described below

commit 06bbe1298fa8aa042b6a6462e55b2890969d884a
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Nov 29 14:10:06 2023 -0500

    Extract parquet statistics to its own module, add tests (#8294)
    
    * Extract parquet statistics to its own module, add tests
    
    * Update datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
    
    Co-authored-by: Raphael Taylor-Davies 
<[email protected]>
    
    * rename enum
    
    * Improve API
    
    * Add test for reading struct array statistics
    
    * Add test for column after statistics
    
    * improve tests
    
    * simplify
    
    * clippy
    
    * Update datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
    
    * Update datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
    
    * Add test showing incorrect statistics
    
    * Rework statistics
    
    * Fix clippy
    
    * Update documentation and make it clear the statistics are not publicly 
accessible
    
    * Add link to upstream arrow ticket
    
    ---------
    
    Co-authored-by: Raphael Taylor-Davies 
<[email protected]>
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
 .../src/datasource/physical_plan/parquet/mod.rs    |  24 +-
 .../physical_plan/parquet/page_filter.rs           |   5 +-
 .../datasource/physical_plan/parquet/row_groups.rs | 189 ++---
 .../datasource/physical_plan/parquet/statistics.rs | 899 +++++++++++++++++++++
 4 files changed, 951 insertions(+), 166 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 731672ceb8..95aae71c77 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -66,6 +66,7 @@ mod metrics;
 pub mod page_filter;
 mod row_filter;
 mod row_groups;
+mod statistics;
 
 pub use metrics::ParquetFileMetrics;
 
@@ -506,6 +507,7 @@ impl FileOpener for ParquetOpener {
             let file_metadata = builder.metadata().clone();
             let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
             let mut row_groups = row_groups::prune_row_groups_by_statistics(
+                builder.parquet_schema(),
                 file_metadata.row_groups(),
                 file_range,
                 predicate,
@@ -718,28 +720,6 @@ pub async fn plan_to_parquet(
     Ok(())
 }
 
-// Copy from the arrow-rs
-// 
https://github.com/apache/arrow-rs/blob/733b7e7fd1e8c43a404c3ce40ecf741d493c21b4/parquet/src/arrow/buffer/bit_util.rs#L55
-// Convert the byte slice to fixed length byte array with the length of 16
-fn sign_extend_be(b: &[u8]) -> [u8; 16] {
-    assert!(b.len() <= 16, "Array too large, expected less than 16");
-    let is_negative = (b[0] & 128u8) == 128u8;
-    let mut result = if is_negative { [255u8; 16] } else { [0u8; 16] };
-    for (d, s) in result.iter_mut().skip(16 - b.len()).zip(b) {
-        *d = *s;
-    }
-    result
-}
-
-// Convert the bytes array to i128.
-// The endian of the input bytes array must be big-endian.
-pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
-    // The bytes array are from parquet file and must be the big-endian.
-    // The endian is defined by parquet format, and the reference document
-    // 
https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
-    i128::from_be_bytes(sign_extend_be(b))
-}
-
 // Convert parquet column schema to arrow data type, and just consider the
 // decimal data type.
 pub(crate) fn parquet_to_arrow_decimal_type(
diff --git 
a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
index b5b5f154f7..42bfef3599 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -39,9 +39,8 @@ use parquet::{
 };
 use std::sync::Arc;
 
-use crate::datasource::physical_plan::parquet::{
-    from_bytes_to_i128, parquet_to_arrow_decimal_type,
-};
+use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
+use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128;
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 
 use super::metrics::ParquetFileMetrics;
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 0079368f9c..0ab2046097 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -15,25 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::{
-    array::ArrayRef,
-    datatypes::{DataType, Schema},
-};
+use arrow::{array::ArrayRef, datatypes::Schema};
+use arrow_schema::FieldRef;
 use datafusion_common::tree_node::{TreeNode, VisitRecursion};
 use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
+use parquet::file::metadata::ColumnChunkMetaData;
+use parquet::schema::types::SchemaDescriptor;
 use parquet::{
     arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
     bloom_filter::Sbbf,
-    file::{metadata::RowGroupMetaData, statistics::Statistics as 
ParquetStatistics},
+    file::metadata::RowGroupMetaData,
 };
 use std::{
     collections::{HashMap, HashSet},
     sync::Arc,
 };
 
-use crate::datasource::{
-    listing::FileRange,
-    physical_plan::parquet::{from_bytes_to_i128, 
parquet_to_arrow_decimal_type},
+use crate::datasource::listing::FileRange;
+use crate::datasource::physical_plan::parquet::statistics::{
+    max_statistics, min_statistics, parquet_column,
 };
 use crate::logical_expr::Operator;
 use crate::physical_expr::expressions as phys_expr;
@@ -51,7 +51,11 @@ use super::ParquetFileMetrics;
 ///
 /// If an index IS present in the returned Vec it means the predicate
 /// did not filter out that row group.
+///
+/// Note: This method currently ignores ColumnOrder
+/// <https://github.com/apache/arrow-datafusion/issues/8335>
 pub(crate) fn prune_row_groups_by_statistics(
+    parquet_schema: &SchemaDescriptor,
     groups: &[RowGroupMetaData],
     range: Option<FileRange>,
     predicate: Option<&PruningPredicate>,
@@ -74,8 +78,9 @@ pub(crate) fn prune_row_groups_by_statistics(
 
         if let Some(predicate) = predicate {
             let pruning_stats = RowGroupPruningStatistics {
+                parquet_schema,
                 row_group_metadata: metadata,
-                parquet_schema: predicate.schema().as_ref(),
+                arrow_schema: predicate.schema().as_ref(),
             };
             match predicate.prune(&pruning_stats) {
                 Ok(values) => {
@@ -296,146 +301,33 @@ impl BloomFilterPruningPredicate {
     }
 }
 
-/// Wraps parquet statistics in a way
-/// that implements [`PruningStatistics`]
+/// Wraps [`RowGroupMetaData`] in a way that implements [`PruningStatistics`]
+///
+/// Note: This should be implemented for an array of [`RowGroupMetaData`] 
instead
+/// of per row-group
 struct RowGroupPruningStatistics<'a> {
+    parquet_schema: &'a SchemaDescriptor,
     row_group_metadata: &'a RowGroupMetaData,
-    parquet_schema: &'a Schema,
+    arrow_schema: &'a Schema,
 }
 
-/// Extract the min/max statistics from a `ParquetStatistics` object
-macro_rules! get_statistic {
-    ($column_statistics:expr, $func:ident, $bytes_func:ident, 
$target_arrow_type:expr) => {{
-        if !$column_statistics.has_min_max_set() {
-            return None;
-        }
-        match $column_statistics {
-            ParquetStatistics::Boolean(s) => 
Some(ScalarValue::Boolean(Some(*s.$func()))),
-            ParquetStatistics::Int32(s) => {
-                match $target_arrow_type {
-                    // int32 to decimal with the precision and scale
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        Some(ScalarValue::Decimal128(
-                            Some(*s.$func() as i128),
-                            precision,
-                            scale,
-                        ))
-                    }
-                    _ => Some(ScalarValue::Int32(Some(*s.$func()))),
-                }
-            }
-            ParquetStatistics::Int64(s) => {
-                match $target_arrow_type {
-                    // int64 to decimal with the precision and scale
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        Some(ScalarValue::Decimal128(
-                            Some(*s.$func() as i128),
-                            precision,
-                            scale,
-                        ))
-                    }
-                    _ => Some(ScalarValue::Int64(Some(*s.$func()))),
-                }
-            }
-            // 96 bit ints not supported
-            ParquetStatistics::Int96(_) => None,
-            ParquetStatistics::Float(s) => 
Some(ScalarValue::Float32(Some(*s.$func()))),
-            ParquetStatistics::Double(s) => 
Some(ScalarValue::Float64(Some(*s.$func()))),
-            ParquetStatistics::ByteArray(s) => {
-                match $target_arrow_type {
-                    // decimal data type
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        Some(ScalarValue::Decimal128(
-                            Some(from_bytes_to_i128(s.$bytes_func())),
-                            precision,
-                            scale,
-                        ))
-                    }
-                    _ => {
-                        let s = std::str::from_utf8(s.$bytes_func())
-                            .map(|s| s.to_string())
-                            .ok();
-                        Some(ScalarValue::Utf8(s))
-                    }
-                }
-            }
-            // type not supported yet
-            ParquetStatistics::FixedLenByteArray(s) => {
-                match $target_arrow_type {
-                    // just support the decimal data type
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        Some(ScalarValue::Decimal128(
-                            Some(from_bytes_to_i128(s.$bytes_func())),
-                            precision,
-                            scale,
-                        ))
-                    }
-                    _ => None,
-                }
-            }
-        }
-    }};
-}
-
-// Extract the min or max value calling `func` or `bytes_func` on the 
ParquetStatistics as appropriate
-macro_rules! get_min_max_values {
-    ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
-        let (_column_index, field) =
-            if let Some((v, f)) = 
$self.parquet_schema.column_with_name(&$column.name) {
-                (v, f)
-            } else {
-                // Named column was not present
-                return None;
-            };
-
-        let data_type = field.data_type();
-        // The result may be None, because DataFusion doesn't have support for 
ScalarValues of the column type
-        let null_scalar: ScalarValue = data_type.try_into().ok()?;
-
-        $self.row_group_metadata
-            .columns()
-            .iter()
-            .find(|c| c.column_descr().name() == &$column.name)
-            .and_then(|c| if c.statistics().is_some() 
{Some((c.statistics().unwrap(), c.column_descr()))} else {None})
-            .map(|(stats, column_descr)|
-                {
-                    let target_data_type = 
parquet_to_arrow_decimal_type(column_descr);
-                    get_statistic!(stats, $func, $bytes_func, target_data_type)
-                })
-            .flatten()
-            // column either didn't have statistics at all or didn't have 
min/max values
-            .or_else(|| Some(null_scalar.clone()))
-            .and_then(|s| s.to_array().ok())
-    }}
-}
-
-// Extract the null count value on the ParquetStatistics
-macro_rules! get_null_count_values {
-    ($self:expr, $column:expr) => {{
-        let value = ScalarValue::UInt64(
-            if let Some(col) = $self
-                .row_group_metadata
-                .columns()
-                .iter()
-                .find(|c| c.column_descr().name() == &$column.name)
-            {
-                col.statistics().map(|s| s.null_count())
-            } else {
-                Some($self.row_group_metadata.num_rows() as u64)
-            },
-        );
-
-        value.to_array().ok()
-    }};
+impl<'a> RowGroupPruningStatistics<'a> {
+    /// Looks up the parquet column by name
+    fn column(&self, name: &str) -> Option<(&ColumnChunkMetaData, &FieldRef)> {
+        let (idx, field) = parquet_column(self.parquet_schema, 
self.arrow_schema, name)?;
+        Some((self.row_group_metadata.column(idx), field))
+    }
 }
 
 impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
-        get_min_max_values!(self, column, min, min_bytes)
+        let (column, field) = self.column(&column.name)?;
+        min_statistics(field.data_type(), 
std::iter::once(column.statistics())).ok()
     }
 
     fn max_values(&self, column: &Column) -> Option<ArrayRef> {
-        get_min_max_values!(self, column, max, max_bytes)
+        let (column, field) = self.column(&column.name)?;
+        max_statistics(field.data_type(), 
std::iter::once(column.statistics())).ok()
     }
 
     fn num_containers(&self) -> usize {
@@ -443,7 +335,9 @@ impl<'a> PruningStatistics for 
RowGroupPruningStatistics<'a> {
     }
 
     fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
-        get_null_count_values!(self, column)
+        let (c, _) = self.column(&column.name)?;
+        let scalar = ScalarValue::UInt64(Some(c.statistics()?.null_count()));
+        scalar.to_array().ok()
     }
 }
 
@@ -463,6 +357,7 @@ mod tests {
     use datafusion_physical_expr::execution_props::ExecutionProps;
     use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
     use datafusion_sql::planner::ContextProvider;
+    use parquet::arrow::arrow_to_parquet_schema;
     use parquet::arrow::async_reader::ParquetObjectReader;
     use parquet::basic::LogicalType;
     use parquet::data_type::{ByteArray, FixedLenByteArray};
@@ -540,6 +435,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema_descr,
                 &[rgm1, rgm2],
                 None,
                 Some(&pruning_predicate),
@@ -574,6 +470,7 @@ mod tests {
         // is null / undefined so the first row group can't be filtered out
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema_descr,
                 &[rgm1, rgm2],
                 None,
                 Some(&pruning_predicate),
@@ -621,6 +518,7 @@ mod tests {
         // when conditions are joined using AND
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema_descr,
                 groups,
                 None,
                 Some(&pruning_predicate),
@@ -639,6 +537,7 @@ mod tests {
         // this bypasses the entire predicate expression and no row groups are 
filtered out
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema_descr,
                 groups,
                 None,
                 Some(&pruning_predicate),
@@ -678,6 +577,7 @@ mod tests {
             Field::new("c1", DataType::Int32, false),
             Field::new("c2", DataType::Boolean, false),
         ]));
+        let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
         let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
         let expr = logical2physical(&expr, &schema);
         let pruning_predicate = PruningPredicate::try_new(expr, 
schema).unwrap();
@@ -687,6 +587,7 @@ mod tests {
         // First row group was filtered out because it contains no null value 
on "c2".
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema_descr,
                 &groups,
                 None,
                 Some(&pruning_predicate),
@@ -706,6 +607,7 @@ mod tests {
             Field::new("c1", DataType::Int32, false),
             Field::new("c2", DataType::Boolean, false),
         ]));
+        let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
         let expr = col("c1")
             .gt(lit(15))
             .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
@@ -718,6 +620,7 @@ mod tests {
         // pass predicates. Ideally these should both be false
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema_descr,
                 &groups,
                 None,
                 Some(&pruning_predicate),
@@ -776,6 +679,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
                 Some(&pruning_predicate),
@@ -839,6 +743,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema_descr,
                 &[rgm1, rgm2, rgm3, rgm4],
                 None,
                 Some(&pruning_predicate),
@@ -886,6 +791,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
                 Some(&pruning_predicate),
@@ -956,6 +862,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
                 Some(&pruning_predicate),
@@ -1015,6 +922,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
                 Some(&pruning_predicate),
@@ -1028,7 +936,6 @@ mod tests {
         schema_descr: &SchemaDescPtr,
         column_statistics: Vec<ParquetStatistics>,
     ) -> RowGroupMetaData {
-        use parquet::file::metadata::ColumnChunkMetaData;
         let mut columns = vec![];
         for (i, s) in column_statistics.iter().enumerate() {
             let column = ColumnChunkMetaData::builder(schema_descr.column(i))
@@ -1046,7 +953,7 @@ mod tests {
     }
 
     fn get_test_schema_descr(fields: Vec<PrimitiveTypeField>) -> SchemaDescPtr 
{
-        use parquet::schema::types::{SchemaDescriptor, Type as SchemaType};
+        use parquet::schema::types::Type as SchemaType;
         let schema_fields = fields
             .iter()
             .map(|field| {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
new file mode 100644
index 0000000000..4e472606da
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -0,0 +1,899 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`min_statistics`] and [`max_statistics`] convert statistics in parquet 
format to arrow [`ArrayRef`].
+
+// TODO: potentially move this to arrow-rs: 
https://github.com/apache/arrow-rs/issues/4328
+
+use arrow::{array::ArrayRef, datatypes::DataType};
+use arrow_array::new_empty_array;
+use arrow_schema::{FieldRef, Schema};
+use datafusion_common::{Result, ScalarValue};
+use parquet::file::statistics::Statistics as ParquetStatistics;
+use parquet::schema::types::SchemaDescriptor;
+
+// Convert the bytes array to i128.
+// The endian of the input bytes array must be big-endian.
+pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
+    // The bytes array are from parquet file and must be the big-endian.
+    // The endian is defined by parquet format, and the reference document
+    // 
https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
+    i128::from_be_bytes(sign_extend_be(b))
+}
+
+// Copy from arrow-rs
+// 
https://github.com/apache/arrow-rs/blob/733b7e7fd1e8c43a404c3ce40ecf741d493c21b4/parquet/src/arrow/buffer/bit_util.rs#L55
+// Convert the byte slice to fixed length byte array with the length of 16
+fn sign_extend_be(b: &[u8]) -> [u8; 16] {
+    assert!(b.len() <= 16, "Array too large, expected less than 16");
+    let is_negative = (b[0] & 128u8) == 128u8;
+    let mut result = if is_negative { [255u8; 16] } else { [0u8; 16] };
+    for (d, s) in result.iter_mut().skip(16 - b.len()).zip(b) {
+        *d = *s;
+    }
+    result
+}
+
+/// Extract a single min/max statistics from a [`ParquetStatistics`] object
+///
+/// * `$column_statistics` is the `ParquetStatistics` object
+/// * `$func` is the function (`min`/`max`) to call to get the value
+/// * `$bytes_func` is the function (`min_bytes`/`max_bytes`) to call to get 
the value as bytes
+/// * `$target_arrow_type` is the [`DataType`] of the target statistics
+macro_rules! get_statistic {
+    ($column_statistics:expr, $func:ident, $bytes_func:ident, 
$target_arrow_type:expr) => {{
+        if !$column_statistics.has_min_max_set() {
+            return None;
+        }
+        match $column_statistics {
+            ParquetStatistics::Boolean(s) => 
Some(ScalarValue::Boolean(Some(*s.$func()))),
+            ParquetStatistics::Int32(s) => {
+                match $target_arrow_type {
+                    // int32 to decimal with the precision and scale
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(*s.$func() as i128),
+                            *precision,
+                            *scale,
+                        ))
+                    }
+                    _ => Some(ScalarValue::Int32(Some(*s.$func()))),
+                }
+            }
+            ParquetStatistics::Int64(s) => {
+                match $target_arrow_type {
+                    // int64 to decimal with the precision and scale
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(*s.$func() as i128),
+                            *precision,
+                            *scale,
+                        ))
+                    }
+                    _ => Some(ScalarValue::Int64(Some(*s.$func()))),
+                }
+            }
+            // 96 bit ints not supported
+            ParquetStatistics::Int96(_) => None,
+            ParquetStatistics::Float(s) => 
Some(ScalarValue::Float32(Some(*s.$func()))),
+            ParquetStatistics::Double(s) => 
Some(ScalarValue::Float64(Some(*s.$func()))),
+            ParquetStatistics::ByteArray(s) => {
+                match $target_arrow_type {
+                    // decimal data type
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(from_bytes_to_i128(s.$bytes_func())),
+                            *precision,
+                            *scale,
+                        ))
+                    }
+                    _ => {
+                        let s = std::str::from_utf8(s.$bytes_func())
+                            .map(|s| s.to_string())
+                            .ok();
+                        Some(ScalarValue::Utf8(s))
+                    }
+                }
+            }
+            // type not supported yet
+            ParquetStatistics::FixedLenByteArray(s) => {
+                match $target_arrow_type {
+                    // just support the decimal data type
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(from_bytes_to_i128(s.$bytes_func())),
+                            *precision,
+                            *scale,
+                        ))
+                    }
+                    _ => None,
+                }
+            }
+        }
+    }};
+}
+
+/// Looks up the parquet column by name
+///
+/// Returns the parquet column index and the corresponding arrow field
+pub(crate) fn parquet_column<'a>(
+    parquet_schema: &SchemaDescriptor,
+    arrow_schema: &'a Schema,
+    name: &str,
+) -> Option<(usize, &'a FieldRef)> {
+    let (root_idx, field) = arrow_schema.fields.find(name)?;
+    if field.data_type().is_nested() {
+        // Nested fields are not supported and require non-trivial logic
+        // to correctly walk the parquet schema accounting for the
+        // logical type rules - 
<https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
+        //
+        // For example a ListArray could correspond to anything from 1 to 3 
levels
+        // in the parquet schema
+        return None;
+    }
+
+    // This could be made more efficient (#TBD)
+    let parquet_idx = (0..parquet_schema.columns().len())
+        .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
+    Some((parquet_idx, field))
+}
+
+/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to 
an [`ArrayRef`]
+pub(crate) fn min_statistics<'a, I: Iterator<Item = Option<&'a 
ParquetStatistics>>>(
+    data_type: &DataType,
+    iterator: I,
+) -> Result<ArrayRef> {
+    let scalars = iterator
+        .map(|x| x.and_then(|s| get_statistic!(s, min, min_bytes, 
Some(data_type))));
+    collect_scalars(data_type, scalars)
+}
+
+/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to 
an [`ArrayRef`]
+pub(crate) fn max_statistics<'a, I: Iterator<Item = Option<&'a 
ParquetStatistics>>>(
+    data_type: &DataType,
+    iterator: I,
+) -> Result<ArrayRef> {
+    let scalars = iterator
+        .map(|x| x.and_then(|s| get_statistic!(s, max, max_bytes, 
Some(data_type))));
+    collect_scalars(data_type, scalars)
+}
+
+/// Builds an array from an iterator of ScalarValue
+fn collect_scalars<I: Iterator<Item = Option<ScalarValue>>>(
+    data_type: &DataType,
+    iterator: I,
+) -> Result<ArrayRef> {
+    let mut scalars = iterator.peekable();
+    match scalars.peek().is_none() {
+        true => Ok(new_empty_array(data_type)),
+        false => {
+            let null = ScalarValue::try_from(data_type)?;
+            ScalarValue::iter_to_array(scalars.map(|x| x.unwrap_or_else(|| 
null.clone())))
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use arrow_array::{
+        new_null_array, Array, BinaryArray, BooleanArray, Decimal128Array, 
Float32Array,
+        Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, 
StructArray,
+        TimestampNanosecondArray,
+    };
+    use arrow_schema::{Field, SchemaRef};
+    use bytes::Bytes;
+    use datafusion_common::test_util::parquet_test_data;
+    use parquet::arrow::arrow_reader::ArrowReaderBuilder;
+    use parquet::arrow::arrow_writer::ArrowWriter;
+    use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+    use parquet::file::properties::{EnabledStatistics, WriterProperties};
+    use std::path::PathBuf;
+    use std::sync::Arc;
+
+    // TODO error cases (with parquet statistics that are mismatched in 
expected type)
+
+    #[test]
+    fn roundtrip_empty() {
+        let empty_bool_array = new_empty_array(&DataType::Boolean);
+        Test {
+            input: empty_bool_array.clone(),
+            expected_min: empty_bool_array.clone(),
+            expected_max: empty_bool_array.clone(),
+        }
+        .run()
+    }
+
+    #[test]
+    fn roundtrip_bool() {
+        Test {
+            input: bool_array([
+                // row group 1
+                Some(true),
+                None,
+                Some(true),
+                // row group 2
+                Some(true),
+                Some(false),
+                None,
+                // row group 3
+                None,
+                None,
+                None,
+            ]),
+            expected_min: bool_array([Some(true), Some(false), None]),
+            expected_max: bool_array([Some(true), Some(true), None]),
+        }
+        .run()
+    }
+
+    #[test]
+    fn roundtrip_int32() {
+        Test {
+            input: i32_array([
+                // row group 1
+                Some(1),
+                None,
+                Some(3),
+                // row group 2
+                Some(0),
+                Some(5),
+                None,
+                // row group 3
+                None,
+                None,
+                None,
+            ]),
+            expected_min: i32_array([Some(1), Some(0), None]),
+            expected_max: i32_array([Some(3), Some(5), None]),
+        }
+        .run()
+    }
+
+    #[test]
+    fn roundtrip_int64() {
+        Test {
+            input: i64_array([
+                // row group 1
+                Some(1),
+                None,
+                Some(3),
+                // row group 2
+                Some(0),
+                Some(5),
+                None,
+                // row group 3
+                None,
+                None,
+                None,
+            ]),
+            expected_min: i64_array([Some(1), Some(0), None]),
+            expected_max: i64_array(vec![Some(3), Some(5), None]),
+        }
+        .run()
+    }
+
+    #[test]
+    fn roundtrip_f32() {
+        Test {
+            input: f32_array([
+                // row group 1
+                Some(1.0),
+                None,
+                Some(3.0),
+                // row group 2
+                Some(-1.0),
+                Some(5.0),
+                None,
+                // row group 3
+                None,
+                None,
+                None,
+            ]),
+            expected_min: f32_array([Some(1.0), Some(-1.0), None]),
+            expected_max: f32_array([Some(3.0), Some(5.0), None]),
+        }
+        .run()
+    }
+
+    /// Float64 round trip, including a negative value and an all-null row group.
+    #[test]
+    fn roundtrip_f64() {
+        let input = f64_array([
+            // row group 1
+            Some(1.0),
+            None,
+            Some(3.0),
+            // row group 2
+            Some(-1.0),
+            Some(5.0),
+            None,
+            // row group 3
+            None,
+            None,
+            None,
+        ]);
+        // one entry per row group
+        let expected_min = f64_array([Some(1.0), Some(-1.0), None]);
+        let expected_max = f64_array([Some(3.0), Some(5.0), None]);
+
+        Test {
+            input,
+            expected_min,
+            expected_max,
+        }
+        .run()
+    }
+
+    /// Timestamp (nanosecond) round trip.
+    ///
+    /// The test currently documents a known failure: it is expected to panic
+    /// with the type mismatch message below instead of producing statistics.
+    #[test]
+    #[should_panic(
+        expected = "Inconsistent types in ScalarValue::iter_to_array. Expected Int64, got TimestampNanosecond(NULL, None)"
+    )]
+    // Due to https://github.com/apache/arrow-datafusion/issues/8295
+    fn roundtrip_timestamp() {
+        Test {
+            input: timestamp_array([
+                // row group 1
+                Some(1),
+                None,
+                Some(3),
+                // row group 2
+                Some(9),
+                Some(5),
+                None,
+                // row group 3
+                None,
+                None,
+                None,
+            ]),
+            // values that would be expected if the panic did not occur
+            expected_min: timestamp_array([Some(1), Some(5), None]),
+            expected_max: timestamp_array([Some(3), Some(9), None]),
+        }
+        .run()
+    }
+
+    /// Decimal128 round trip with precision 9 and scale 2.
+    #[test]
+    fn roundtrip_decimal() {
+        // every array in this test uses the same precision and scale
+        let decimal = |values: Vec<Option<i128>>| -> ArrayRef {
+            Arc::new(
+                Decimal128Array::from(values)
+                    .with_precision_and_scale(9, 2)
+                    .unwrap(),
+            )
+        };
+
+        Test {
+            input: decimal(vec![
+                // row group 1
+                Some(100),
+                None,
+                Some(22000),
+                // row group 2
+                Some(500000),
+                Some(330000),
+                None,
+                // row group 3
+                None,
+                None,
+                None,
+            ]),
+            expected_min: decimal(vec![Some(100), Some(330000), None]),
+            expected_max: decimal(vec![Some(22000), Some(500000), None]),
+        }
+        .run()
+    }
+
+    /// Utf8 round trip: the expected values follow string ordering, e.g.
+    /// "AA" is the minimum and "ZZ" the maximum of the second row group.
+    #[test]
+    fn roundtrip_utf8() {
+        let input = utf8_array([
+            // row group 1
+            Some("A"),
+            None,
+            Some("Q"),
+            // row group 2
+            Some("ZZ"),
+            Some("AA"),
+            None,
+            // row group 3
+            None,
+            None,
+            None,
+        ]);
+        // one entry per row group
+        let expected_min = utf8_array([Some("A"), Some("AA"), None]);
+        let expected_max = utf8_array([Some("Q"), Some("ZZ"), None]);
+
+        Test {
+            input,
+            expected_min,
+            expected_max,
+        }
+        .run()
+    }
+
+    /// Struct round trip.
+    ///
+    /// Statistics for struct arrays are not supported yet, so the expected
+    /// min/max arrays are replaced by all-null arrays of the input's type
+    /// before the test runs.
+    #[test]
+    fn roundtrip_struct() {
+        let mut test = Test {
+            input: struct_array(vec![
+                // row group 1
+                (Some(true), Some(1)),
+                (None, None),
+                (Some(true), Some(3)),
+                // row group 2
+                (Some(true), Some(0)),
+                (Some(false), Some(5)),
+                (None, None),
+                // row group 3
+                (None, None),
+                (None, None),
+                (None, None),
+            ]),
+            // placeholder values: both arrays are overwritten below
+            expected_min: struct_array(vec![
+                (Some(true), Some(1)),
+                (Some(true), Some(0)),
+                (None, None),
+            ]),
+
+            expected_max: struct_array(vec![
+                (Some(true), Some(3)),
+                (Some(true), Some(0)),
+                (None, None),
+            ]),
+        };
+        // Due to https://github.com/apache/arrow-datafusion/issues/8334,
+        // statistics for struct arrays are not supported
+        test.expected_min =
+            new_null_array(test.input.data_type(), test.expected_min.len());
+        // size each replacement from the array it replaces (the maximum was
+        // previously sized from `expected_min`)
+        test.expected_max =
+            new_null_array(test.input.data_type(), test.expected_max.len());
+        test.run()
+    }
+
+    /// Binary round trip.
+    ///
+    /// The test currently documents a known failure: it is expected to panic
+    /// with the type mismatch message below instead of producing statistics.
+    #[test]
+    #[should_panic(
+        expected = "Inconsistent types in ScalarValue::iter_to_array. Expected Utf8, got Binary(NULL)"
+    )]
+    // Due to https://github.com/apache/arrow-datafusion/issues/8295
+    fn roundtrip_binary() {
+        Test {
+            input: Arc::new(BinaryArray::from_opt_vec(vec![
+                // row group 1
+                Some(b"A"),
+                None,
+                Some(b"Q"),
+                // row group 2
+                Some(b"ZZ"),
+                Some(b"AA"),
+                None,
+                // row group 3
+                None,
+                None,
+                None,
+            ])),
+            // values that would be expected if the panic did not occur
+            expected_min: Arc::new(BinaryArray::from_opt_vec(vec![
+                Some(b"A"),
+                Some(b"AA"),
+                None,
+            ])),
+            expected_max: Arc::new(BinaryArray::from_opt_vec(vec![
+                Some(b"Q"),
+                Some(b"ZZ"),
+                None,
+            ])),
+        }
+        .run()
+    }
+
+    /// Ensures that statistics for an array that appears *after* a struct
+    /// array are not wrong, even when the top level column shares its name
+    /// with a field nested inside the struct.
+    #[test]
+    fn struct_and_non_struct() {
+        let struct_col = struct_array(vec![
+            // row group 1
+            (Some(true), Some(1)),
+            (None, None),
+            (Some(true), Some(3)),
+        ]);
+        let int_col = i32_array([Some(100), Some(200), Some(300)]);
+        let expected_min = i32_array([Some(100)]);
+        let expected_max = i32_array([Some(300)]);
+
+        // use a name that shadows a name in the struct column
+        match struct_col.data_type() {
+            DataType::Struct(fields) => {
+                assert_eq!(fields.get(1).unwrap().name(), "int_col")
+            }
+            _ => panic!("unexpected data type for struct column"),
+        };
+
+        let input_batch = RecordBatch::try_from_iter([
+            ("struct_col", struct_col),
+            ("int_col", int_col),
+        ])
+        .unwrap();
+
+        let schema = input_batch.schema();
+
+        let metadata = parquet_metadata(schema.clone(), input_batch);
+        let parquet_schema = metadata.file_metadata().schema_descr();
+
+        // read the int_col statistics: the top level column must resolve to
+        // parquet column index 2, not to the struct field of the same name
+        let (idx, _) = parquet_column(parquet_schema, &schema, "int_col").unwrap();
+        assert_eq!(idx, 2);
+
+        let row_groups = metadata.row_groups();
+        let iter = row_groups.iter().map(|x| x.column(idx).statistics());
+
+        let min = min_statistics(&DataType::Int32, iter.clone()).unwrap();
+        assert_eq!(
+            &min,
+            &expected_min,
+            "Min. Statistics\n\n{}\n\n",
+            DisplayStats(row_groups)
+        );
+
+        let max = max_statistics(&DataType::Int32, iter).unwrap();
+        assert_eq!(
+            &max,
+            &expected_max,
+            "Max. Statistics\n\n{}\n\n",
+            DisplayStats(row_groups)
+        );
+    }
+
+    #[test]
+    fn nan_in_stats() {
+        // /parquet-testing/data/nan_in_stats.parquet
+        // row_groups: 1
+        // "x": Double({min: Some(1.0), max: Some(NaN), distinct_count: None, 
null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false})
+
+        TestFile::new("nan_in_stats.parquet")
+            .with_column(ExpectedColumn {
+                name: "x",
+                expected_min: Arc::new(Float64Array::from(vec![Some(1.0)])),
+                expected_max: 
Arc::new(Float64Array::from(vec![Some(f64::NAN)])),
+            })
+            .run();
+    }
+
+    /// Reads a file without statistics and checks that min/max come back as
+    /// null arrays of the column's type.
+    #[test]
+    fn alltypes_plain() {
+        // /parquet-testing/data/alltypes_plain.parquet
+        // row_groups: 1
+        // (has no statistics)
+        TestFile::new("alltypes_plain.parquet")
+            // No column statistics should be read as NULL, but with the right
+            // type
+            .with_column(ExpectedColumn {
+                name: "id",
+                expected_min: i32_array([None]),
+                expected_max: i32_array([None]),
+            })
+            .with_column(ExpectedColumn {
+                name: "bool_col",
+                expected_min: bool_array([None]),
+                expected_max: bool_array([None]),
+            })
+            .run();
+    }
+
+    #[test]
+    fn alltypes_tiny_pages() {
+        // /parquet-testing/data/alltypes_tiny_pages.parquet
+        // row_groups: 1
+        // "id": Int32({min: Some(0), max: Some(7299), distinct_count: None, 
null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false})
+        // "bool_col": Boolean({min: Some(false), max: Some(true), 
distinct_count: None, null_count: 0, min_max_deprecated: false, 
min_max_backwards_compatible: false})
+        // "tinyint_col": Int32({min: Some(0), max: Some(9), distinct_count: 
None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: 
false})
+        // "smallint_col": Int32({min: Some(0), max: Some(9), distinct_count: 
None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: 
false})
+        // "int_col": Int32({min: Some(0), max: Some(9), distinct_count: None, 
null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false})
+        // "bigint_col": Int64({min: Some(0), max: Some(90), distinct_count: 
None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: 
false})
+        // "float_col": Float({min: Some(0.0), max: Some(9.9), distinct_count: 
None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: 
false})
+        // "double_col": Double({min: Some(0.0), max: Some(90.89999999999999), 
distinct_count: None, null_count: 0, min_max_deprecated: false, 
min_max_backwards_compatible: false})
+        // "date_string_col": ByteArray({min: Some(ByteArray { data: 
"01/01/09" }), max: Some(ByteArray { data: "12/31/10" }), distinct_count: None, 
null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false})
+        // "string_col": ByteArray({min: Some(ByteArray { data: "0" }), max: 
Some(ByteArray { data: "9" }), distinct_count: None, null_count: 0, 
min_max_deprecated: false, min_max_backwards_compatible: false})
+        // "timestamp_col": Int96({min: None, max: None, distinct_count: None, 
null_count: 0, min_max_deprecated: true, min_max_backwards_compatible: true})
+        // "year": Int32({min: Some(2009), max: Some(2010), distinct_count: 
None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: 
false})
+        // "month": Int32({min: Some(1), max: Some(12), distinct_count: None, 
null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false})
+        TestFile::new("alltypes_tiny_pages.parquet")
+            .with_column(ExpectedColumn {
+                name: "id",
+                expected_min: i32_array([Some(0)]),
+                expected_max: i32_array([Some(7299)]),
+            })
+            .with_column(ExpectedColumn {
+                name: "bool_col",
+                expected_min: bool_array([Some(false)]),
+                expected_max: bool_array([Some(true)]),
+            })
+            .with_column(ExpectedColumn {
+                name: "tinyint_col",
+                expected_min: i32_array([Some(0)]),
+                expected_max: i32_array([Some(9)]),
+            })
+            .with_column(ExpectedColumn {
+                name: "smallint_col",
+                expected_min: i32_array([Some(0)]),
+                expected_max: i32_array([Some(9)]),
+            })
+            .with_column(ExpectedColumn {
+                name: "int_col",
+                expected_min: i32_array([Some(0)]),
+                expected_max: i32_array([Some(9)]),
+            })
+            .with_column(ExpectedColumn {
+                name: "bigint_col",
+                expected_min: i64_array([Some(0)]),
+                expected_max: i64_array([Some(90)]),
+            })
+            .with_column(ExpectedColumn {
+                name: "float_col",
+                expected_min: f32_array([Some(0.0)]),
+                expected_max: f32_array([Some(9.9)]),
+            })
+            .with_column(ExpectedColumn {
+                name: "double_col",
+                expected_min: f64_array([Some(0.0)]),
+                expected_max: f64_array([Some(90.89999999999999)]),
+            })
+            .with_column(ExpectedColumn {
+                name: "date_string_col",
+                expected_min: utf8_array([Some("01/01/09")]),
+                expected_max: utf8_array([Some("12/31/10")]),
+            })
+            .with_column(ExpectedColumn {
+                name: "string_col",
+                expected_min: utf8_array([Some("0")]),
+                expected_max: utf8_array([Some("9")]),
+            })
+            // File has no min/max for timestamp_col
+            .with_column(ExpectedColumn {
+                name: "timestamp_col",
+                expected_min: timestamp_array([None]),
+                expected_max: timestamp_array([None]),
+            })
+            .with_column(ExpectedColumn {
+                name: "year",
+                expected_min: i32_array([Some(2009)]),
+                expected_max: i32_array([Some(2010)]),
+            })
+            .with_column(ExpectedColumn {
+                name: "month",
+                expected_min: i32_array([Some(1)]),
+                expected_max: i32_array([Some(12)]),
+            })
+            .run();
+    }
+
+    #[test]
+    fn fixed_length_decimal_legacy() {
+        // /parquet-testing/data/fixed_length_decimal_legacy.parquet
+        // row_groups: 1
+        // "value": FixedLenByteArray({min: Some(FixedLenByteArray(ByteArray { 
data: Some(ByteBufferPtr { data: b"\0\0\0\0\0\xc8" }) })), max: 
Some(FixedLenByteArray(ByteArray { data: "\0\0\0\0\t`" })), distinct_count: 
None, null_count: 0, min_max_deprecated: true, min_max_backwards_compatible: 
true})
+
+        TestFile::new("fixed_length_decimal_legacy.parquet")
+            .with_column(ExpectedColumn {
+                name: "value",
+                expected_min: Arc::new(
+                    Decimal128Array::from(vec![Some(200)])
+                        .with_precision_and_scale(13, 2)
+                        .unwrap(),
+                ),
+                expected_max: Arc::new(
+                    Decimal128Array::from(vec![Some(2400)])
+                        .with_precision_and_scale(13, 2)
+                        .unwrap(),
+                ),
+            })
+            .run();
+    }
+
+    /// Maximum number of rows written to each row group by `parquet_metadata`
+    const ROWS_PER_ROW_GROUP: usize = 3;
+
+    /// Writes the input batch into a parquet file, with every three rows as
+    /// their own row group, and compares the min/maxes to the expected values
+    struct Test {
+        // column data to write; consecutive runs of three rows form a row group
+        input: ArrayRef,
+        // expected minimum of each row group, in row group order
+        expected_min: ArrayRef,
+        // expected maximum of each row group, in row group order
+        expected_max: ArrayRef,
+    }
+
+    impl Test {
+        /// Writes `input` as the single column "c1", reads the row group
+        /// statistics back and asserts they match `expected_min` and
+        /// `expected_max`.
+        ///
+        /// For nested columns the only check is that `parquet_column` finds
+        /// no matching parquet column; the expected arrays are not compared.
+        ///
+        /// # Panics
+        ///
+        /// Panics if writing or reading fails, or if any assertion fails.
+        fn run(self) {
+            let Self {
+                input,
+                expected_min,
+                expected_max,
+            } = self;
+
+            let input_batch = RecordBatch::try_from_iter([("c1", input)]).unwrap();
+
+            let schema = input_batch.schema();
+
+            let metadata = parquet_metadata(schema.clone(), input_batch);
+            let parquet_schema = metadata.file_metadata().schema_descr();
+
+            let row_groups = metadata.row_groups();
+
+            for field in schema.fields() {
+                if field.data_type().is_nested() {
+                    // nested columns are expected to have no column mapping
+                    let lookup =
+                        parquet_column(parquet_schema, &schema, field.name());
+                    assert_eq!(lookup, None);
+                    continue;
+                }
+
+                let (idx, f) =
+                    parquet_column(parquet_schema, &schema, field.name()).unwrap();
+                assert_eq!(f, field);
+
+                // one statistics entry per row group; cloned because it is
+                // consumed once for the minimum and once for the maximum
+                let iter = row_groups.iter().map(|x| x.column(idx).statistics());
+                let min = min_statistics(f.data_type(), iter.clone()).unwrap();
+                assert_eq!(
+                    &min,
+                    &expected_min,
+                    "Min. Statistics\n\n{}\n\n",
+                    DisplayStats(row_groups)
+                );
+
+                let max = max_statistics(f.data_type(), iter).unwrap();
+                assert_eq!(
+                    &max,
+                    &expected_max,
+                    "Max. Statistics\n\n{}\n\n",
+                    DisplayStats(row_groups)
+                );
+            }
+        }
+    }
+
+    /// Writes the specified batch out as parquet into an in-memory buffer and
+    /// returns the metadata read back from that buffer.
+    ///
+    /// Chunk level statistics are enabled and row groups are limited to
+    /// `ROWS_PER_ROW_GROUP` rows.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the batch cannot be written or the result cannot be read.
+    fn parquet_metadata(schema: SchemaRef, batch: RecordBatch) -> Arc<ParquetMetaData> {
+        let props = WriterProperties::builder()
+            .set_statistics_enabled(EnabledStatistics::Chunk)
+            .set_max_row_group_size(ROWS_PER_ROW_GROUP)
+            .build();
+
+        let mut buffer = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let reader = ArrowReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
+        reader.metadata().clone()
+    }
+
+    /// Formats the statistics nicely for display: the number of row groups,
+    /// followed by one line for each column chunk that has statistics
+    struct DisplayStats<'a>(&'a [RowGroupMetaData]);
+    impl std::fmt::Display for DisplayStats<'_> {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            let row_groups = self.0;
+            writeln!(f, "  row_groups: {}", row_groups.len())?;
+            for rg in row_groups {
+                for col in rg.columns() {
+                    // column chunks without statistics are not printed
+                    if let Some(statistics) = col.statistics() {
+                        writeln!(f, "   {}: {:?}", col.column_path(), statistics)?;
+                    }
+                }
+            }
+            Ok(())
+        }
+    }
+
+    /// Expected min/max statistics for one column of a parquet test file,
+    /// registered on a `TestFile` via `with_column`
+    struct ExpectedColumn {
+        // column name, used to look the column up with `parquet_column`
+        name: &'static str,
+        // expected result of `min_statistics` over the file's row groups
+        expected_min: ArrayRef,
+        // expected result of `max_statistics` over the file's row groups
+        expected_max: ArrayRef,
+    }
+
+    /// Reads statistics out of the specified file, and compares them to the
+    /// expected values
+    struct TestFile {
+        // file name, resolved relative to the `parquet_test_data()` directory
+        file_name: &'static str,
+        // columns to check, in the order they were added
+        expected_columns: Vec<ExpectedColumn>,
+    }
+
+    impl TestFile {
+        /// Creates a test for `file_name` with no expected columns yet
+        fn new(file_name: &'static str) -> Self {
+            Self {
+                file_name,
+                expected_columns: Vec::new(),
+            }
+        }
+
+        /// Adds a column whose statistics are checked by [`Self::run`]
+        fn with_column(mut self, column: ExpectedColumn) -> Self {
+            self.expected_columns.push(column);
+            self
+        }
+
+        /// Reads the specified parquet file and validates that the min/max
+        /// values for the specified columns are as expected.
+        ///
+        /// # Panics
+        ///
+        /// Panics if the file cannot be opened or read, if a column cannot be
+        /// found, or if any statistic differs from its expected value.
+        fn run(self) {
+            let path = PathBuf::from(parquet_test_data()).join(self.file_name);
+            let file = std::fs::File::open(path).unwrap();
+            let reader = ArrowReaderBuilder::try_new(file).unwrap();
+            let arrow_schema = reader.schema();
+            let metadata = reader.metadata();
+            let row_groups = metadata.row_groups();
+            let parquet_schema = metadata.file_metadata().schema_descr();
+
+            for expected_column in self.expected_columns {
+                let ExpectedColumn {
+                    name,
+                    expected_min,
+                    expected_max,
+                } = expected_column;
+
+                let (idx, field) =
+                    parquet_column(parquet_schema, arrow_schema, name).unwrap();
+
+                // one statistics entry per row group; cloned because it is
+                // consumed once for the minimum and once for the maximum
+                let iter = row_groups.iter().map(|x| x.column(idx).statistics());
+                let actual_min =
+                    min_statistics(field.data_type(), iter.clone()).unwrap();
+                assert_eq!(&expected_min, &actual_min, "column {name}");
+
+                let actual_max = max_statistics(field.data_type(), iter).unwrap();
+                assert_eq!(&expected_max, &actual_max, "column {name}");
+            }
+        }
+    }
+
+    /// Collects the optional values into a `BooleanArray` behind an `ArrayRef`
+    fn bool_array(input: impl IntoIterator<Item = Option<bool>>) -> ArrayRef {
+        Arc::new(input.into_iter().collect::<BooleanArray>())
+    }
+
+    /// Collects the optional values into an `Int32Array` behind an `ArrayRef`
+    fn i32_array(input: impl IntoIterator<Item = Option<i32>>) -> ArrayRef {
+        Arc::new(input.into_iter().collect::<Int32Array>())
+    }
+
+    /// Collects the optional values into an `Int64Array` behind an `ArrayRef`
+    fn i64_array(input: impl IntoIterator<Item = Option<i64>>) -> ArrayRef {
+        Arc::new(input.into_iter().collect::<Int64Array>())
+    }
+
+    /// Collects the optional values into a `Float32Array` behind an `ArrayRef`
+    fn f32_array(input: impl IntoIterator<Item = Option<f32>>) -> ArrayRef {
+        Arc::new(input.into_iter().collect::<Float32Array>())
+    }
+
+    /// Collects the optional values into a `Float64Array` behind an `ArrayRef`
+    fn f64_array(input: impl IntoIterator<Item = Option<f64>>) -> ArrayRef {
+        Arc::new(input.into_iter().collect::<Float64Array>())
+    }
+
+    /// Collects the optional values into a `TimestampNanosecondArray` behind
+    /// an `ArrayRef`
+    fn timestamp_array(input: impl IntoIterator<Item = Option<i64>>) -> ArrayRef {
+        let array: TimestampNanosecondArray = input.into_iter().collect();
+        Arc::new(array)
+    }
+
+    /// Collects the optional strings into a `StringArray` behind an `ArrayRef`
+    fn utf8_array<'a>(input: impl IntoIterator<Item = Option<&'a str>>) -> ArrayRef {
+        // `StringArray` collects directly from `Option<&str>` items, so no
+        // intermediate `String` has to be allocated per element
+        let array: StringArray = input.into_iter().collect();
+        Arc::new(array)
+    }
+
+    // returns a struct array with columns "bool_col" and "int_col" with the
+    // specified values: each input tuple supplies one row, and both fields
+    // are declared nullable
+    fn struct_array(input: Vec<(Option<bool>, Option<i32>)>) -> ArrayRef {
+        let boolean: BooleanArray = input.iter().map(|(b, _i)| b).collect();
+        let int: Int32Array = input.iter().map(|(_b, i)| i).collect();
+
+        let nullable = true;
+        let struct_array = StructArray::from(vec![
+            (
+                Arc::new(Field::new("bool_col", DataType::Boolean, nullable)),
+                Arc::new(boolean) as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new("int_col", DataType::Int32, nullable)),
+                Arc::new(int) as ArrayRef,
+            ),
+        ]);
+        Arc::new(struct_array)
+    }
+}

Reply via email to