This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 580b0abdb4 Use leaf level `ProjectionMask` for parquet projections 
(#20925)
580b0abdb4 is described below

commit 580b0abdb487e1e226f5236e605ec5c75ec729b9
Author: Matthew Kim <[email protected]>
AuthorDate: Fri Mar 27 11:30:41 2026 -0400

    Use leaf level `ProjectionMask` for parquet projections (#20925)
    
    - Added on from https://github.com/apache/datafusion/pull/20913
    
    Please review from the third commit
    
    ## Rationale for this change
    
    This PR reuses the `ParquetReadPlan` (introduced for the row filter
    pushdown) to also resolve projection expressions to parquet leaf column
    indices
    
    Previously, projecting a single field from a struct with many children
    would read all leaves of that struct. This aligns the projection path
    with the row filter path, which already had leaf-level struct pruning
---
 datafusion/datasource-parquet/src/opener.rs        |  15 +-
 datafusion/datasource-parquet/src/row_filter.rs    | 251 ++++++++++++++++++++-
 .../test_files/projection_pushdown.slt             |  76 ++++++-
 3 files changed, 330 insertions(+), 12 deletions(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs 
b/datafusion/datasource-parquet/src/opener.rs
index 2522ae3050..1dbb801c93 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -18,6 +18,7 @@
 //! [`ParquetOpener`] for opening Parquet files
 
 use crate::page_filter::PagePruningAccessPlanFilter;
+use crate::row_filter::build_projection_read_plan;
 use crate::row_group_filter::RowGroupAccessPlanFilter;
 use crate::{
     ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
@@ -59,13 +60,13 @@ use 
datafusion_execution::parquet_encryption::EncryptionFactory;
 use futures::{Stream, StreamExt, ready};
 use log::debug;
 use parquet::DecodeResult;
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
 use parquet::arrow::arrow_reader::{
     ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
 };
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::push_decoder::{ParquetPushDecoder, 
ParquetPushDecoderBuilder};
-use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
 
 /// Implements [`FileOpener`] for a parquet file
@@ -583,12 +584,14 @@ impl FileOpener for ParquetOpener {
             // metrics from the arrow reader itself
             let arrow_reader_metrics = ArrowReaderMetrics::enabled();
 
-            let indices = projection.column_indices();
-            let mask =
-                ProjectionMask::roots(reader_metadata.parquet_schema(), 
indices.clone());
+            let read_plan = build_projection_read_plan(
+                projection.expr_iter(),
+                &physical_file_schema,
+                reader_metadata.parquet_schema(),
+            );
 
             let decoder = builder
-                .with_projection(mask)
+                .with_projection(read_plan.projection_mask)
                 .with_metrics(arrow_reader_metrics.clone())
                 .build()?;
 
@@ -601,7 +604,7 @@ impl FileOpener for ParquetOpener {
             // Rebase column indices to match the narrowed stream schema.
             // The projection expressions have indices based on 
physical_file_schema,
             // but the stream only contains the columns selected by the 
ProjectionMask.
-            let stream_schema = 
Arc::new(physical_file_schema.project(&indices)?);
+            let stream_schema = read_plan.projected_schema;
             let replace_schema = stream_schema != output_schema;
             let projection = projection
                 .try_map_exprs(|expr| reassign_expr_columns(expr, 
&stream_schema))?;
diff --git a/datafusion/datasource-parquet/src/row_filter.rs 
b/datafusion/datasource-parquet/src/row_filter.rs
index d120f743fa..67b65321d9 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -83,7 +83,7 @@ use datafusion_common::cast::as_boolean_array;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, 
TreeNodeVisitor};
 use datafusion_physical_expr::ScalarFunctionExpr;
 use datafusion_physical_expr::expressions::{Column, Literal};
-use datafusion_physical_expr::utils::reassign_expr_columns;
+use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
 use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
 
 use datafusion_physical_plan::metrics;
@@ -424,10 +424,26 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
                 .first()
                 .and_then(|a| a.as_any().downcast_ref::<Column>())
             {
+                // for Map columns, get_field performs a runtime key lookup 
rather than a
+                // schema-level field access so the entire Map column must be 
read,
+                // we skip the struct field optimization and defer to normal 
Column traversal
+                let is_map_column = self
+                    .file_schema
+                    .index_of(column.name())
+                    .ok()
+                    .map(|idx| {
+                        matches!(
+                            self.file_schema.field(idx).data_type(),
+                            DataType::Map(_, _)
+                        )
+                    })
+                    .unwrap_or(false);
+
                 let return_type = func.return_type();
 
-                if !DataType::is_nested(return_type)
-                    || self.is_nested_type_supported(return_type)
+                if !is_map_column
+                    && (!DataType::is_nested(return_type)
+                        || self.is_nested_type_supported(return_type))
                 {
                     // try to resolve all field name arguments to string 
literals
                     // if any argument is not a string literal, we can not 
determine the exact
@@ -579,6 +595,136 @@ pub(crate) fn build_parquet_read_plan(
     )))
 }
 
+/// Builds a unified [`ParquetReadPlan`] for a set of projection expressions
+///
+/// Unlike [`build_parquet_read_plan`] (which is used for filter pushdown and
+/// returns `None` when an expression references unsupported nested types or
+/// missing columns), this function always succeeds. It collects every column
+/// that *can* be resolved in the file and produces a leaf-level projection
+/// mask. Columns missing from the file are silently skipped since the 
projection
+/// layer handles those by inserting nulls.
+pub(crate) fn build_projection_read_plan(
+    exprs: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
+    file_schema: &Schema,
+    schema_descr: &SchemaDescriptor,
+) -> ParquetReadPlan {
+    // fast path: if every expression is a plain Column reference, skip all
+    // struct analysis and use root-level projection directly
+    let exprs = exprs.into_iter().collect::<Vec<_>>();
+    let all_plain_columns = exprs
+        .iter()
+        .all(|e| e.as_any().downcast_ref::<Column>().is_some());
+
+    if all_plain_columns {
+        let mut root_indices: Vec<usize> = exprs
+            .iter()
+            .map(|e| e.as_any().downcast_ref::<Column>().unwrap().index())
+            .collect();
+        root_indices.sort_unstable();
+        root_indices.dedup();
+
+        let projection_mask =
+            ProjectionMask::roots(schema_descr, root_indices.iter().copied());
+        let projected_schema = Arc::new(
+            file_schema
+                .project(&root_indices)
+                .expect("valid column indices"),
+        );
+
+        return ParquetReadPlan {
+            projection_mask,
+            projected_schema,
+        };
+    }
+
+    // secondary fast path: if the schema has no struct columns, we can skip
+    // PushdownChecker traversal and use root-level projection
+    let has_struct_columns = file_schema
+        .fields()
+        .iter()
+        .any(|f| matches!(f.data_type(), DataType::Struct(_)));
+
+    if !has_struct_columns {
+        let mut root_indices = exprs
+            .into_iter()
+            .flat_map(|e| collect_columns(&e).into_iter().map(|col| 
col.index()))
+            .collect::<Vec<_>>();
+
+        root_indices.sort_unstable();
+        root_indices.dedup();
+
+        let projection_mask =
+            ProjectionMask::roots(schema_descr, root_indices.iter().copied());
+
+        let projected_schema = Arc::new(
+            file_schema
+                .project(&root_indices)
+                .expect("valid column indices"),
+        );
+
+        return ParquetReadPlan {
+            projection_mask,
+            projected_schema,
+        };
+    }
+
+    let mut all_root_indices = Vec::new();
+    let mut all_struct_accesses = Vec::new();
+
+    for expr in exprs {
+        let mut checker = PushdownChecker::new(file_schema, true);
+        let _ = expr.visit(&mut checker);
+        let columns = checker.into_sorted_columns();
+
+        all_root_indices.extend_from_slice(&columns.required_columns);
+        all_struct_accesses.extend(columns.struct_field_accesses);
+    }
+
+    all_root_indices.sort_unstable();
+    all_root_indices.dedup();
+
+    // when no struct field accesses were found, fall back to root-level 
projection
+    // to match the performance of the simple path
+    if all_struct_accesses.is_empty() {
+        let projection_mask =
+            ProjectionMask::roots(schema_descr, 
all_root_indices.iter().copied());
+        let projected_schema = Arc::new(
+            file_schema
+                .project(&all_root_indices)
+                .expect("valid column indices"),
+        );
+
+        return ParquetReadPlan {
+            projection_mask,
+            projected_schema,
+        };
+    }
+
+    let leaf_indices = {
+        let mut out =
+            leaf_indices_for_roots(all_root_indices.iter().copied(), 
schema_descr);
+        let struct_leaf_indices =
+            resolve_struct_field_leaves(&all_struct_accesses, file_schema, 
schema_descr);
+
+        out.extend_from_slice(&struct_leaf_indices);
+        out.sort_unstable();
+        out.dedup();
+
+        out
+    };
+
+    let projection_mask =
+        ProjectionMask::leaves(schema_descr, leaf_indices.iter().copied());
+
+    let projected_schema =
+        build_filter_schema(file_schema, &all_root_indices, 
&all_struct_accesses);
+
+    ParquetReadPlan {
+        projection_mask,
+        projected_schema,
+    }
+}
+
 fn leaf_indices_for_roots<I>(
     root_indices: I,
     schema_descr: &SchemaDescriptor,
@@ -654,6 +800,8 @@ fn build_filter_schema(
     regular_indices: &[usize],
     struct_field_accesses: &[StructFieldAccess],
 ) -> SchemaRef {
+    let regular_set: BTreeSet<usize> = 
regular_indices.iter().copied().collect();
+
     let all_indices = regular_indices
         .iter()
         .copied()
@@ -669,6 +817,15 @@ fn build_filter_schema(
         .map(|&idx| {
             let field = file_schema.field(idx);
 
+            // if this column appears as a regular (whole-column) reference,
+            // keep the full type
+            //
+            // Pruning is only valid when the column is accessed exclusively
+            // through struct field accesses
+            if regular_set.contains(&idx) {
+                return Arc::new(field.clone());
+            }
+
             // collect all field paths that access this root struct column
             let field_paths = struct_field_accesses
                 .iter()
@@ -683,7 +840,6 @@ fn build_filter_schema(
                 .collect::<Vec<_>>();
 
             if field_paths.is_empty() {
-                // its a regular column - use the full type
                 return Arc::new(field.clone());
             }
 
@@ -696,7 +852,10 @@ fn build_filter_schema(
         })
         .collect::<Vec<_>>();
 
-    Arc::new(Schema::new(fields))
+    Arc::new(Schema::new_with_metadata(
+        fields,
+        file_schema.metadata().clone(),
+    ))
 }
 
 fn prune_struct_type(dt: &DataType, paths: &[&[String]]) -> DataType {
@@ -958,6 +1117,8 @@ mod test {
     use parquet::file::reader::{FileReader, SerializedFileReader};
     use tempfile::NamedTempFile;
 
+    use datafusion_physical_expr::expressions::Column as PhysicalColumn;
+
     // List predicates used by the decoder should be accepted for pushdown
     #[test]
     fn test_filter_candidate_builder_supports_list_types() {
@@ -1814,6 +1975,86 @@ mod test {
         assert_eq!(file_metrics.pushdown_rows_matched.value(), 2);
     }
 
+    #[test]
+    fn projection_read_plan_preserves_full_struct() {
+        // Schema: id (Int32), s (Struct{value: Int32, label: Utf8})
+        // Parquet leaves: id=0, s.value=1, s.label=2
+        let struct_fields: Fields = vec![
+            Arc::new(Field::new("value", DataType::Int32, false)),
+            Arc::new(Field::new("label", DataType::Utf8, false)),
+        ]
+        .into();
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("s", DataType::Struct(struct_fields.clone()), false),
+        ]));
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(StructArray::new(
+                    struct_fields,
+                    vec![
+                        Arc::new(Int32Array::from(vec![10, 20, 30])) as _,
+                        Arc::new(StringArray::from(vec!["a", "b", "c"])) as _,
+                    ],
+                    None,
+                )),
+            ],
+        )
+        .unwrap();
+
+        let file = NamedTempFile::new().expect("temp file");
+        let mut writer =
+            ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema), 
None)
+                .expect("writer");
+        writer.write(&batch).expect("write batch");
+        writer.close().expect("close writer");
+
+        let reader_file = file.reopen().expect("reopen file");
+        let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file)
+            .expect("reader builder");
+        let metadata = builder.metadata().clone();
+        let file_schema = builder.schema().clone();
+        let schema_descr = metadata.file_metadata().schema_descr();
+
+        // Simulate SELECT * output projection: Column("id") and Column("s")
+        // Plus a get_field(s, 'value') expression from the pushed-down filter
+        let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
+            Arc::new(PhysicalColumn::new("id", 0)),
+            Arc::new(PhysicalColumn::new("s", 1)),
+            logical2physical(
+                &get_field().call(vec![
+                    col("s"),
+                    
Expr::Literal(ScalarValue::Utf8(Some("value".to_string())), None),
+                ]),
+                &file_schema,
+            ),
+        ];
+
+        let read_plan = build_projection_read_plan(exprs, &file_schema, 
schema_descr);
+
+        // The projected schema must have the FULL struct type because 
Column("s")
+        // is in the projection. It should NOT be narrowed to Struct{value: 
Int32}.
+        let s_field = read_plan.projected_schema.field_with_name("s").unwrap();
+        assert_eq!(
+            s_field.data_type(),
+            &DataType::Struct(
+                vec![
+                    Arc::new(Field::new("value", DataType::Int32, false)),
+                    Arc::new(Field::new("label", DataType::Utf8, false)),
+                ]
+                .into()
+            ),
+        );
+
+        // all 3 Parquet leaves should be in the projection mask
+        let expected_mask = ProjectionMask::leaves(schema_descr, [0, 1, 2]);
+        assert_eq!(read_plan.projection_mask, expected_mask);
+    }
+
     /// Sanity check that the given expression could be evaluated against the 
given schema without any errors.
     /// This will fail if the expression references columns that are not in 
the schema or if the types of the columns are incompatible, etc.
     fn check_expression_can_evaluate_against_schema(
diff --git a/datafusion/sqllogictest/test_files/projection_pushdown.slt 
b/datafusion/sqllogictest/test_files/projection_pushdown.slt
index 1735b1fb41..777f1e00ed 100644
--- a/datafusion/sqllogictest/test_files/projection_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/projection_pushdown.slt
@@ -1994,9 +1994,83 @@ WHERE COALESCE(get_field(s, 'f1'), get_field(s, 'f2')) = 
1;
 ----
 1
 
+#####################
+# Section 8: SELECT * with struct field filter
+#####################
+
+# When SELECT * includes the full struct but the filter only accesses a
+# sub-field (e.g. s['id']), the leaf-level projection must not narrow the
+# struct schema in the output. Previously build_projection_read_plan would
+# produce a schema with Struct("id": Int32) while the data still contained
+# Struct("id": Int32, "value": Utf8), causing an ArrowError.
+
+# 8.1: SELECT * with equality filter on struct sub-field
+query I?
+SELECT * FROM simple_struct WHERE s['value'] = 100;
+----
+1 {value: 100, label: alpha}
+
+# 8.2: Explicit SELECT of whole struct with struct sub-field filter
+query ?
+SELECT s FROM simple_struct WHERE s['value'] = 100;
+----
+{value: 100, label: alpha}
+
+# 8.3: Whole struct + sub-field projection + sub-field filter
+query I?I
+SELECT s['value'], s, id FROM simple_struct WHERE s['value'] = 100;
+----
+100 {value: 100, label: alpha} 1
+
+# 8.4: Whole struct in output, filter on a different sub-field than projected
+query ?T
+SELECT s, s['label'] FROM simple_struct WHERE s['value'] > 200;
+----
+{value: 300, label: delta} delta
+{value: 250, label: epsilon} epsilon
+
+# 8.5: Filter references both sub-fields, output includes whole struct
+query I?
+SELECT id, s FROM simple_struct WHERE s['value'] > 100 AND s['label'] = 'beta';
+----
+2 {value: 200, label: beta}
+
+# 8.6: Only sub-field projection with sub-field filter (no whole struct — 
should prune)
+query II
+SELECT id, s['value'] FROM simple_struct WHERE s['value'] = 100;
+----
+1 100
+
+# 8.7: Nested struct — whole struct output with deeply nested field filter
+query I?
+SELECT * FROM nested_struct WHERE nested['outer']['inner'] > 15;
+----
+2 {outer: {inner: 20, name: two}, extra: y}
+3 {outer: {inner: 30, name: three}, extra: z}
+
+# 8.8: Nested struct — explicit whole struct select with sibling field filter
+query ?
+SELECT nested FROM nested_struct WHERE nested['extra'] = 'y';
+----
+{outer: {inner: 20, name: two}, extra: y}
+
+# 8.9: Nullable struct — whole struct output with sub-field filter
+query ?
+SELECT s FROM nullable_struct WHERE s['value'] > 100;
+----
+{value: 150, label: gamma}
+{value: 250, label: epsilon}
+
+# 8.10: Struct sub-field filter combined with top-level column filter
+query ?I
+SELECT s, id FROM simple_struct WHERE s['value'] > 100 AND id < 4;
+----
+{value: 200, label: beta} 2
+{value: 150, label: gamma} 3
+
 # Config reset
 
-# The SLT runner sets `target_partitions` to 4 instead of using the default, 
so 
+# The SLT runner sets `target_partitions` to 4 instead of using the default, so
 # reset it explicitly.
 statement ok
 SET datafusion.execution.target_partitions = 4;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to