This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new eba6d1d2b5 introduce StatisticsConverter::from_column_index (#9540)
eba6d1d2b5 is described below

commit eba6d1d2b519284b272ca73b6b319fa0d98f2d7c
Author: Matthew Kim <[email protected]>
AuthorDate: Mon Jun 22 20:19:50 2026 +0200

    introduce StatisticsConverter::from_column_index (#9540)
    
    # Which issue does this PR close?
    
    - Closes https://github.com/apache/arrow-rs/issues/9539
    
    # Rationale for this change
    
    This PR adds a constructor, `StatisticsConverter::from_column_index`, that
    bypasses schema resolution and accepts a pre-resolved Parquet leaf column
    index directly.
    
    DataFusion's pruning predicate system needs statistics for struct fields
    to support predicate pushdown on expressions like
    `WHERE my_struct.field > 67`. DataFusion already resolves struct fields to
    Parquet leaf indices elsewhere, but has no way to pass such an index to
    `StatisticsConverter`.
---
 parquet/src/arrow/arrow_reader/statistics.rs |  52 +++++++-
 parquet/tests/arrow_reader/statistics.rs     | 179 ++++++++++++++++++++++++++-
 2 files changed, 229 insertions(+), 2 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs
index 19d3e34f52..99bf28c8c7 100644
--- a/parquet/src/arrow/arrow_reader/statistics.rs
+++ b/parquet/src/arrow/arrow_reader/statistics.rs
@@ -1353,7 +1353,10 @@ where
 /// Note: The Parquet schema and Arrow schema do not have to be identical (for
 /// example, the columns may be in different orders and one or the other schemas
 /// may have additional columns). The function [`parquet_column`] is used to
-/// match the column in the Parquet schema to the column in the Arrow schema.
+/// match the column in the Parquet schema to the column in the Arrow schema
+/// when using [`Self::try_new`]. For nested fields (e.g., struct fields),
+/// where `parquet_column` does not support schema resolution, use
+/// [`Self::from_column_index`] instead with a pre-resolved leaf column index.
 #[derive(Debug)]
 pub struct StatisticsConverter<'a> {
     /// the index of the matched column in the Parquet schema
@@ -1454,6 +1457,9 @@ impl<'a> StatisticsConverter<'a> {
     /// arrays will be null. This can happen if the column is in the arrow
     /// schema but not in the parquet schema due to schema evolution.
     ///
+    /// This constructor only supports top-level, non-nested columns. For nested
+    /// fields (e.g., fields within a struct), use [`Self::from_column_index`].
+    ///
     /// See example on [`Self::row_group_mins`] for usage
     ///
     /// # Errors
@@ -1495,6 +1501,50 @@ impl<'a> StatisticsConverter<'a> {
         })
     }
 
+    /// Create a new `StatisticsConverter` from a Parquet leaf column index directly.
+    ///
+    /// Unlike [`Self::try_new`], this constructor bypasses schema resolution and
+    /// accepts a Parquet column index directly. This is useful for nested fields
+    /// (e.g., struct fields) where the caller has already resolved the mapping
+    /// from the Arrow field to the Parquet leaf column.
+    ///
+    /// # Arguments
+    ///
+    /// * `parquet_column_index` - The index of the leaf column in the Parquet schema
+    /// * `arrow_field` - The Arrow field describing the column's data type
+    /// * `parquet_schema` - The Parquet schema descriptor (used to look up the physical type)
+    ///
+    /// The caller must ensure that `arrow_field` describes the same leaf column as
+    /// `parquet_column_index`. This mapping is not validated by the converter; if
+    /// the Arrow type does not match the Parquet column statistics, extraction
+    /// returns null statistics values rather than an error.
+    ///
+    /// # Errors
+    ///
+    /// * If the `parquet_column_index` is out of bounds
+    pub fn from_column_index(
+        parquet_column_index: usize,
+        arrow_field: &'a Field,
+        parquet_schema: &'a SchemaDescriptor,
+    ) -> Result<Self> {
+        if parquet_column_index >= parquet_schema.columns().len() {
+            return Err(arrow_err!(format!(
+                "Parquet column index {} out of bounds, column count {}",
+                parquet_column_index,
+                parquet_schema.columns().len()
+            )));
+        }
+
+        let physical_type = 
parquet_schema.column(parquet_column_index).physical_type();
+
+        Ok(Self {
+            parquet_column_index: Some(parquet_column_index),
+            arrow_field,
+            missing_null_counts_as_zero: true,
+            physical_type: Some(physical_type),
+        })
+    }
+
     /// Extract the minimum values from row group statistics in [`RowGroupMetaData`]
     ///
     /// # Return Value
diff --git a/parquet/tests/arrow_reader/statistics.rs b/parquet/tests/arrow_reader/statistics.rs
index 4f7ddcff4a..bbb891dfd2 100644
--- a/parquet/tests/arrow_reader/statistics.rs
+++ b/parquet/tests/arrow_reader/statistics.rs
@@ -2628,7 +2628,7 @@ mod test {
     use arrow::util::test_util::parquet_test_data;
     use arrow_array::{
         ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, 
Int8Array, Int16Array,
-        Int32Array, Int64Array, RecordBatch, StringArray, 
TimestampNanosecondArray,
+        Int32Array, Int64Array, RecordBatch, StringArray, StructArray, 
TimestampNanosecondArray,
         new_empty_array,
     };
     use arrow_schema::{DataType, SchemaRef, TimeUnit};
@@ -2899,6 +2899,183 @@ mod test {
         }
     }
 
+    #[test]
+    fn struct_leaf_statistics_from_column_index() {
+        let leaf_field = Arc::new(Field::new("leaf", DataType::Int32, true));
+        let leaf_array: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(3),
+            Some(1),
+            None,
+            Some(9),
+            Some(4),
+            Some(6),
+        ]));
+
+        let amount_array = decimal128_array(
+            [
+                Some(i128::from(100)),
+                Some(i128::from(-500)),
+                None,
+                Some(i128::from(2000)),
+                Some(i128::from(600)),
+                Some(i128::from(50)),
+            ],
+            20,
+            2,
+        );
+        let amount_field = Arc::new(Field::new("amount", 
amount_array.data_type().clone(), true));
+
+        let struct_array = StructArray::from(vec![
+            (leaf_field.clone(), leaf_array),
+            (amount_field.clone(), amount_array),
+        ]);
+        let struct_array: ArrayRef = Arc::new(struct_array);
+        let input_batch = RecordBatch::try_from_iter([("c1", 
struct_array)]).unwrap();
+
+        let reader = build_parquet_file(
+            ROWS_PER_ROW_GROUP * 2,
+            Some(EnabledStatistics::Page),
+            Some(ROWS_PER_ROW_GROUP),
+            vec![input_batch],
+        );
+
+        let schema = reader.schema();
+        let metadata = reader.metadata();
+        let parquet_schema = reader.parquet_schema();
+        let row_groups = metadata.row_groups();
+        let row_group_indices = [0];
+        let column_page_index = metadata
+            .column_index()
+            .expect("file should have column page indices");
+        let column_offset_index = metadata
+            .offset_index()
+            .expect("file should have column offset indices");
+
+        let DataType::Struct(fields) = 
schema.field_with_name("c1").unwrap().data_type() else {
+            unreachable!("c1 must be a struct field")
+        };
+        let leaf_arrow_field = fields[0].as_ref();
+        let amount_arrow_field = fields[1].as_ref();
+
+        assert_eq!(parquet_column(parquet_schema, schema, "c1"), None);
+
+        let leaf_converter =
+            StatisticsConverter::from_column_index(0, leaf_arrow_field, 
parquet_schema).unwrap();
+
+        assert_eq!(leaf_converter.parquet_column_index(), Some(0));
+        assert_eq!(leaf_converter.arrow_field(), leaf_arrow_field);
+
+        let leaf_mins = 
leaf_converter.row_group_mins(row_groups.iter()).unwrap();
+        assert_eq!(&leaf_mins, &i32_array([Some(1)]));
+
+        let leaf_maxes = 
leaf_converter.row_group_maxes(row_groups.iter()).unwrap();
+        assert_eq!(&leaf_maxes, &i32_array([Some(9)]));
+
+        let leaf_null_counts = leaf_converter
+            .row_group_null_counts(row_groups.iter())
+            .unwrap();
+        assert_eq!(leaf_null_counts, UInt64Array::from(vec![1]));
+
+        let leaf_row_counts = leaf_converter
+            .row_group_row_counts(row_groups.iter())
+            .unwrap();
+        assert_eq!(leaf_row_counts, Some(UInt64Array::from(vec![6])));
+
+        let leaf_page_mins = leaf_converter
+            .data_page_mins(
+                column_page_index,
+                column_offset_index,
+                row_group_indices.iter(),
+            )
+            .unwrap();
+        assert_eq!(&leaf_page_mins, &i32_array([Some(1), Some(4)]));
+
+        let leaf_page_maxes = leaf_converter
+            .data_page_maxes(
+                column_page_index,
+                column_offset_index,
+                row_group_indices.iter(),
+            )
+            .unwrap();
+        assert_eq!(&leaf_page_maxes, &i32_array([Some(3), Some(9)]));
+
+        let leaf_page_null_counts = leaf_converter
+            .data_page_null_counts(
+                column_page_index,
+                column_offset_index,
+                row_group_indices.iter(),
+            )
+            .unwrap();
+        assert_eq!(leaf_page_null_counts, UInt64Array::from(vec![1, 0]));
+
+        let leaf_page_row_counts = leaf_converter
+            .data_page_row_counts(column_offset_index, row_groups, 
row_group_indices.iter())
+            .unwrap();
+        assert_eq!(leaf_page_row_counts, Some(UInt64Array::from(vec![3, 3])));
+
+        let amount_converter =
+            StatisticsConverter::from_column_index(1, amount_arrow_field, 
parquet_schema).unwrap();
+
+        let amount_mins = 
amount_converter.row_group_mins(row_groups.iter()).unwrap();
+        assert_eq!(
+            &amount_mins,
+            &decimal128_array([Some(i128::from(-500))], 20, 2)
+        );
+
+        let amount_maxes = 
amount_converter.row_group_maxes(row_groups.iter()).unwrap();
+        assert_eq!(
+            &amount_maxes,
+            &decimal128_array([Some(i128::from(2000))], 20, 2)
+        );
+
+        let amount_page_mins = amount_converter
+            .data_page_mins(
+                column_page_index,
+                column_offset_index,
+                row_group_indices.iter(),
+            )
+            .unwrap();
+        assert_eq!(
+            &amount_page_mins,
+            &decimal128_array([Some(i128::from(-500)), Some(i128::from(50))], 
20, 2)
+        );
+
+        let amount_page_maxes = amount_converter
+            .data_page_maxes(
+                column_page_index,
+                column_offset_index,
+                row_group_indices.iter(),
+            )
+            .unwrap();
+        assert_eq!(
+            &amount_page_maxes,
+            &decimal128_array([Some(i128::from(100)), Some(i128::from(2000))], 
20, 2)
+        );
+
+        let column_count = parquet_schema.columns().len();
+        let err =
+            StatisticsConverter::from_column_index(column_count, 
leaf_arrow_field, parquet_schema)
+                .unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            format!(
+                "Arrow: Parquet column index {column_count} out of bounds, 
column count {column_count}"
+            )
+        );
+    }
+
+    fn decimal128_array(
+        input: impl IntoIterator<Item = Option<i128>>,
+        precision: u8,
+        scale: i8,
+    ) -> ArrayRef {
+        Arc::new(
+            Decimal128Array::from_iter(input)
+                .with_precision_and_scale(precision, scale)
+                .unwrap(),
+        )
+    }
+
     /// Write the specified batches out as parquet and return the metadata
     fn parquet_metadata(schema: SchemaRef, batch: RecordBatch) -> 
Arc<ParquetMetaData> {
         let props = WriterProperties::builder()

Reply via email to