This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 580b0abdb4 Use leaf level `ProjectionMask` for parquet projections
(#20925)
580b0abdb4 is described below
commit 580b0abdb487e1e226f5236e605ec5c75ec729b9
Author: Matthew Kim <[email protected]>
AuthorDate: Fri Mar 27 11:30:41 2026 -0400
Use leaf level `ProjectionMask` for parquet projections (#20925)
- Added on from https://github.com/apache/datafusion/pull/20913
Please review from the third commit
## Rationale for this change
This PR reuses the `ParquetReadPlan` (introduced for the row filter
pushdown) to also resolve projection expressions to parquet leaf column
indices
Previously, projecting a single field from a struct with many children
would read all leaves of that struct. This aligns the projection path
with the row filter path, which already had leaf-level struct pruning
---
datafusion/datasource-parquet/src/opener.rs | 15 +-
datafusion/datasource-parquet/src/row_filter.rs | 251 ++++++++++++++++++++-
.../test_files/projection_pushdown.slt | 76 ++++++-
3 files changed, 330 insertions(+), 12 deletions(-)
diff --git a/datafusion/datasource-parquet/src/opener.rs
b/datafusion/datasource-parquet/src/opener.rs
index 2522ae3050..1dbb801c93 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -18,6 +18,7 @@
//! [`ParquetOpener`] for opening Parquet files
use crate::page_filter::PagePruningAccessPlanFilter;
+use crate::row_filter::build_projection_read_plan;
use crate::row_group_filter::RowGroupAccessPlanFilter;
use crate::{
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
@@ -59,13 +60,13 @@ use
datafusion_execution::parquet_encryption::EncryptionFactory;
use futures::{Stream, StreamExt, ready};
use log::debug;
use parquet::DecodeResult;
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::push_decoder::{ParquetPushDecoder,
ParquetPushDecoderBuilder};
-use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
/// Implements [`FileOpener`] for a parquet file
@@ -583,12 +584,14 @@ impl FileOpener for ParquetOpener {
// metrics from the arrow reader itself
let arrow_reader_metrics = ArrowReaderMetrics::enabled();
- let indices = projection.column_indices();
- let mask =
- ProjectionMask::roots(reader_metadata.parquet_schema(),
indices.clone());
+ let read_plan = build_projection_read_plan(
+ projection.expr_iter(),
+ &physical_file_schema,
+ reader_metadata.parquet_schema(),
+ );
let decoder = builder
- .with_projection(mask)
+ .with_projection(read_plan.projection_mask)
.with_metrics(arrow_reader_metrics.clone())
.build()?;
@@ -601,7 +604,7 @@ impl FileOpener for ParquetOpener {
// Rebase column indices to match the narrowed stream schema.
// The projection expressions have indices based on
physical_file_schema,
// but the stream only contains the columns selected by the
ProjectionMask.
- let stream_schema =
Arc::new(physical_file_schema.project(&indices)?);
+ let stream_schema = read_plan.projected_schema;
let replace_schema = stream_schema != output_schema;
let projection = projection
.try_map_exprs(|expr| reassign_expr_columns(expr,
&stream_schema))?;
diff --git a/datafusion/datasource-parquet/src/row_filter.rs
b/datafusion/datasource-parquet/src/row_filter.rs
index d120f743fa..67b65321d9 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -83,7 +83,7 @@ use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion,
TreeNodeVisitor};
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr::expressions::{Column, Literal};
-use datafusion_physical_expr::utils::reassign_expr_columns;
+use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
use datafusion_physical_plan::metrics;
@@ -424,10 +424,26 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
.first()
.and_then(|a| a.as_any().downcast_ref::<Column>())
{
+ // for Map columns, get_field performs a runtime key lookup
rather than a
+ // schema-level field access so the entire Map column must be
read,
+ // we skip the struct field optimization and defer to normal
Column traversal
+ let is_map_column = self
+ .file_schema
+ .index_of(column.name())
+ .ok()
+ .map(|idx| {
+ matches!(
+ self.file_schema.field(idx).data_type(),
+ DataType::Map(_, _)
+ )
+ })
+ .unwrap_or(false);
+
let return_type = func.return_type();
- if !DataType::is_nested(return_type)
- || self.is_nested_type_supported(return_type)
+ if !is_map_column
+ && (!DataType::is_nested(return_type)
+ || self.is_nested_type_supported(return_type))
{
// try to resolve all field name arguments to string
literals
// if any argument is not a string literal, we can not
determine the exact
@@ -579,6 +595,136 @@ pub(crate) fn build_parquet_read_plan(
)))
}
+/// Builds a unified [`ParquetReadPlan`] for a set of projection expressions
+///
+/// Unlike [`build_parquet_read_plan`] (which is used for filter pushdown and
+/// returns `None` when an expression references unsupported nested types or
+/// missing columns), this function always succeeds. It collects every column
+/// that *can* be resolved in the file and produces a leaf-level projection
+/// mask. Columns missing from the file are silently skipped since the
projection
+/// layer handles those by inserting nulls.
+pub(crate) fn build_projection_read_plan(
+ exprs: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
+ file_schema: &Schema,
+ schema_descr: &SchemaDescriptor,
+) -> ParquetReadPlan {
+ // fast path: if every expression is a plain Column reference, skip all
+ // struct analysis and use root-level projection directly
+ let exprs = exprs.into_iter().collect::<Vec<_>>();
+ let all_plain_columns = exprs
+ .iter()
+ .all(|e| e.as_any().downcast_ref::<Column>().is_some());
+
+ if all_plain_columns {
+ let mut root_indices: Vec<usize> = exprs
+ .iter()
+ .map(|e| e.as_any().downcast_ref::<Column>().unwrap().index())
+ .collect();
+ root_indices.sort_unstable();
+ root_indices.dedup();
+
+ let projection_mask =
+ ProjectionMask::roots(schema_descr, root_indices.iter().copied());
+ let projected_schema = Arc::new(
+ file_schema
+ .project(&root_indices)
+ .expect("valid column indices"),
+ );
+
+ return ParquetReadPlan {
+ projection_mask,
+ projected_schema,
+ };
+ }
+
+ // secondary fast path: if the schema has no struct columns, we can skip
+ // PushdownChecker traversal and use root-level projection
+ let has_struct_columns = file_schema
+ .fields()
+ .iter()
+ .any(|f| matches!(f.data_type(), DataType::Struct(_)));
+
+ if !has_struct_columns {
+ let mut root_indices = exprs
+ .into_iter()
+ .flat_map(|e| collect_columns(&e).into_iter().map(|col|
col.index()))
+ .collect::<Vec<_>>();
+
+ root_indices.sort_unstable();
+ root_indices.dedup();
+
+ let projection_mask =
+ ProjectionMask::roots(schema_descr, root_indices.iter().copied());
+
+ let projected_schema = Arc::new(
+ file_schema
+ .project(&root_indices)
+ .expect("valid column indices"),
+ );
+
+ return ParquetReadPlan {
+ projection_mask,
+ projected_schema,
+ };
+ }
+
+ let mut all_root_indices = Vec::new();
+ let mut all_struct_accesses = Vec::new();
+
+ for expr in exprs {
+ let mut checker = PushdownChecker::new(file_schema, true);
+ let _ = expr.visit(&mut checker);
+ let columns = checker.into_sorted_columns();
+
+ all_root_indices.extend_from_slice(&columns.required_columns);
+ all_struct_accesses.extend(columns.struct_field_accesses);
+ }
+
+ all_root_indices.sort_unstable();
+ all_root_indices.dedup();
+
+ // when no struct field accesses were found, fall back to root-level
projection
+ // to match the performance of the simple path
+ if all_struct_accesses.is_empty() {
+ let projection_mask =
+ ProjectionMask::roots(schema_descr,
all_root_indices.iter().copied());
+ let projected_schema = Arc::new(
+ file_schema
+ .project(&all_root_indices)
+ .expect("valid column indices"),
+ );
+
+ return ParquetReadPlan {
+ projection_mask,
+ projected_schema,
+ };
+ }
+
+ let leaf_indices = {
+ let mut out =
+ leaf_indices_for_roots(all_root_indices.iter().copied(),
schema_descr);
+ let struct_leaf_indices =
+ resolve_struct_field_leaves(&all_struct_accesses, file_schema,
schema_descr);
+
+ out.extend_from_slice(&struct_leaf_indices);
+ out.sort_unstable();
+ out.dedup();
+
+ out
+ };
+
+ let projection_mask =
+ ProjectionMask::leaves(schema_descr, leaf_indices.iter().copied());
+
+ let projected_schema =
+ build_filter_schema(file_schema, &all_root_indices,
&all_struct_accesses);
+
+ ParquetReadPlan {
+ projection_mask,
+ projected_schema,
+ }
+}
+
fn leaf_indices_for_roots<I>(
root_indices: I,
schema_descr: &SchemaDescriptor,
@@ -654,6 +800,8 @@ fn build_filter_schema(
regular_indices: &[usize],
struct_field_accesses: &[StructFieldAccess],
) -> SchemaRef {
+ let regular_set: BTreeSet<usize> =
regular_indices.iter().copied().collect();
+
let all_indices = regular_indices
.iter()
.copied()
@@ -669,6 +817,15 @@ fn build_filter_schema(
.map(|&idx| {
let field = file_schema.field(idx);
+ // if this column appears as a regular (whole-column) reference,
+ // keep the full type
+ //
+ // Pruning is only valid when the column is accessed exclusively
+ // through struct field accesses
+ if regular_set.contains(&idx) {
+ return Arc::new(field.clone());
+ }
+
// collect all field paths that access this root struct column
let field_paths = struct_field_accesses
.iter()
@@ -683,7 +840,6 @@ fn build_filter_schema(
.collect::<Vec<_>>();
if field_paths.is_empty() {
- // its a regular column - use the full type
return Arc::new(field.clone());
}
@@ -696,7 +852,10 @@ fn build_filter_schema(
})
.collect::<Vec<_>>();
- Arc::new(Schema::new(fields))
+ Arc::new(Schema::new_with_metadata(
+ fields,
+ file_schema.metadata().clone(),
+ ))
}
fn prune_struct_type(dt: &DataType, paths: &[&[String]]) -> DataType {
@@ -958,6 +1117,8 @@ mod test {
use parquet::file::reader::{FileReader, SerializedFileReader};
use tempfile::NamedTempFile;
+ use datafusion_physical_expr::expressions::Column as PhysicalColumn;
+
// List predicates used by the decoder should be accepted for pushdown
#[test]
fn test_filter_candidate_builder_supports_list_types() {
@@ -1814,6 +1975,86 @@ mod test {
assert_eq!(file_metrics.pushdown_rows_matched.value(), 2);
}
+ #[test]
+ fn projection_read_plan_preserves_full_struct() {
+ // Schema: id (Int32), s (Struct{value: Int32, label: Utf8})
+ // Parquet leaves: id=0, s.value=1, s.label=2
+ let struct_fields: Fields = vec![
+ Arc::new(Field::new("value", DataType::Int32, false)),
+ Arc::new(Field::new("label", DataType::Utf8, false)),
+ ]
+ .into();
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("s", DataType::Struct(struct_fields.clone()), false),
+ ]));
+
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3])),
+ Arc::new(StructArray::new(
+ struct_fields,
+ vec![
+ Arc::new(Int32Array::from(vec![10, 20, 30])) as _,
+ Arc::new(StringArray::from(vec!["a", "b", "c"])) as _,
+ ],
+ None,
+ )),
+ ],
+ )
+ .unwrap();
+
+ let file = NamedTempFile::new().expect("temp file");
+ let mut writer =
+ ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema),
None)
+ .expect("writer");
+ writer.write(&batch).expect("write batch");
+ writer.close().expect("close writer");
+
+ let reader_file = file.reopen().expect("reopen file");
+ let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file)
+ .expect("reader builder");
+ let metadata = builder.metadata().clone();
+ let file_schema = builder.schema().clone();
+ let schema_descr = metadata.file_metadata().schema_descr();
+
+ // Simulate SELECT * output projection: Column("id") and Column("s")
+ // Plus a get_field(s, 'value') expression from the pushed-down filter
+ let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
+ Arc::new(PhysicalColumn::new("id", 0)),
+ Arc::new(PhysicalColumn::new("s", 1)),
+ logical2physical(
+ &get_field().call(vec![
+ col("s"),
+
Expr::Literal(ScalarValue::Utf8(Some("value".to_string())), None),
+ ]),
+ &file_schema,
+ ),
+ ];
+
+ let read_plan = build_projection_read_plan(exprs, &file_schema,
schema_descr);
+
+ // The projected schema must have the FULL struct type because
Column("s")
+ // is in the projection. It should NOT be narrowed to Struct{value:
Int32}.
+ let s_field = read_plan.projected_schema.field_with_name("s").unwrap();
+ assert_eq!(
+ s_field.data_type(),
+ &DataType::Struct(
+ vec![
+ Arc::new(Field::new("value", DataType::Int32, false)),
+ Arc::new(Field::new("label", DataType::Utf8, false)),
+ ]
+ .into()
+ ),
+ );
+
+ // all 3 Parquet leaves should be in the projection mask
+ let expected_mask = ProjectionMask::leaves(schema_descr, [0, 1, 2]);
+ assert_eq!(read_plan.projection_mask, expected_mask,);
+ }
+
/// Sanity check that the given expression could be evaluated against the
given schema without any errors.
/// This will fail if the expression references columns that are not in
the schema or if the types of the columns are incompatible, etc.
fn check_expression_can_evaluate_against_schema(
diff --git a/datafusion/sqllogictest/test_files/projection_pushdown.slt
b/datafusion/sqllogictest/test_files/projection_pushdown.slt
index 1735b1fb41..777f1e00ed 100644
--- a/datafusion/sqllogictest/test_files/projection_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/projection_pushdown.slt
@@ -1994,9 +1994,83 @@ WHERE COALESCE(get_field(s, 'f1'), get_field(s, 'f2')) =
1;
----
1
+#####################
+# Section 8: SELECT * with struct field filter
+#####################
+
+# When SELECT * includes the full struct but the filter only accesses a
+# sub-field (e.g. s['id']), the leaf-level projection must not narrow the
+# struct schema in the output. Previously build_projection_read_plan would
+# produce a schema with Struct("id": Int32) while the data still contained
+# Struct("id": Int32, "value": Utf8), causing an ArrowError.
+
+# 8.1: SELECT * with equality filter on struct sub-field
+query I?
+SELECT * FROM simple_struct WHERE s['value'] = 100;
+----
+1 {value: 100, label: alpha}
+
+# 8.2: Explicit SELECT of whole struct with struct sub-field filter
+query ?
+SELECT s FROM simple_struct WHERE s['value'] = 100;
+----
+{value: 100, label: alpha}
+
+# 8.3: Whole struct + sub-field projection + sub-field filter
+query I?I
+SELECT s['value'], s, id FROM simple_struct WHERE s['value'] = 100;
+----
+100 {value: 100, label: alpha} 1
+
+# 8.4: Whole struct in output, filter on a different sub-field than projected
+query ?T
+SELECT s, s['label'] FROM simple_struct WHERE s['value'] > 200;
+----
+{value: 300, label: delta} delta
+{value: 250, label: epsilon} epsilon
+
+# 8.5: Filter references both sub-fields, output includes whole struct
+query I?
+SELECT id, s FROM simple_struct WHERE s['value'] > 100 AND s['label'] = 'beta';
+----
+2 {value: 200, label: beta}
+
+# 8.6: Only sub-field projection with sub-field filter (no whole struct —
should prune)
+query II
+SELECT id, s['value'] FROM simple_struct WHERE s['value'] = 100;
+----
+1 100
+
+# 8.7: Nested struct — whole struct output with deeply nested field filter
+query I?
+SELECT * FROM nested_struct WHERE nested['outer']['inner'] > 15;
+----
+2 {outer: {inner: 20, name: two}, extra: y}
+3 {outer: {inner: 30, name: three}, extra: z}
+
+# 8.8: Nested struct — explicit whole struct select with sibling field filter
+query ?
+SELECT nested FROM nested_struct WHERE nested['extra'] = 'y';
+----
+{outer: {inner: 20, name: two}, extra: y}
+
+# 8.9: Nullable struct — whole struct output with sub-field filter
+query ?
+SELECT s FROM nullable_struct WHERE s['value'] > 100;
+----
+{value: 150, label: gamma}
+{value: 250, label: epsilon}
+
+# 8.10: Struct sub-field filter combined with top-level column filter
+query ?I
+SELECT s, id FROM simple_struct WHERE s['value'] > 100 AND id < 4;
+----
+{value: 200, label: beta} 2
+{value: 150, label: gamma} 3
+
# Config reset
-# The SLT runner sets `target_partitions` to 4 instead of using the default,
so
+# The SLT runner sets `target_partitions` to 4 instead of using the default, so
# reset it explicitly.
statement ok
SET datafusion.execution.target_partitions = 4;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]