This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new eba6d1d2b5 introduce StatisticsConverter::from_column_index (#9540)
eba6d1d2b5 is described below
commit eba6d1d2b519284b272ca73b6b319fa0d98f2d7c
Author: Matthew Kim <[email protected]>
AuthorDate: Mon Jun 22 20:19:50 2026 +0200
introduce StatisticsConverter::from_column_index (#9540)
# Which issue does this PR close?
- Closes https://github.com/apache/arrow-rs/issues/9539
# Rationale for this change
This PR adds a constructor, `StatisticsConverter::from_column_index`, that
bypasses schema resolution and accepts a pre-resolved leaf column index
directly.
DataFusion's pruning predicate system needs statistics for struct fields to
support predicate pushdown on expressions like `WHERE my_struct.field > 67`.
DataFusion already resolves struct fields to Parquet leaf indices elsewhere,
but has no way to pass that index to `StatisticsConverter`.
---
parquet/src/arrow/arrow_reader/statistics.rs | 52 +++++++-
parquet/tests/arrow_reader/statistics.rs | 179 ++++++++++++++++++++++++++-
2 files changed, 229 insertions(+), 2 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/statistics.rs
b/parquet/src/arrow/arrow_reader/statistics.rs
index 19d3e34f52..99bf28c8c7 100644
--- a/parquet/src/arrow/arrow_reader/statistics.rs
+++ b/parquet/src/arrow/arrow_reader/statistics.rs
@@ -1353,7 +1353,10 @@ where
/// Note: The Parquet schema and Arrow schema do not have to be identical (for
/// example, the columns may be in different orders and one or the other
schemas
/// may have additional columns). The function [`parquet_column`] is used to
-/// match the column in the Parquet schema to the column in the Arrow schema.
+/// match the column in the Parquet schema to the column in the Arrow schema
+/// when using [`Self::try_new`]. For nested fields (e.g., struct fields),
+/// where `parquet_column` does not support schema resolution, use
+/// [`Self::from_column_index`] instead with a pre-resolved leaf column index.
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
/// the index of the matched column in the Parquet schema
@@ -1454,6 +1457,9 @@ impl<'a> StatisticsConverter<'a> {
/// arrays will be null. This can happen if the column is in the arrow
/// schema but not in the parquet schema due to schema evolution.
///
+ /// This constructor only supports top-level, non-nested columns. For
nested
+ /// fields (e.g., fields within a struct), use [`Self::from_column_index`].
+ ///
/// See example on [`Self::row_group_mins`] for usage
///
/// # Errors
@@ -1495,6 +1501,50 @@ impl<'a> StatisticsConverter<'a> {
})
}
+    /// Creates a `StatisticsConverter` for an already-resolved Parquet leaf column.
+    ///
+    /// [`Self::try_new`] locates the Parquet column by resolving the Arrow field
+    /// against the Parquet schema. This constructor skips that resolution step and
+    /// uses the supplied leaf column index as-is. That makes it suitable for nested
+    /// fields (e.g. struct fields) whose leaf index the caller has already worked out.
+    ///
+    /// # Arguments
+    ///
+    /// * `parquet_column_index` - index of the leaf column within `parquet_schema`
+    /// * `arrow_field` - the Arrow field describing the column's data type
+    /// * `parquet_schema` - the Parquet schema descriptor; the leaf's physical type
+    ///   is read from it
+    ///
+    /// No check is made that `arrow_field` and `parquet_column_index` refer to the
+    /// same leaf column; keeping them consistent is the caller's responsibility. If
+    /// the Arrow type does not match the Parquet column statistics, extraction
+    /// returns null statistics values rather than an error.
+    ///
+    /// # Errors
+    ///
+    /// * If the `parquet_column_index` is out of bounds
+    pub fn from_column_index(
+        parquet_column_index: usize,
+        arrow_field: &'a Field,
+        parquet_schema: &'a SchemaDescriptor,
+    ) -> Result<Self> {
+        let leaves = parquet_schema.columns();
+        // A single checked lookup both validates the index and yields the leaf
+        // descriptor.
+        let Some(leaf) = leaves.get(parquet_column_index) else {
+            return Err(arrow_err!(format!(
+                "Parquet column index {} out of bounds, column count {}",
+                parquet_column_index,
+                leaves.len()
+            )));
+        };
+
+        Ok(Self {
+            parquet_column_index: Some(parquet_column_index),
+            arrow_field,
+            missing_null_counts_as_zero: true,
+            physical_type: Some(leaf.physical_type()),
+        })
+    }
+
/// Extract the minimum values from row group statistics in
[`RowGroupMetaData`]
///
/// # Return Value
diff --git a/parquet/tests/arrow_reader/statistics.rs
b/parquet/tests/arrow_reader/statistics.rs
index 4f7ddcff4a..bbb891dfd2 100644
--- a/parquet/tests/arrow_reader/statistics.rs
+++ b/parquet/tests/arrow_reader/statistics.rs
@@ -2628,7 +2628,7 @@ mod test {
use arrow::util::test_util::parquet_test_data;
use arrow_array::{
ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array,
Int8Array, Int16Array,
- Int32Array, Int64Array, RecordBatch, StringArray,
TimestampNanosecondArray,
+ Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
TimestampNanosecondArray,
new_empty_array,
};
use arrow_schema::{DataType, SchemaRef, TimeUnit};
@@ -2899,6 +2899,183 @@ mod test {
}
}
+    #[test]
+    fn struct_leaf_statistics_from_column_index() {
+        // `c1` is a struct column with two leaves: `leaf` (Int32, addressed below as
+        // Parquet leaf 0) and `amount` (Decimal128(20, 2), Parquet leaf 1). Both are
+        // reached by index through `StatisticsConverter::from_column_index`.
+        let leaf_field = Arc::new(Field::new("leaf", DataType::Int32, true));
+        let leaf_array: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(3),
+            Some(1),
+            None,
+            Some(9),
+            Some(4),
+            Some(6),
+        ]));
+
+        let amount_array = decimal128_array(
+            [
+                Some(i128::from(100)),
+                Some(i128::from(-500)),
+                None,
+                Some(i128::from(2000)),
+                Some(i128::from(600)),
+                Some(i128::from(50)),
+            ],
+            20,
+            2,
+        );
+        let amount_field = Arc::new(Field::new("amount", amount_array.data_type().clone(), true));
+
+        // Each field `Arc` is used exactly once, so move it rather than clone it.
+        let struct_array: ArrayRef = Arc::new(StructArray::from(vec![
+            (leaf_field, leaf_array),
+            (amount_field, amount_array),
+        ]));
+        let input_batch = RecordBatch::try_from_iter([("c1", struct_array)]).unwrap();
+
+        // The assertions below expect one 6-row row group whose leaf columns are
+        // each split into two 3-row data pages.
+        let reader = build_parquet_file(
+            ROWS_PER_ROW_GROUP * 2,
+            Some(EnabledStatistics::Page),
+            Some(ROWS_PER_ROW_GROUP),
+            vec![input_batch],
+        );
+
+        let schema = reader.schema();
+        let metadata = reader.metadata();
+        let parquet_schema = reader.parquet_schema();
+        let row_groups = metadata.row_groups();
+        // Page-level statistics are read for row group 0 only. Make sure that
+        // really is the only row group, so page and row-group stats describe the
+        // same rows.
+        assert_eq!(row_groups.len(), 1);
+        let row_group_indices = [0];
+        let column_page_index = metadata
+            .column_index()
+            .expect("file should have column page indices");
+        let column_offset_index = metadata
+            .offset_index()
+            .expect("file should have column offset indices");
+
+        let DataType::Struct(fields) = schema.field_with_name("c1").unwrap().data_type() else {
+            unreachable!("c1 must be a struct field")
+        };
+        let leaf_arrow_field = fields[0].as_ref();
+        let amount_arrow_field = fields[1].as_ref();
+
+        // `parquet_column` cannot resolve the struct column, which is why the
+        // index-based constructor is needed.
+        assert_eq!(parquet_column(parquet_schema, schema, "c1"), None);
+
+        // --- `leaf` (Int32, Parquet leaf 0) ---
+        let leaf_converter =
+            StatisticsConverter::from_column_index(0, leaf_arrow_field, parquet_schema).unwrap();
+
+        assert_eq!(leaf_converter.parquet_column_index(), Some(0));
+        assert_eq!(leaf_converter.arrow_field(), leaf_arrow_field);
+
+        let leaf_mins = leaf_converter.row_group_mins(row_groups.iter()).unwrap();
+        assert_eq!(&leaf_mins, &i32_array([Some(1)]));
+
+        let leaf_maxes = leaf_converter.row_group_maxes(row_groups.iter()).unwrap();
+        assert_eq!(&leaf_maxes, &i32_array([Some(9)]));
+
+        let leaf_null_counts = leaf_converter
+            .row_group_null_counts(row_groups.iter())
+            .unwrap();
+        assert_eq!(leaf_null_counts, UInt64Array::from(vec![1]));
+
+        let leaf_row_counts = leaf_converter
+            .row_group_row_counts(row_groups.iter())
+            .unwrap();
+        assert_eq!(leaf_row_counts, Some(UInt64Array::from(vec![6])));
+
+        let leaf_page_mins = leaf_converter
+            .data_page_mins(
+                column_page_index,
+                column_offset_index,
+                row_group_indices.iter(),
+            )
+            .unwrap();
+        assert_eq!(&leaf_page_mins, &i32_array([Some(1), Some(4)]));
+
+        let leaf_page_maxes = leaf_converter
+            .data_page_maxes(
+                column_page_index,
+                column_offset_index,
+                row_group_indices.iter(),
+            )
+            .unwrap();
+        assert_eq!(&leaf_page_maxes, &i32_array([Some(3), Some(9)]));
+
+        let leaf_page_null_counts = leaf_converter
+            .data_page_null_counts(
+                column_page_index,
+                column_offset_index,
+                row_group_indices.iter(),
+            )
+            .unwrap();
+        assert_eq!(leaf_page_null_counts, UInt64Array::from(vec![1, 0]));
+
+        let leaf_page_row_counts = leaf_converter
+            .data_page_row_counts(column_offset_index, row_groups, row_group_indices.iter())
+            .unwrap();
+        assert_eq!(leaf_page_row_counts, Some(UInt64Array::from(vec![3, 3])));
+
+        // --- `amount` (Decimal128(20, 2), Parquet leaf 1) ---
+        let amount_converter =
+            StatisticsConverter::from_column_index(1, amount_arrow_field, parquet_schema).unwrap();
+
+        assert_eq!(amount_converter.parquet_column_index(), Some(1));
+        assert_eq!(amount_converter.arrow_field(), amount_arrow_field);
+
+        let amount_mins = amount_converter.row_group_mins(row_groups.iter()).unwrap();
+        assert_eq!(
+            &amount_mins,
+            &decimal128_array([Some(i128::from(-500))], 20, 2)
+        );
+
+        let amount_maxes = amount_converter.row_group_maxes(row_groups.iter()).unwrap();
+        assert_eq!(
+            &amount_maxes,
+            &decimal128_array([Some(i128::from(2000))], 20, 2)
+        );
+
+        // `amount` has exactly one null (row 2), which falls in the first page.
+        let amount_null_counts = amount_converter
+            .row_group_null_counts(row_groups.iter())
+            .unwrap();
+        assert_eq!(amount_null_counts, UInt64Array::from(vec![1]));
+
+        let amount_page_mins = amount_converter
+            .data_page_mins(
+                column_page_index,
+                column_offset_index,
+                row_group_indices.iter(),
+            )
+            .unwrap();
+        assert_eq!(
+            &amount_page_mins,
+            &decimal128_array([Some(i128::from(-500)), Some(i128::from(50))], 20, 2)
+        );
+
+        let amount_page_maxes = amount_converter
+            .data_page_maxes(
+                column_page_index,
+                column_offset_index,
+                row_group_indices.iter(),
+            )
+            .unwrap();
+        assert_eq!(
+            &amount_page_maxes,
+            &decimal128_array([Some(i128::from(100)), Some(i128::from(2000))], 20, 2)
+        );
+
+        let amount_page_null_counts = amount_converter
+            .data_page_null_counts(
+                column_page_index,
+                column_offset_index,
+                row_group_indices.iter(),
+            )
+            .unwrap();
+        assert_eq!(amount_page_null_counts, UInt64Array::from(vec![1, 0]));
+
+        // --- out-of-bounds leaf index is rejected with an error, not a panic ---
+        let column_count = parquet_schema.columns().len();
+        let err =
+            StatisticsConverter::from_column_index(column_count, leaf_arrow_field, parquet_schema)
+                .unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            format!(
+                "Arrow: Parquet column index {column_count} out of bounds, column count {column_count}"
+            )
+        );
+    }
+
+    /// Collects optional raw `i128` values into a `Decimal128Array` with the given
+    /// precision and scale and returns it as an `ArrayRef`.
+    ///
+    /// Panics if `with_precision_and_scale` rejects `precision` / `scale`.
+    fn decimal128_array(
+        input: impl IntoIterator<Item = Option<i128>>,
+        precision: u8,
+        scale: i8,
+    ) -> ArrayRef {
+        let raw: Decimal128Array = input.into_iter().collect();
+        let typed = raw.with_precision_and_scale(precision, scale).unwrap();
+        Arc::new(typed)
+    }
+
/// Write the specified batches out as parquet and return the metadata
fn parquet_metadata(schema: SchemaRef, batch: RecordBatch) ->
Arc<ParquetMetaData> {
let props = WriterProperties::builder()