This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 27c9cdaecc correct parquet leaf index mapping when schema contains struct cols (#20698)
27c9cdaecc is described below

commit 27c9cdaeccf0ac25821803689454d0df0997792d
Author: Matthew Kim <[email protected]>
AuthorDate: Thu Mar 5 01:48:18 2026 -0500

    correct parquet leaf index mapping when schema contains struct cols (#20698)
    
    ## Which issue does this PR close?
    
    - Closes https://github.com/apache/datafusion/issues/20695
    
    ## Rationale for this change
    
    Row filter pushdown assumed that Arrow field indices equal Parquet leaf
    indices. That assumption breaks when Struct columns are present: each
    struct child expands into a separate Parquet leaf, which shifts the leaf
    indices of all subsequent columns.
---
 datafusion/datasource-parquet/src/row_filter.rs    | 126 +++++++++++++++------
 .../test_files/parquet_filter_pushdown.slt         |  49 +++++++-
 2 files changed, 140 insertions(+), 35 deletions(-)

diff --git a/datafusion/datasource-parquet/src/row_filter.rs 
b/datafusion/datasource-parquet/src/row_filter.rs
index 2924208c5b..62ba53bb87 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -242,10 +242,10 @@ impl FilterCandidateBuilder {
 
         let root_indices: Vec<_> =
             required_columns.required_columns.into_iter().collect();
+
         let leaf_indices = leaf_indices_for_roots(
             &root_indices,
             metadata.file_metadata().schema_descr(),
-            required_columns.nested,
         );
 
         let projected_schema = 
Arc::new(self.file_schema.project(&root_indices)?);
@@ -277,8 +277,6 @@ struct PushdownChecker<'schema> {
     projected_columns: bool,
     /// Indices into the file schema of columns required to evaluate the 
expression.
     required_columns: Vec<usize>,
-    /// Tracks the nested column behavior found during traversal.
-    nested_behavior: NestedColumnSupport,
     /// Whether nested list columns are supported by the predicate semantics.
     allow_list_columns: bool,
     /// The Arrow schema of the parquet file.
@@ -291,7 +289,6 @@ impl<'schema> PushdownChecker<'schema> {
             non_primitive_columns: false,
             projected_columns: false,
             required_columns: Vec::new(),
-            nested_behavior: NestedColumnSupport::PrimitiveOnly,
             allow_list_columns,
             file_schema,
         }
@@ -324,16 +321,11 @@ impl<'schema> PushdownChecker<'schema> {
     /// `None` if the type is supported and pushdown can continue.
     fn handle_nested_type(&mut self, data_type: &DataType) -> 
Option<TreeNodeRecursion> {
         if self.is_nested_type_supported(data_type) {
-            // Update to ListsSupported if we haven't encountered unsupported 
types yet
-            if self.nested_behavior == NestedColumnSupport::PrimitiveOnly {
-                self.nested_behavior = NestedColumnSupport::ListsSupported;
-            }
             None
         } else {
             // Block pushdown for unsupported nested types:
             // - Structs (regardless of predicate support)
             // - Lists without supported predicates
-            self.nested_behavior = NestedColumnSupport::Unsupported;
             self.non_primitive_columns = true;
             Some(TreeNodeRecursion::Jump)
         }
@@ -368,7 +360,6 @@ impl<'schema> PushdownChecker<'schema> {
         self.required_columns.dedup();
         PushdownColumns {
             required_columns: self.required_columns,
-            nested: self.nested_behavior,
         }
     }
 }
@@ -391,21 +382,6 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
 ///
 /// This enum makes explicit the different states a predicate can be in
 /// with respect to nested column handling during Parquet decoding.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-enum NestedColumnSupport {
-    /// Expression references only primitive (non-nested) columns.
-    /// These can always be pushed down to the Parquet decoder.
-    PrimitiveOnly,
-    /// Expression references list columns with supported predicates
-    /// (e.g., array_has, array_has_all, IS NULL).
-    /// These can be pushed down to the Parquet decoder.
-    ListsSupported,
-    /// Expression references unsupported nested types (e.g., structs)
-    /// or list columns without supported predicates.
-    /// These cannot be pushed down and must be evaluated after decoding.
-    Unsupported,
-}
-
 /// Result of checking which columns are required for filter pushdown.
 #[derive(Debug)]
 struct PushdownColumns {
@@ -413,7 +389,6 @@ struct PushdownColumns {
     /// the filter expression. Must be in ascending order for correct schema
     /// projection matching.
     required_columns: Vec<usize>,
-    nested: NestedColumnSupport,
 }
 
 /// Checks if a given expression can be pushed down to the parquet decoder.
@@ -437,15 +412,13 @@ fn pushdown_columns(
 fn leaf_indices_for_roots(
     root_indices: &[usize],
     schema_descr: &SchemaDescriptor,
-    nested: NestedColumnSupport,
 ) -> Vec<usize> {
-    // For primitive-only columns, root indices ARE the leaf indices
-    if nested == NestedColumnSupport::PrimitiveOnly {
-        return root_indices.to_vec();
-    }
-
-    // For List columns, expand to the single leaf column (item field)
-    // For Struct columns (unsupported), this would expand to multiple leaves
+    // Always map root (Arrow) indices to Parquet leaf indices via the schema
+    // descriptor. Arrow root indices only equal Parquet leaf indices when the
+    // schema has no group columns (Struct, Map, etc.); when group columns
+    // exist, their children become separate leaves and shift all subsequent
+    // leaf indices.
+    // Struct columns are unsupported.
     let root_set: BTreeSet<_> = root_indices.iter().copied().collect();
 
     (0..schema_descr.num_columns())
@@ -1088,6 +1061,91 @@ mod test {
             .expect("parsing schema")
     }
 
+    /// Regression test: when a schema has Struct columns, Arrow field indices 
diverge
+    /// from Parquet leaf indices (Struct children become separate leaves). The
+    /// `PrimitiveOnly` fast-path in `leaf_indices_for_roots` assumes they are 
equal,
+    /// so a filter on a primitive column *after* a Struct gets the wrong leaf 
index.
+    ///
+    /// Schema:
+    ///   Arrow indices:   col_a=0  struct_col=1  col_b=2
+    ///   Parquet leaves:  col_a=0  struct_col.x=1  struct_col.y=2  col_b=3
+    ///
+    /// A filter on col_b should project Parquet leaf 3, but the bug causes it 
to
+    /// project leaf 2 (struct_col.y).
+    #[test]
+    fn test_filter_pushdown_leaf_index_with_struct_in_schema() {
+        use arrow::array::{Int32Array, StringArray, StructArray};
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("col_a", DataType::Int32, false),
+            Field::new(
+                "struct_col",
+                DataType::Struct(
+                    vec![
+                        Arc::new(Field::new("x", DataType::Int32, true)),
+                        Arc::new(Field::new("y", DataType::Int32, true)),
+                    ]
+                    .into(),
+                ),
+                true,
+            ),
+            Field::new("col_b", DataType::Utf8, false),
+        ]));
+
+        let col_a = Arc::new(Int32Array::from(vec![1, 2, 3]));
+        let struct_col = Arc::new(StructArray::from(vec![
+            (
+                Arc::new(Field::new("x", DataType::Int32, true)),
+                Arc::new(Int32Array::from(vec![10, 20, 30])) as _,
+            ),
+            (
+                Arc::new(Field::new("y", DataType::Int32, true)),
+                Arc::new(Int32Array::from(vec![100, 200, 300])) as _,
+            ),
+        ]));
+        let col_b = Arc::new(StringArray::from(vec!["aaa", "target", "zzz"]));
+
+        let batch =
+            RecordBatch::try_new(Arc::clone(&schema), vec![col_a, struct_col, 
col_b])
+                .unwrap();
+
+        let file = NamedTempFile::new().expect("temp file");
+        let mut writer =
+            ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema), 
None)
+                .expect("writer");
+        writer.write(&batch).expect("write batch");
+        writer.close().expect("close writer");
+
+        let reader_file = file.reopen().expect("reopen file");
+        let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file)
+            .expect("reader builder");
+        let metadata = builder.metadata().clone();
+        let file_schema = builder.schema().clone();
+
+        // sanity check: 4 Parquet leaves, 3 Arrow fields
+        assert_eq!(metadata.file_metadata().schema_descr().num_columns(), 4);
+        assert_eq!(file_schema.fields().len(), 3);
+
+        // build a filter candidate for `col_b = 'target'` through the public 
API
+        let expr = col("col_b").eq(Expr::Literal(
+            ScalarValue::Utf8(Some("target".to_string())),
+            None,
+        ));
+        let expr = logical2physical(&expr, &file_schema);
+
+        let candidate = FilterCandidateBuilder::new(expr, file_schema)
+            .build(&metadata)
+            .expect("building candidate")
+            .expect("filter on primitive col_b should be pushable");
+
+        // col_b is Parquet leaf 3 (shifted by struct_col's two children).
+        assert_eq!(
+            candidate.projection.leaf_indices,
+            vec![3],
+            "leaf_indices should be [3] for col_b"
+        );
+    }
+
     /// Sanity check that the given expression could be evaluated against the 
given schema without any errors.
     /// This will fail if the expression references columns that are not in 
the schema or if the types of the columns are incompatible, etc.
     fn check_expression_can_evaluate_against_schema(
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt 
b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index ef82bd1391..6c4383f997 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -812,7 +812,6 @@ WHERE h2o_parquet_20696.time >= 
'1970-01-01T00:00:00.000000050Z'
 72.4 53.4 51
 70.4 50.4 50
 
-
 statement ok
 set datafusion.execution.parquet.pushdown_filters = true;
 
@@ -842,3 +841,51 @@ DROP TABLE o2_parquet_20696;
 # Cleanup settings
 statement ok
 set datafusion.execution.parquet.pushdown_filters = false;
+
+##########
+# Regression test: filter pushdown with Struct columns in schema
+#
+# When a schema has Struct columns, Arrow field indices diverge from Parquet
+# leaf indices (Struct children become separate leaves). A filter on a
+# primitive column *after* a Struct must use the correct Parquet leaf index.
+#
+# Schema:
+#   Arrow:   col_a=0  struct_col=1              col_b=2
+#   Parquet: col_a=0  struct_col.x=1  struct_col.y=2  col_b=3
+##########
+
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+statement ok
+COPY (
+  SELECT
+    column1 as col_a,
+    column2 as struct_col,
+    column3 as col_b
+  FROM VALUES
+    (1, {x: 10, y: 100}, 'aaa'),
+    (2, {x: 20, y: 200}, 'target'),
+    (3, {x: 30, y: 300}, 'zzz')
+) TO 'test_files/scratch/parquet_filter_pushdown/struct_filter.parquet'
+STORED AS PARQUET;
+
+statement ok
+CREATE EXTERNAL TABLE t_struct_filter
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_filter_pushdown/struct_filter.parquet';
+
+# Filter on col_b (the primitive column after the struct).
+# Before the fix, this returned 0 rows because the filter read struct_col.y
+# (Parquet leaf 2) instead of col_b (Parquet leaf 3).
+query IT
+SELECT col_a, col_b FROM t_struct_filter WHERE col_b = 'target';
+----
+2 target
+
+# Clean up
+statement ok
+set datafusion.execution.parquet.pushdown_filters = false;
+
+statement ok
+DROP TABLE t_struct_filter;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to