alamb commented on code in PR #19538:
URL: https://github.com/apache/datafusion/pull/19538#discussion_r2720951671
##########
datafusion/core/tests/physical_optimizer/projection_pushdown.rs:
##########
@@ -1723,3 +1729,87 @@ fn test_cooperative_exec_after_projection() ->
Result<()> {
Ok(())
}
+
+#[test]
+fn test_pushdown_projection_through_repartition_filter() {
+ let struct_fields = Fields::from(vec![Field::new("a", DataType::Int32,
false)]);
+ let array = StructArray::new(
+ struct_fields.clone(),
+ vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
+ None,
+ );
+ let batches = vec![
+ RecordBatch::try_new(
+ Arc::new(Schema::new(vec![Field::new(
Review Comment:
I think one thing that would make the PR easier to review would be to reduce some duplication -- for example, the schema is created twice here, so it takes some cognitive load to figure out what, if anything, is different between the two schemas (nothing, I don't think).
Maybe defining the schema above like
```rust
let struct_fields = Fields::from(vec![Field::new("a", DataType::Int32, false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
    "struct",
    DataType::Struct(struct_fields),
    true,
)]));
```
would make it easier to follow
##########
datafusion/expr/src/udf.rs:
##########
@@ -846,6 +858,32 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send +
Sync {
fn documentation(&self) -> Option<&Documentation> {
None
}
+
+ /// Returns the triviality classification of this function given its
arguments' triviality.
Review Comment:
I wonder if we could just pass the actual `Expr` arguments to `triviality` -- that would give the functions the full information necessary to decide their own triviality
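Something like this, perhaps (untested sketch -- `NonTrivial` is a hypothetical name for whatever the enum's non-trivial variant is):
```rust
// e.g. in GetFieldFunc: trivial only when the field keys are literals
fn triviality(&self, args: &[Expr]) -> ArgTriviality {
    if args.iter().skip(1).all(|a| matches!(a, Expr::Literal(..))) {
        ArgTriviality::TrivialExpr // static field lookup, cheap per row
    } else {
        ArgTriviality::NonTrivial // dynamic per-row lookup (hypothetical variant)
    }
}
```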
##########
datafusion/physical-expr/src/projection.rs:
##########
@@ -806,17 +807,65 @@ pub fn update_expr(
RewrittenInvalid,
}
+ // Track Arc pointers of columns created by pass 1.
+ // These should not be modified by pass 2.
+ // We use Arc pointer addresses (not name/index) to distinguish
pass-1-created columns
+ // from original columns that happen to have the same name and index.
+ let mut pass1_created: HashSet<usize> = HashSet::new();
+
+ // First pass: try to rewrite the expression in terms of the projected
expressions.
+ // For example, if the expression is `a + b > 5` and the projection is `a
+ b AS sum_ab`,
+ // we can rewrite the expression to `sum_ab > 5` directly.
+ //
+ // This optimization only applies when sync_with_child=false, meaning we
want the
+ // expression to use OUTPUT references (e.g., when pushing projection down
and the
+ // expression will be above the projection). Pass 1 creates OUTPUT column
references.
+ //
+ // When sync_with_child=true, we want INPUT references (expanding OUTPUT
to INPUT),
Review Comment:
this comment is super helpful -- maybe we can rename the argument to "above_projection" rather than sync_with_child 🤔 (or in a follow-on PR)
##########
datafusion/physical-optimizer/src/projection_pushdown.rs:
##########
@@ -88,6 +98,267 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
}
}
+/// Tries to split a projection to extract beneficial sub-expressions for
pushdown.
+///
+/// This function walks each expression in the projection and extracts
beneficial
+/// sub-expressions (like `get_field`) from within larger non-beneficial
expressions.
+/// For example:
+/// - Input: `get_field(col, 'foo') + 1`
+/// - Output: Inner projection: `get_field(col, 'foo') AS __extracted_0`,
Outer: `__extracted_0 + 1`
+///
+/// This enables the beneficial parts to be pushed down while keeping
non-beneficial
+/// expressions (like literals and computations) above.
+fn try_split_projection(
+ plan: Arc<dyn ExecutionPlan>,
+ alias_generator: &AliasGenerator,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+ let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() else
{
+ return Ok(Transformed::no(plan));
+ };
+
+ let input_schema = projection.input().schema();
+ let mut extractor = TrivialExprExtractor::new(input_schema.as_ref(),
alias_generator);
+
+ // Extract trivial sub-expressions from each projection expression
+ let mut outer_exprs = Vec::new();
+ let mut has_extractions = false;
+
+ for proj_expr in projection.expr() {
+ // If this is already an expression from an extraction don't try to
re-extract it (would cause infinite recursion)
+ if proj_expr.alias.starts_with("__extracted") {
+ outer_exprs.push(proj_expr.clone());
+ continue;
+ }
+
+ // Only extract from non-trivial expressions. If the entire expression
is
+ // already TrivialExpr (like `get_field(col, 'foo')`), it can be
pushed as-is.
+ // We only need to split when there's a non-trivial expression with
trivial
+ // sub-expressions (like `get_field(col, 'foo') + 1`).
+ if matches!(proj_expr.expr.triviality(), ArgTriviality::TrivialExpr) {
+ outer_exprs.push(proj_expr.clone());
+ continue;
+ }
+
+ let rewritten = extractor.extract(Arc::clone(&proj_expr.expr))?;
+ if !Arc::ptr_eq(&rewritten, &proj_expr.expr) {
+ has_extractions = true;
+ }
+ outer_exprs.push(ProjectionExpr::new(rewritten,
proj_expr.alias.clone()));
+ }
+
+ if !has_extractions {
+ return Ok(Transformed::no(plan));
+ }
+
+ // Collect columns needed by outer expressions that aren't extracted
+ extractor.collect_columns_needed(&outer_exprs)?;
+
+ // Build inner projection from extracted expressions + needed columns
+ let inner_exprs = extractor.build_inner_projection()?;
+
+ if inner_exprs.is_empty() {
+ return Ok(Transformed::no(plan));
+ }
+
+ // Create the inner projection (to be pushed down)
+ let inner = ProjectionExec::try_new(inner_exprs,
Arc::clone(projection.input()))?;
+
+ // Rewrite outer expressions to reference the inner projection's output
schema
+ let inner_schema = inner.schema();
+ let final_outer_exprs = extractor.finalize_outer_exprs(outer_exprs,
&inner_schema)?;
+
+ // Create the outer projection (stays above)
+ let outer = ProjectionExec::try_new(final_outer_exprs, Arc::new(inner))?;
+
+ Ok(Transformed::yes(Arc::new(outer)))
+}
+
+/// Extracts beneficial trivial sub-expressions from larger expressions.
+///
+/// Similar to `JoinFilterRewriter`, this struct walks expression trees
top-down
+/// and extracts sub-expressions where `triviality() ==
ArgTriviality::TrivialExpr`
+/// (beneficial trivial expressions like field accessors).
+///
+/// The extracted expressions are replaced with column references pointing to
+/// an inner projection that computes these sub-expressions.
+struct TrivialExprExtractor<'a> {
+ /// Extracted trivial expressions: maps expression -> alias
+ extracted: IndexMap<Arc<dyn PhysicalExpr>, String>,
+ /// Columns needed by outer expressions: maps input column index -> alias
+ columns_needed: IndexMap<usize, String>,
+ /// Input schema for the projection
+ input_schema: &'a Schema,
+ /// Alias generator for unique names
+ alias_generator: &'a AliasGenerator,
+}
+
+impl<'a> TrivialExprExtractor<'a> {
+ fn new(input_schema: &'a Schema, alias_generator: &'a AliasGenerator) ->
Self {
+ Self {
+ extracted: IndexMap::new(),
+ columns_needed: IndexMap::new(),
+ input_schema,
+ alias_generator,
+ }
+ }
+
+ /// Extracts beneficial trivial sub-expressions from the given expression.
+ ///
+ /// Walks the expression tree top-down and replaces beneficial trivial
+ /// sub-expressions with column references to the inner projection.
+ fn extract(&mut self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn
PhysicalExpr>> {
Review Comment:
Is there any reason this doesn't use one of the tree node APIs (like `apply`, for example)?
https://docs.rs/datafusion/latest/datafusion/common/tree_node/trait.TreeNode.html
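For example, something like this might work (untested sketch, inside `impl TrivialExprExtractor`):
```rust
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};

fn extract(&mut self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
    expr.transform_down(|e| {
        if matches!(e.triviality(), ArgTriviality::TrivialExpr) {
            // Replace the whole trivial sub-tree and skip its children
            Ok(Transformed::new(
                self.add_extracted_expr(e),
                true,
                TreeNodeRecursion::Jump,
            ))
        } else {
            Ok(Transformed::no(e))
        }
    })
    .map(|t| t.data)
}
```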
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -192,6 +193,73 @@ impl ProjectionExec {
input.boundedness(),
))
}
+
+ /// Returns true if the projection is beneficial to push through most
operators.
+ ///
+ /// Some cases to consider:
+ ///
+ /// - Sorts: sorts do expensive work (re-arranging rows) and thus benefit
from
+ /// having as little data underneath them as possible.
+ /// We don't want to push down expressions such as literals that could be
+ /// evaluated after the sort.
+ /// Truthfully we probably would want to push down some complex
expressions if
+ /// they reduce the amount of data (e.g. `a like '%foo%'` converts a
large string
+ /// column into a boolean column) but we currently don't have any good
way to estimate that.
+ /// - RepartitionExec / CoalesceBatchesExec: these operators change the
parallelism
+ /// or batch sizes and are designed to optimize CPU work for operators
above them.
+ /// Pushing down expensive expressions past them defeats their purpose
so we want to avoid that.
+ /// - Filters: filters can reduce the amount of data processed by upstream
operators,
+ /// so pushing down expensive computation under them would result in
that computation being
+ /// applied to more rows.
+ /// Again if we knew that `a like '%foo%'` reduces the projection size
significantly
+ /// and the filter is not selective we actually might want to push it
down, but we don't have
+ /// a good way to estimate that currently.
+ /// - Joins: joins both benefit from having less data under them (they may
have to select sparse rows)
+ /// but they also serve as filters.
+ ///
+ /// Obviously given the information we have currently, we cannot make
perfect decisions here.
+ /// Our approach is to stick to the obvious cases:
+ ///
+ /// - If the projection narrows the schema (drops columns) and is only
column references it
+ /// always makes sense to push it down.
+ /// - If the projection contains any trivial expression (which can reduce
the data size
+ /// of the projection significantly at a very low computational cost)
and does not contain
+ /// any computationally expensive expressions, we also consider it
beneficial to push down.
+ ///
+ /// In all other cases we consider the projection not beneficial to push
down.
+ ///
+ /// This is true when:
+ /// - The projection narrows the schema (drops columns) - saves memory, OR
+ /// - Any expression is a TrivialExpr (like get_field) - beneficial
computation pushdown
+ ///
+ /// Pure Column references that don't narrow the schema are NOT beneficial
to push,
+ /// as they just rearrange the plan without any gain.
+ ///
+ /// Note: Projections are split by `try_split_projection` before reaching
this function,
+ /// so if any expression is TrivialExpr, all expressions should be trivial.
+ pub fn is_trivial_or_narrows_schema(&self) -> bool {
Review Comment:
It might make the usage clearer if we changed this name to explain what it is used for (like `should_push_down`, for example)?
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -192,6 +193,73 @@ impl ProjectionExec {
input.boundedness(),
))
}
+
+ /// Returns true if the projection is beneficial to push through most
operators.
+ ///
+ /// Some cases to consider:
+ ///
+ /// - Sorts: sorts do expensive work (re-arranging rows) and thus benefit
from
+ /// having as little data underneath them as possible.
+ /// We don't want to push down expressions such as literals that could be
+ /// evaluated after the sort.
+ /// Truthfully we probably would want to push down some complex
expressions if
+ /// they reduce the amount of data (e.g. `a like '%foo%'` converts a
large string
+ /// column into a boolean column) but we currently don't have any good
way to estimate that.
+ /// - RepartitionExec / CoalesceBatchesExec: these operators change the
parallelism
+ /// or batch sizes and are designed to optimize CPU work for operators
above them.
+ /// Pushing down expensive expressions past them defeats their purpose
so we want to avoid that.
+ /// - Filters: filters can reduce the amount of data processed by upstream
operators,
+ /// so pushing down expensive computation under them would result in
that computation being
+ /// applied to more rows.
+ /// Again if we knew that `a like '%foo%'` reduces the projection size
significantly
+ /// and the filter is not selective we actually might want to push it
down, but we don't have
+ /// a good way to estimate that currently.
+ /// - Joins: joins both benefit from having less data under them (they may
have to select sparse rows)
+ /// but they also serve as filters.
Review Comment:
```suggestion
/// but they also typically serve as filters.
```
##########
datafusion/core/tests/physical_optimizer/projection_pushdown.rs:
##########
@@ -1723,3 +1729,87 @@ fn test_cooperative_exec_after_projection() ->
Result<()> {
Ok(())
}
+
+#[test]
+fn test_pushdown_projection_through_repartition_filter() {
+ let struct_fields = Fields::from(vec![Field::new("a", DataType::Int32,
false)]);
+ let array = StructArray::new(
+ struct_fields.clone(),
+ vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
+ None,
+ );
+ let batches = vec![
+ RecordBatch::try_new(
+ Arc::new(Schema::new(vec![Field::new(
+ "struct",
+ DataType::Struct(struct_fields.clone()),
+ true,
+ )])),
+ vec![Arc::new(array)],
+ )
+ .unwrap(),
+ ];
+ let build_side_schema = Arc::new(Schema::new(vec![Field::new(
+ "struct",
+ DataType::Struct(struct_fields),
+ true,
+ )]));
+
+ let scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+ .with_support(true)
+ .with_batches(batches)
+ .build();
+ let scan_schema = scan.schema();
Review Comment:
this is also the same schema, right?
##########
datafusion/sqllogictest/test_files/projection_pushdown.slt:
##########
@@ -0,0 +1,1001 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for projection pushdown behavior with get_field expressions
+#
+# This file tests the ExtractTrivialProjections optimizer rule and
+# physical projection pushdown for:
+# - get_field expressions (struct field access like s['foo'])
+# - Pushdown through Filter, Sort, and TopK operators
+# - Multi-partition scenarios with SortPreservingMergeExec
+##########
+
+#####################
+# Section 1: Setup - Single Partition Tests
+#####################
+
+# Set target_partitions = 1 for deterministic plan output
+statement ok
+SET datafusion.execution.target_partitions = 1;
+
+# Create parquet file with struct column containing value and label fields
+statement ok
+COPY (
+ SELECT
+ column1 as id,
+ column2 as s
+ FROM VALUES
+ (1, {value: 100, label: 'alpha'}),
+ (2, {value: 200, label: 'beta'}),
+ (3, {value: 150, label: 'gamma'}),
+ (4, {value: 300, label: 'delta'}),
+ (5, {value: 250, label: 'epsilon'})
+) TO 'test_files/scratch/projection_pushdown/simple.parquet'
+STORED AS PARQUET;
+
+# Create table for simple struct tests
+statement ok
+CREATE EXTERNAL TABLE simple_struct STORED AS PARQUET
+LOCATION 'test_files/scratch/projection_pushdown/simple.parquet';
+
+# Create parquet file with nested struct column
+statement ok
+COPY (
+ SELECT
+ column1 as id,
+ column2 as nested
+ FROM VALUES
+ (1, {outer: {inner: 10, name: 'one'}, extra: 'x'}),
+ (2, {outer: {inner: 20, name: 'two'}, extra: 'y'}),
+ (3, {outer: {inner: 30, name: 'three'}, extra: 'z'})
+) TO 'test_files/scratch/projection_pushdown/nested.parquet'
+STORED AS PARQUET;
+
+# Create table for nested struct tests
+statement ok
+CREATE EXTERNAL TABLE nested_struct STORED AS PARQUET
+LOCATION 'test_files/scratch/projection_pushdown/nested.parquet';
+
+# Create parquet file with nullable struct column
+statement ok
+COPY (
+ SELECT
+ column1 as id,
+ column2 as s
+ FROM VALUES
+ (1, {value: 100, label: 'alpha'}),
+ (2, NULL),
+ (3, {value: 150, label: 'gamma'}),
+ (4, NULL),
+ (5, {value: 250, label: 'epsilon'})
+) TO 'test_files/scratch/projection_pushdown/nullable.parquet'
+STORED AS PARQUET;
+
+# Create table for nullable struct tests
+statement ok
+CREATE EXTERNAL TABLE nullable_struct STORED AS PARQUET
+LOCATION 'test_files/scratch/projection_pushdown/nullable.parquet';
+
+
+#####################
+# Section 2: Basic get_field Pushdown (Projection above scan)
+#####################
+
+###
+# Test 2.1: Simple s['value'] - pushed into DataSourceExec
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM simple_struct;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+02)--TableScan: simple_struct projection=[id, s]
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value]],
file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM simple_struct ORDER BY id;
+----
+1 100
+2 200
+3 150
+4 300
+5 250
+
+###
+# Test 2.2: Multiple get_field expressions - all pushed
+###
+
+query TT
+EXPLAIN SELECT id, s['value'], s['label'] FROM simple_struct;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")),
get_field(simple_struct.s, Utf8("label"))
+02)--TableScan: simple_struct projection=[id, s]
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value],
get_field(s@1, label) as simple_struct.s[label]], file_type=parquet
+
+# Verify correctness
+query IIT
+SELECT id, s['value'], s['label'] FROM simple_struct ORDER BY id;
+----
+1 100 alpha
+2 200 beta
+3 150 gamma
+4 300 delta
+5 250 epsilon
+
+###
+# Test 2.3: Nested s['outer']['inner'] - pushed
+###
+
+query TT
+EXPLAIN SELECT id, nested['outer']['inner'] FROM nested_struct;
+----
+logical_plan
+01)Projection: nested_struct.id, get_field(nested_struct.nested,
Utf8("outer"), Utf8("inner"))
+02)--TableScan: nested_struct projection=[id, nested]
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/nested.parquet]]},
projection=[id, get_field(nested@1, outer, inner) as
nested_struct.nested[outer][inner]], file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, nested['outer']['inner'] FROM nested_struct ORDER BY id;
+----
+1 10
+2 20
+3 30
+
+###
+# Test 2.4: s['value'] + 1 - entire expression pushed (directly above scan)
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] + 1 FROM simple_struct;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) +
Int64(1)
+02)--TableScan: simple_struct projection=[id, s]
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) + 1 as simple_struct.s[value] +
Int64(1)], file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, s['value'] + 1 FROM simple_struct ORDER BY id;
+----
+1 101
+2 201
+3 151
+4 301
+5 251
+
+###
+# Test 2.5: s['label'] || '_suffix' - pushed (directly above scan)
+###
+
+query TT
+EXPLAIN SELECT id, s['label'] || '_suffix' FROM simple_struct;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("label")) ||
Utf8("_suffix")
+02)--TableScan: simple_struct projection=[id, s]
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, label) || _suffix as simple_struct.s[label] ||
Utf8("_suffix")], file_type=parquet
+
+# Verify correctness
+query IT
+SELECT id, s['label'] || '_suffix' FROM simple_struct ORDER BY id;
+----
+1 alpha_suffix
+2 beta_suffix
+3 gamma_suffix
+4 delta_suffix
+5 epsilon_suffix
+
+
+#####################
+# Section 3: Projection Through Filter
+#####################
+
+###
+# Test 3.1: Simple get_field through Filter - pushed
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM simple_struct WHERE id > 2;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+02)--Filter: simple_struct.id > Int64(2)
+03)----TableScan: simple_struct projection=[id, s],
partial_filters=[simple_struct.id > Int64(2)]
+physical_plan
+01)FilterExec: id@0 > 2
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value]],
file_type=parquet, predicate=id@0 > 2, pruning_predicate=id_null_count@1 !=
row_count@2 AND id_max@0 > 2, required_guarantees=[]
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM simple_struct WHERE id > 2 ORDER BY id;
+----
+3 150
+4 300
+5 250
+
+###
+# Test 3.2: s['value'] + 1 through Filter - get_field extracted and pushed
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] + 1 FROM simple_struct WHERE id > 2;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) +
Int64(1)
+02)--Filter: simple_struct.id > Int64(2)
+03)----TableScan: simple_struct projection=[id, s],
partial_filters=[simple_struct.id > Int64(2)]
+physical_plan
+01)ProjectionExec: expr=[id@1 as id, __extracted_1@0 + 1 as
simple_struct.s[value] + Int64(1)]
+02)--FilterExec: id@1 > 2
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __extracted_1, id], file_type=parquet,
predicate=id@0 > 2, pruning_predicate=id_null_count@1 != row_count@2 AND
id_max@0 > 2, required_guarantees=[]
+
+# Verify correctness
+query II
+SELECT id, s['value'] + 1 FROM simple_struct WHERE id > 2 ORDER BY id;
+----
+3 151
+4 301
+5 251
+
+###
+# Test 3.3: Filter on get_field expression
+###
+
+query TT
+EXPLAIN SELECT id, s['label'] FROM simple_struct WHERE s['value'] > 150;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("label"))
+02)--Filter: get_field(simple_struct.s, Utf8("value")) > Int64(150)
+03)----TableScan: simple_struct projection=[id, s],
partial_filters=[get_field(simple_struct.s, Utf8("value")) > Int64(150)]
+physical_plan
+01)ProjectionExec: expr=[id@0 as id, get_field(s@1, label) as
simple_struct.s[label]]
+02)--FilterExec: get_field(s@1, value) > 150
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, s], file_type=parquet
+
+# Verify correctness
+query IT
+SELECT id, s['label'] FROM simple_struct WHERE s['value'] > 150 ORDER BY id;
+----
+2 beta
+4 delta
+5 epsilon
+
+
+#####################
+# Section 4: Projection Through Sort (no LIMIT)
+#####################
+
+###
+# Test 4.1: Simple get_field through Sort - pushed
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM simple_struct ORDER BY id;
+----
+logical_plan
+01)Sort: simple_struct.id ASC NULLS LAST
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+03)----TableScan: simple_struct projection=[id, s]
+physical_plan
+01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value]],
file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM simple_struct ORDER BY id;
+----
+1 100
+2 200
+3 150
+4 300
+5 250
+
+###
+# Test 4.2: s['value'] + 1 through Sort - split projection
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] + 1 FROM simple_struct ORDER BY id;
+----
+logical_plan
+01)Sort: simple_struct.id ASC NULLS LAST
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) +
Int64(1)
+03)----TableScan: simple_struct projection=[id, s]
+physical_plan
+01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) + 1 as simple_struct.s[value] +
Int64(1)], file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, s['value'] + 1 FROM simple_struct ORDER BY id;
+----
+1 101
+2 201
+3 151
+4 301
+5 251
+
+###
+# Test 4.3: Sort by get_field expression
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM simple_struct ORDER BY s['value'];
+----
+logical_plan
+01)Sort: simple_struct.s[value] ASC NULLS LAST
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+03)----TableScan: simple_struct projection=[id, s]
+physical_plan
+01)SortExec: expr=[simple_struct.s[value]@1 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value]],
file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM simple_struct ORDER BY s['value'];
+----
+1 100
+3 150
+2 200
+5 250
+4 300
+
+
+#####################
+# Section 5: Projection Through TopK (ORDER BY + LIMIT)
+#####################
+
+###
+# Test 5.1: Simple get_field through TopK - pushed (trivial)
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM simple_struct ORDER BY id LIMIT 3;
+----
+logical_plan
+01)Sort: simple_struct.id ASC NULLS LAST, fetch=3
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+03)----TableScan: simple_struct projection=[id, s]
+physical_plan
+01)SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value]],
file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM simple_struct ORDER BY id LIMIT 3;
+----
+1 100
+2 200
+3 150
+
+###
+# Test 5.2: s['value'] + 1 through TopK - pushed (narrows schema from 2 to 2
cols)
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] + 1 FROM simple_struct ORDER BY id LIMIT 3;
+----
+logical_plan
+01)Sort: simple_struct.id ASC NULLS LAST, fetch=3
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) +
Int64(1)
+03)----TableScan: simple_struct projection=[id, s]
+physical_plan
+01)SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) + 1 as simple_struct.s[value] +
Int64(1)], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# Verify correctness
+query II
+SELECT id, s['value'] + 1 FROM simple_struct ORDER BY id LIMIT 3;
+----
+1 101
+2 201
+3 151
+
+###
+# Test 5.3: Multiple get_field through TopK - all pushed
+###
+
+query TT
+EXPLAIN SELECT id, s['value'], s['label'] FROM simple_struct ORDER BY id LIMIT
3;
+----
+logical_plan
+01)Sort: simple_struct.id ASC NULLS LAST, fetch=3
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")),
get_field(simple_struct.s, Utf8("label"))
+03)----TableScan: simple_struct projection=[id, s]
+physical_plan
+01)SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value],
get_field(s@1, label) as simple_struct.s[label]], file_type=parquet,
predicate=DynamicFilter [ empty ]
+
+# Verify correctness
+query IIT
+SELECT id, s['value'], s['label'] FROM simple_struct ORDER BY id LIMIT 3;
+----
+1 100 alpha
+2 200 beta
+3 150 gamma
+
+###
+# Test 5.4: Nested get_field through TopK - pushed
+###
+
+query TT
+EXPLAIN SELECT id, nested['outer']['inner'] FROM nested_struct ORDER BY id
LIMIT 2;
+----
+logical_plan
+01)Sort: nested_struct.id ASC NULLS LAST, fetch=2
+02)--Projection: nested_struct.id, get_field(nested_struct.nested,
Utf8("outer"), Utf8("inner"))
+03)----TableScan: nested_struct projection=[id, nested]
+physical_plan
+01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/nested.parquet]]},
projection=[id, get_field(nested@1, outer, inner) as
nested_struct.nested[outer][inner]], file_type=parquet, predicate=DynamicFilter
[ empty ]
+
+# Verify correctness
+query II
+SELECT id, nested['outer']['inner'] FROM nested_struct ORDER BY id LIMIT 2;
+----
+1 10
+2 20
+
+###
+# Test 5.5: String concat through TopK - pushed (narrows schema)
+###
+
+query TT
+EXPLAIN SELECT id, s['label'] || '_suffix' FROM simple_struct ORDER BY id
LIMIT 3;
+----
+logical_plan
+01)Sort: simple_struct.id ASC NULLS LAST, fetch=3
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("label"))
|| Utf8("_suffix")
+03)----TableScan: simple_struct projection=[id, s]
+physical_plan
+01)SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, label) || _suffix as simple_struct.s[label] ||
Utf8("_suffix")], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# Verify correctness
+query IT
+SELECT id, s['label'] || '_suffix' FROM simple_struct ORDER BY id LIMIT 3;
+----
+1 alpha_suffix
+2 beta_suffix
+3 gamma_suffix
+
+
+#####################
+# Section 6: Combined Operators
+#####################
+
+###
+# Test 6.1: Filter + Sort + get_field
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM simple_struct WHERE id > 1 ORDER BY
s['value'];
+----
+logical_plan
+01)Sort: simple_struct.s[value] ASC NULLS LAST
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+03)----Filter: simple_struct.id > Int64(1)
+04)------TableScan: simple_struct projection=[id, s],
partial_filters=[simple_struct.id > Int64(1)]
+physical_plan
+01)SortExec: expr=[simple_struct.s[value]@1 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--FilterExec: id@0 > 1
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value]],
file_type=parquet, predicate=id@0 > 1, pruning_predicate=id_null_count@1 !=
row_count@2 AND id_max@0 > 1, required_guarantees=[]
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM simple_struct WHERE id > 1 ORDER BY s['value'];
+----
+3 150
+2 200
+5 250
+4 300
+
+###
+# Test 6.2: Filter + TopK + get_field
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM simple_struct WHERE id > 1 ORDER BY
s['value'] LIMIT 2;
+----
+logical_plan
+01)Sort: simple_struct.s[value] ASC NULLS LAST, fetch=2
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+03)----Filter: simple_struct.id > Int64(1)
+04)------TableScan: simple_struct projection=[id, s],
partial_filters=[simple_struct.id > Int64(1)]
+physical_plan
+01)SortExec: TopK(fetch=2), expr=[simple_struct.s[value]@1 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--FilterExec: id@0 > 1
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value]],
file_type=parquet, predicate=id@0 > 1, pruning_predicate=id_null_count@1 !=
row_count@2 AND id_max@0 > 1, required_guarantees=[]
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM simple_struct WHERE id > 1 ORDER BY s['value']
LIMIT 2;
+----
+3 150
+2 200
+
+###
+# Test 6.3: Filter + TopK + get_field with arithmetic
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] + 1 FROM simple_struct WHERE id > 1 ORDER BY id
LIMIT 2;
+----
+logical_plan
+01)Sort: simple_struct.id ASC NULLS LAST, fetch=2
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) +
Int64(1)
+03)----Filter: simple_struct.id > Int64(1)
+04)------TableScan: simple_struct projection=[id, s],
partial_filters=[simple_struct.id > Int64(1)]
+physical_plan
+01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--ProjectionExec: expr=[id@1 as id, __extracted_1@0 + 1 as
simple_struct.s[value] + Int64(1)]
+03)----FilterExec: id@1 > 1
+04)------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __extracted_1, id], file_type=parquet,
predicate=id@0 > 1 AND DynamicFilter [ empty ],
pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1,
required_guarantees=[]
+
+# Verify correctness
+query II
+SELECT id, s['value'] + 1 FROM simple_struct WHERE id > 1 ORDER BY id LIMIT 2;
+----
+2 201
+3 151
+
+
+#####################
+# Section 7: Multi-Partition Tests
+#####################
+
+# Set target_partitions = 4 for parallel execution
+statement ok
+SET datafusion.execution.target_partitions = 4;
+
+# Create 5 parquet files (more than partitions) for parallel tests
+statement ok
+COPY (SELECT 1 as id, {value: 100, label: 'alpha'} as s)
+TO 'test_files/scratch/projection_pushdown/multi/part1.parquet'
+STORED AS PARQUET;
+
+statement ok
+COPY (SELECT 2 as id, {value: 200, label: 'beta'} as s)
+TO 'test_files/scratch/projection_pushdown/multi/part2.parquet'
+STORED AS PARQUET;
+
+statement ok
+COPY (SELECT 3 as id, {value: 150, label: 'gamma'} as s)
+TO 'test_files/scratch/projection_pushdown/multi/part3.parquet'
+STORED AS PARQUET;
+
+statement ok
+COPY (SELECT 4 as id, {value: 300, label: 'delta'} as s)
+TO 'test_files/scratch/projection_pushdown/multi/part4.parquet'
+STORED AS PARQUET;
+
+statement ok
+COPY (SELECT 5 as id, {value: 250, label: 'epsilon'} as s)
+TO 'test_files/scratch/projection_pushdown/multi/part5.parquet'
+STORED AS PARQUET;
+
+# Create table from multiple parquet files
+statement ok
+CREATE EXTERNAL TABLE multi_struct STORED AS PARQUET
+LOCATION 'test_files/scratch/projection_pushdown/multi/';
+
+###
+# Test 7.1: Multi-partition Sort with get_field
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM multi_struct ORDER BY id;
Review Comment:
can you please also add a literal expression (like `'Foo'`, for example) to show it does not get pushed into the scan?
##########
datafusion/sqllogictest/test_files/unnest.slt:
##########
@@ -673,8 +673,8 @@ logical_plan
physical_plan
01)ProjectionExec:
expr=[__unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1],depth=2)@0
as UNNEST(UNNEST(UNNEST(recursive_unnest_table.column3)[c1])), column3@1 as
column3]
02)--UnnestExec
-03)----ProjectionExec:
expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0,
c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]),
column3@1 as column3]
-04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------ProjectionExec:
expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0,
c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]),
column3@1 as column3]
Review Comment:
this seems like a good change -- the fields are extracted prior to
RepartitionExec
##########
datafusion/physical-optimizer/src/projection_pushdown.rs:
##########
@@ -88,6 +98,267 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
}
}
+/// Tries to split a projection to extract beneficial sub-expressions for
pushdown.
+///
+/// This function walks each expression in the projection and extracts
beneficial
+/// sub-expressions (like `get_field`) from within larger non-beneficial
expressions.
+/// For example:
+/// - Input: `get_field(col, 'foo') + 1`
+/// - Output: Inner projection: `get_field(col, 'foo') AS __extracted_0`,
Outer: `__extracted_0 + 1`
+///
+/// This enables the beneficial parts to be pushed down while keeping
non-beneficial
+/// expressions (like literals and computations) above.
+fn try_split_projection(
Review Comment:
I couldn't help but think this is very similar to the common subexpression elimination rewrite (though the conditions for which sub-expressions are rewritten are different).
Maybe you could make this more discoverable by abstracting it away into a structure like "SubExpressionRewriter" that does the mechanics of rewriting expressions, and is parameterized by some function that decides whether to do the rewrite or not.
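A rough sketch of that shape (all names hypothetical):
```rust
/// Owns the mechanics of extracting sub-expressions into an inner projection;
/// the predicate decides which sub-expressions qualify
struct SubExpressionRewriter<F: Fn(&dyn PhysicalExpr) -> bool> {
    should_extract: F,
    /// extracted sub-expression -> generated alias
    extracted: IndexMap<Arc<dyn PhysicalExpr>, String>,
}

impl<F: Fn(&dyn PhysicalExpr) -> bool> SubExpressionRewriter<F> {
    fn new(should_extract: F) -> Self {
        Self {
            should_extract,
            extracted: IndexMap::new(),
        }
    }
}
```
The trivial-expression case would then pass `|e| matches!(e.triviality(), ArgTriviality::TrivialExpr)` as the predicate, and a CSE-style rewrite could pass a different one.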
It might also be worth adding a diagram showing the plan nodes that are created as part of this rewrite:
```
(projection get_field(col, 'foo') + 1)
(rest of plan)
```
To
```
(projection __extracted_0 + 1)
(rest of plan, produces __extracted_0)
```
or something 🤔
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1123,15 +1123,8 @@ impl ExecutionPlan for RepartitionExec {
&self,
projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
- // If the projection does not narrow the schema, we should not try to
push it down.
- if projection.expr().len() >=
projection.input().schema().fields().len() {
- return Ok(None);
- }
-
// If pushdown is not beneficial or applicable, break it.
- if projection.benefits_from_input_partitioning()[0]
- || !all_columns(projection.expr())
- {
+ if projection.benefits_from_input_partitioning()[0] {
Review Comment:
It seems like the idea was that only reductions in columns would be beneficial to push down -- because otherwise you might push an expensive computation into a single-partition part of the plan (e.g. a scan from a single file) rather than allowing it to be done in parallel
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1123,15 +1123,8 @@ impl ExecutionPlan for RepartitionExec {
&self,
projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
- // If the projection does not narrow the schema, we should not try to
push it down.
- if projection.expr().len() >=
projection.input().schema().fields().len() {
- return Ok(None);
- }
-
// If pushdown is not beneficial or applicable, break it.
- if projection.benefits_from_input_partitioning()[0]
- || !all_columns(projection.expr())
- {
+ if projection.benefits_from_input_partitioning()[0] {
Review Comment:
It would make sense (and be consistent with the rest of the changes in this
PR)
##########
datafusion/expr-common/src/triviality.rs:
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Triviality classification for expressions and function arguments.
+
+/// Classification of argument triviality for scalar functions.
+///
+/// This enum is used by [`ScalarUDFImpl::triviality`] to allow
+/// functions to make context-dependent decisions about whether they are
+/// trivial based on the nature of their arguments.
Review Comment:
Maybe we can also mention that "trivial expressions" are pushed down by the optimizer and may be evaluated more times than the original expression.
For column references or field access this is not slower, but for other expressions it may be.
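Perhaps something along these lines (wording sketch):
```rust
/// Note: expressions classified as trivial may be pushed below operators such
/// as filters by the optimizer, and can therefore be evaluated on more rows
/// than the original expression would have been. For column references and
/// static field access this is not slower, but for other expressions it may be.
```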
##########
datafusion/core/tests/physical_optimizer/projection_pushdown.rs:
##########
@@ -1723,3 +1729,87 @@ fn test_cooperative_exec_after_projection() ->
Result<()> {
Ok(())
}
+
+#[test]
+fn test_pushdown_projection_through_repartition_filter() {
+ let struct_fields = Fields::from(vec![Field::new("a", DataType::Int32,
false)]);
+ let array = StructArray::new(
+ struct_fields.clone(),
+ vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
+ None,
+ );
+ let batches = vec![
+ RecordBatch::try_new(
+ Arc::new(Schema::new(vec![Field::new(
+ "struct",
+ DataType::Struct(struct_fields.clone()),
+ true,
+ )])),
+ vec![Arc::new(array)],
+ )
+ .unwrap(),
+ ];
+ let build_side_schema = Arc::new(Schema::new(vec![Field::new(
Review Comment:
what is the meaning of "build side"? I don't think this test has a join in it
##########
datafusion/physical-expr/src/projection.rs:
##########
@@ -2425,6 +2474,291 @@ pub(crate) mod tests {
Ok(())
}
+ #[test]
+ fn test_update_expr_matches_projected_expr() -> Result<()> {
+ // Test that when filter expression exactly matches a projected
expression,
+ // update_expr short-circuits and rewrites to use the projected column.
+ // e.g., projection: a * 2 AS a_times_2, filter: a * 2 > 4
+ // should become: a_times_2 > 4
+
+ // Create the computed expression: a@0 * 2
+ let computed_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a", 0)),
+ Operator::Multiply,
+ Arc::new(Literal::new(ScalarValue::Int32(Some(2)))),
+ ));
+
+ // Create projection with the computed expression aliased as
"a_times_2"
+ let projection = vec![ProjectionExpr {
+ expr: Arc::clone(&computed_expr),
+ alias: "a_times_2".to_string(),
+ }];
+
+ // Create filter predicate: a * 2 > 4 (same expression as projection)
+ let filter_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+ Arc::clone(&computed_expr),
+ Operator::Gt,
+ Arc::new(Literal::new(ScalarValue::Int32(Some(4)))),
+ ));
+
+ // Update the expression - should rewrite a * 2 to a_times_2@0
+ // sync_with_child=false because we want OUTPUT references (filter
will be above projection)
+ let result = update_expr(&filter_predicate, &projection, false)?;
+ assert!(result.is_some(), "Filter predicate should be valid");
+
+ let result_expr = result.unwrap();
+ let binary = result_expr
Review Comment:
Rather than downcasting and asserting, I think this test could be substantially more concise if it created the expected output (similarly to above) and compared with `assert_eq!(result_expr, expected_expr)`.
As written, to really verify this I need to read the comments to find the intent and then check that the 15 lines of assertions correctly implement that intent.
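e.g. something like this (sketch; relies on `PartialEq` for `dyn PhysicalExpr`):
```rust
// Expected: a * 2 is rewritten to reference the projected column a_times_2@0
let expected_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
    Arc::new(Column::new("a_times_2", 0)),
    Operator::Gt,
    Arc::new(Literal::new(ScalarValue::Int32(Some(4)))),
));
assert_eq!(result_expr.as_ref(), expected_expr.as_ref());
```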
##########
datafusion/expr-common/src/triviality.rs:
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Triviality classification for expressions and function arguments.
+
+/// Classification of argument triviality for scalar functions.
+///
+/// This enum is used by [`ScalarUDFImpl::triviality`] to allow
+/// functions to make context-dependent decisions about whether they are
+/// trivial based on the nature of their arguments.
+///
+/// For example, `get_field(struct_col, 'field_name')` is trivial (static field
+/// lookup), but `get_field(struct_col, key_col)` is not (dynamic per-row
lookup).
+///
+/// [`ScalarUDFImpl::triviality`]:
https://docs.rs/datafusion-expr/latest/datafusion_expr/trait.ScalarUDFImpl.html#tymethod.triviality
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ArgTriviality {
+ /// Argument is a literal constant value or an expression that can be
+ /// evaluated to a constant at planning time.
+ Literal,
Review Comment:
Given the code can simply look at the argument to determine if it is a
Literal or Column, it isn't clear to me why these need their own enum variants
here.
##########
datafusion/core/tests/physical_optimizer/projection_pushdown.rs:
##########
@@ -1723,3 +1729,87 @@ fn test_cooperative_exec_after_projection() ->
Result<()> {
Ok(())
}
+
+#[test]
+fn test_pushdown_projection_through_repartition_filter() {
+ let struct_fields = Fields::from(vec![Field::new("a", DataType::Int32,
false)]);
+ let array = StructArray::new(
+ struct_fields.clone(),
+ vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
+ None,
+ );
+ let batches = vec![
+ RecordBatch::try_new(
+ Arc::new(Schema::new(vec![Field::new(
+ "struct",
+ DataType::Struct(struct_fields.clone()),
+ true,
+ )])),
+ vec![Arc::new(array)],
+ )
+ .unwrap(),
+ ];
+ let build_side_schema = Arc::new(Schema::new(vec![Field::new(
+ "struct",
+ DataType::Struct(struct_fields),
+ true,
+ )]));
+
+ let scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+ .with_support(true)
+ .with_batches(batches)
+ .build();
+ let scan_schema = scan.schema();
+ let struct_access = get_field(datafusion_expr::col("struct"), "a");
+ let filter = struct_access.clone().gt(lit(2));
+ let repartition =
+ RepartitionExec::try_new(scan,
Partitioning::RoundRobinBatch(32)).unwrap();
+ let filter_exec = FilterExec::try_new(
+ logical2physical(&filter, &scan_schema),
+ Arc::new(repartition),
+ )
+ .unwrap();
+ let projection: Arc<dyn ExecutionPlan> = Arc::new(
+ ProjectionExec::try_new(
+ vec![ProjectionExpr::new(
+ logical2physical(&struct_access, &scan_schema),
+ "a",
+ )],
+ Arc::new(filter_exec),
+ )
+ .unwrap(),
+ ) as _;
+
+ let initial = displayable(projection.as_ref()).indent(true).to_string();
+ let actual = initial.trim();
+
+ assert_snapshot!(
+ actual,
+ @r"
+ ProjectionExec: expr=[get_field(struct@0, a) as a]
+ FilterExec: get_field(struct@0, a) > 2
+ RepartitionExec: partitioning=RoundRobinBatch(32), input_partitions=1
+ DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[struct], file_type=test, pushdown_supported=true
+ "
+ );
+
+ let after_optimize = ProjectionPushdown::new()
+ .optimize(projection, &ConfigOptions::new())
+ .unwrap();
+
+ let after_optimize_string = displayable(after_optimize.as_ref())
+ .indent(true)
+ .to_string();
+ let actual = after_optimize_string.trim();
+
+ // Projection should be pushed all the way down to the DataSource, and
+ // filter predicate should be rewritten to reference projection's output
column
+ assert_snapshot!(
Review Comment:
Not for this PR, but I noticed there is a lot of duplication of this "verify and optimize" step across the other tests in this PR (though this new test is consistent with them).
Maybe we can simplify them like
https://github.com/apache/datafusion/blob/e8196f462ff03e39a02fd94a5050311c3d03249a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs#L186-L208
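For these tests that could look something like (sketch, name hypothetical):
```rust
// Render the plan before and after running ProjectionPushdown so each test
// only needs two snapshot assertions
fn optimize_and_display(plan: Arc<dyn ExecutionPlan>) -> Result<(String, String)> {
    let initial = displayable(plan.as_ref()).indent(true).to_string();
    let optimized = ProjectionPushdown::new().optimize(plan, &ConfigOptions::new())?;
    let optimized_str = displayable(optimized.as_ref()).indent(true).to_string();
    Ok((initial, optimized_str))
}
```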
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -2715,4 +2717,112 @@ mod tests {
Ok(())
}
+
+ /// Tests that TopK (SortExec with fetch) does not allow non-trivial
projections
+ /// to be pushed through it when they don't narrow the schema.
+ #[test]
+ fn test_topk_blocks_non_trivial_projection() -> Result<()> {
+ use crate::empty::EmptyExec;
+ use crate::projection::ProjectionExec;
+ use datafusion_expr::Operator;
+ use datafusion_physical_expr::expressions::BinaryExpr;
+ use datafusion_physical_expr::projection::ProjectionExpr;
+
+ // Create schema with two columns
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ ]));
+ let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
+
+ // Create SortExec with fetch (TopK)
+ let sort_expr =
PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
+ let sort_exec = SortExec::new([sort_expr].into(),
input).with_fetch(Some(10));
+
+ // Create a projection that:
+ // 1. Does NOT narrow the schema (same number of columns)
+ // 2. Has a non-trivial expression (literal + column)
+ let projection_exprs = vec![
+ ProjectionExpr::new(Arc::new(Column::new("a", 0)),
"a".to_string()),
+ ProjectionExpr::new(
Review Comment:
Also the same comment about building physical expressions more concisely
applies here
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -192,6 +193,73 @@ impl ProjectionExec {
input.boundedness(),
))
}
+
+ /// Returns true if the projection is beneficial to push through most
operators.
+ ///
+ /// Some cases to consider:
+ ///
+ /// - Sorts: sorts do expensive work (re-arranging rows) and thus benefit
from
+ /// having as little data underneath them as possible.
+ /// We don't want to push down expressions such as literals that could be
+ /// evaluated after the sort.
+ /// Truthfully we probably would want to push down some complex
expressions if
Review Comment:
👍
##########
datafusion/expr/src/expr.rs:
##########
@@ -1933,6 +1933,32 @@ impl Expr {
}
}
+ /// Returns the triviality classification of this expression.
+ ///
+ /// Trivial expressions include column references, literals, and nested
+ /// field access via `get_field`.
+ ///
+ /// # Example
+ /// ```
+ /// # use datafusion_expr::{col, ArgTriviality};
+ /// let expr = col("foo");
+ /// assert_eq!(expr.triviality(), ArgTriviality::Column);
+ /// ```
+ pub fn triviality(&self) -> ArgTriviality {
+ match self {
+ Expr::Column(_) => ArgTriviality::Column,
Review Comment:
If we are going to handle Column and Literal specially anyway, I don't understand the value of adding `ArgTriviality::Column` and `ArgTriviality::Literal` to the public API (I realize there may be uses for such an enum in the implementations of certain pushdown rules)
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1391,8 +1391,10 @@ impl ExecutionPlan for SortExec {
&self,
projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
- // If the projection does not narrow the schema, we should not try to
push it down.
- if projection.expr().len() >=
projection.input().schema().fields().len() {
+ // Only push projections that are trivial (column refs, field
accessors) or
Review Comment:
reading this makes sense, but I also wonder if (for a follow-on PR) this should take data types into account (e.g. "narrowing" two int columns to a single StringView column may not actually be faster).
Update: I see you also mention the same thing in the comments for `is_trivial_or_narrows_schema`
##########
datafusion/physical-expr/src/scalar_function.rs:
##########
@@ -362,6 +362,14 @@ impl PhysicalExpr for ScalarFunctionExpr {
fn is_volatile_node(&self) -> bool {
self.fun.signature().volatility == Volatility::Volatile
}
+
+ fn triviality(&self) -> ArgTriviality {
+ // Classify each argument's triviality for context-aware decision
making
Review Comment:
not sure this comment is adding much value (it just repeats what the code
does, without any additional rationale or help for the reader)
##########
datafusion/physical-expr/src/projection.rs:
##########
@@ -2425,6 +2474,291 @@ pub(crate) mod tests {
Ok(())
}
+ #[test]
+ fn test_update_expr_matches_projected_expr() -> Result<()> {
+ // Test that when filter expression exactly matches a projected
expression,
+ // update_expr short-circuits and rewrites to use the projected column.
+ // e.g., projection: a * 2 AS a_times_2, filter: a * 2 > 4
+ // should become: a_times_2 > 4
+
+ // Create the computed expression: a@0 * 2
Review Comment:
It would help me a lot to review these tests if they were written using less repetitive syntax -- for example
```rust
// Create the computed expression: a@0 * 2
let computed_expr = binary(col("a", 0), Operator::Multiply, lit(2i32));
// Create projection with the computed expression aliased as "a_times_2"
let projection = vec![ProjectionExpr::new(Arc::clone(&computed_expr), "a_times_2")];
```
I know it doesn't make any actual difference to the generated code, but it would make reviewing this much easier for me
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -2715,4 +2717,112 @@ mod tests {
Ok(())
}
+
+ /// Tests that TopK (SortExec with fetch) does not allow non-trivial
projections
+ /// to be pushed through it when they don't narrow the schema.
+ #[test]
+ fn test_topk_blocks_non_trivial_projection() -> Result<()> {
+ use crate::empty::EmptyExec;
+ use crate::projection::ProjectionExec;
+ use datafusion_expr::Operator;
+ use datafusion_physical_expr::expressions::BinaryExpr;
+ use datafusion_physical_expr::projection::ProjectionExpr;
+
+ // Create schema with two columns
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ ]));
+ let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
+
+ // Create SortExec with fetch (TopK)
+ let sort_expr =
PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
+ let sort_exec = SortExec::new([sort_expr].into(),
input).with_fetch(Some(10));
+
+ // Create a projection that:
+ // 1. Does NOT narrow the schema (same number of columns)
+ // 2. Has a non-trivial expression (literal + column)
+ let projection_exprs = vec![
+ ProjectionExpr::new(Arc::new(Column::new("a", 0)),
"a".to_string()),
Review Comment:
I think you can make this simpler like
```suggestion
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
```
There are a bunch more examples below too
##########
datafusion/physical-optimizer/src/projection_pushdown.rs:
##########
@@ -88,6 +98,267 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
}
}
+/// Tries to split a projection to extract beneficial sub-expressions for
pushdown.
+///
+/// This function walks each expression in the projection and extracts
beneficial
+/// sub-expressions (like `get_field`) from within larger non-beneficial
expressions.
+/// For example:
+/// - Input: `get_field(col, 'foo') + 1`
+/// - Output: Inner projection: `get_field(col, 'foo') AS __extracted_0`,
Outer: `__extracted_0 + 1`
+///
+/// This enables the beneficial parts to be pushed down while keeping
non-beneficial
+/// expressions (like literals and computations) above.
+fn try_split_projection(
+ plan: Arc<dyn ExecutionPlan>,
+ alias_generator: &AliasGenerator,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+ let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() else
{
+ return Ok(Transformed::no(plan));
+ };
+
+ let input_schema = projection.input().schema();
+ let mut extractor = TrivialExprExtractor::new(input_schema.as_ref(),
alias_generator);
+
+ // Extract trivial sub-expressions from each projection expression
+ let mut outer_exprs = Vec::new();
+ let mut has_extractions = false;
+
+ for proj_expr in projection.expr() {
+ // If this is already an expression from an extraction don't try to
re-extract it (would cause infinite recursion)
+ if proj_expr.alias.starts_with("__extracted") {
+ outer_exprs.push(proj_expr.clone());
+ continue;
+ }
+
+ // Only extract from non-trivial expressions. If the entire expression
is
+ // already TrivialExpr (like `get_field(col, 'foo')`), it can be
pushed as-is.
+ // We only need to split when there's a non-trivial expression with
trivial
+ // sub-expressions (like `get_field(col, 'foo') + 1`).
+ if matches!(proj_expr.expr.triviality(), ArgTriviality::TrivialExpr) {
+ outer_exprs.push(proj_expr.clone());
+ continue;
+ }
+
+ let rewritten = extractor.extract(Arc::clone(&proj_expr.expr))?;
+ if !Arc::ptr_eq(&rewritten, &proj_expr.expr) {
+ has_extractions = true;
+ }
+ outer_exprs.push(ProjectionExpr::new(rewritten,
proj_expr.alias.clone()));
+ }
+
+ if !has_extractions {
+ return Ok(Transformed::no(plan));
+ }
+
+ // Collect columns needed by outer expressions that aren't extracted
+ extractor.collect_columns_needed(&outer_exprs)?;
+
+ // Build inner projection from extracted expressions + needed columns
+ let inner_exprs = extractor.build_inner_projection()?;
+
+ if inner_exprs.is_empty() {
+ return Ok(Transformed::no(plan));
+ }
+
+ // Create the inner projection (to be pushed down)
+ let inner = ProjectionExec::try_new(inner_exprs, Arc::clone(projection.input()))?;
+
+ // Rewrite outer expressions to reference the inner projection's output schema
+ let inner_schema = inner.schema();
+ let final_outer_exprs = extractor.finalize_outer_exprs(outer_exprs, &inner_schema)?;
+
+ // Create the outer projection (stays above)
+ let outer = ProjectionExec::try_new(final_outer_exprs, Arc::new(inner))?;
+
+ Ok(Transformed::yes(Arc::new(outer)))
+}
+
+/// Extracts beneficial trivial sub-expressions from larger expressions.
+///
+/// Similar to `JoinFilterRewriter`, this struct walks expression trees top-down
+/// and extracts sub-expressions where `triviality() == ArgTriviality::TrivialExpr`
+/// (beneficial trivial expressions like field accessors).
+///
+/// The extracted expressions are replaced with column references pointing to
+/// an inner projection that computes these sub-expressions.
+struct TrivialExprExtractor<'a> {
+ /// Extracted trivial expressions: maps expression -> alias
+ extracted: IndexMap<Arc<dyn PhysicalExpr>, String>,
+ /// Columns needed by outer expressions: maps input column index -> alias
+ columns_needed: IndexMap<usize, String>,
+ /// Input schema for the projection
+ input_schema: &'a Schema,
+ /// Alias generator for unique names
+ alias_generator: &'a AliasGenerator,
+}
+
+impl<'a> TrivialExprExtractor<'a> {
+ fn new(input_schema: &'a Schema, alias_generator: &'a AliasGenerator) -> Self {
+ Self {
+ extracted: IndexMap::new(),
+ columns_needed: IndexMap::new(),
+ input_schema,
+ alias_generator,
+ }
+ }
+
+ /// Extracts beneficial trivial sub-expressions from the given expression.
+ ///
+ /// Walks the expression tree top-down and replaces beneficial trivial
+ /// sub-expressions with column references to the inner projection.
+ fn extract(&mut self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
+ // Top-down: check self first, then recurse to children
+ if matches!(expr.triviality(), ArgTriviality::TrivialExpr) {
+ // Extract this entire sub-tree
+ return Ok(self.add_extracted_expr(expr));
+ }
+
+ // Not extractable at this level - recurse into children
+ let children = expr.children();
+ if children.is_empty() {
+ return Ok(expr);
+ }
+
+ let mut new_children = Vec::with_capacity(children.len());
+ let mut any_changed = false;
+
+ for child in children {
+ let new_child = self.extract(Arc::clone(child))?;
+ if !Arc::ptr_eq(&new_child, child) {
Review Comment:
Why check the pointer? Given that this struct is doing the rewrite, you could
just set the flag in the clause above ("extract this entire sub-tree") --
something like the sketch below.
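Untested sketch of what I mean, reusing the names from this PR
(`add_extracted_expr` and `triviality` as defined above):
```rust
fn extract(
    &mut self,
    expr: Arc<dyn PhysicalExpr>,
    changed: &mut bool,
) -> Result<Arc<dyn PhysicalExpr>> {
    if matches!(expr.triviality(), ArgTriviality::TrivialExpr) {
        // We are rewriting this sub-tree right here, so record it directly
        // instead of comparing Arc pointers after the fact
        *changed = true;
        return Ok(self.add_extracted_expr(expr));
    }

    let children = expr.children();
    if children.is_empty() {
        return Ok(expr);
    }

    // Rebuild unconditionally; `changed` already tells the caller whether
    // anything was extracted anywhere in the tree
    let mut new_children = Vec::with_capacity(children.len());
    for child in children {
        new_children.push(self.extract(Arc::clone(child), changed)?);
    }
    expr.with_new_children(new_children)
}
```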
##########
datafusion/sqllogictest/test_files/projection_pushdown.slt:
##########
@@ -0,0 +1,1001 @@
+# Licensed to the Apache Software Foundation (ASF) under one
Review Comment:
this file seems to duplicate
datafusion/sqllogictest/test_files/parquet_pushdown.slt, right? They both test
pushdown in parquet with only a single target_partition... Why have both?
I had started to comment about multi-partition test coverage in
parquet_pushdown, but then I saw this file
##########
datafusion/sqllogictest/test_files/projection_pushdown.slt:
##########
@@ -0,0 +1,1001 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for projection pushdown behavior with get_field expressions
+#
+# This file tests the ExtractTrivialProjections optimizer rule and
+# physical projection pushdown for:
+# - get_field expressions (struct field access like s['foo'])
+# - Pushdown through Filter, Sort, and TopK operators
+# - Multi-partition scenarios with SortPreservingMergeExec
+##########
+
+#####################
+# Section 1: Setup - Single Partition Tests
+#####################
+
+# Set target_partitions = 1 for deterministic plan output
+statement ok
+SET datafusion.execution.target_partitions = 1;
+
+# Create parquet file with struct column containing value and label fields
+statement ok
+COPY (
+ SELECT
+ column1 as id,
+ column2 as s
+ FROM VALUES
+ (1, {value: 100, label: 'alpha'}),
+ (2, {value: 200, label: 'beta'}),
+ (3, {value: 150, label: 'gamma'}),
+ (4, {value: 300, label: 'delta'}),
+ (5, {value: 250, label: 'epsilon'})
+) TO 'test_files/scratch/projection_pushdown/simple.parquet'
+STORED AS PARQUET;
+
+# Create table for simple struct tests
+statement ok
+CREATE EXTERNAL TABLE simple_struct STORED AS PARQUET
+LOCATION 'test_files/scratch/projection_pushdown/simple.parquet';
+
+# Create parquet file with nested struct column
+statement ok
+COPY (
+ SELECT
+ column1 as id,
+ column2 as nested
+ FROM VALUES
+ (1, {outer: {inner: 10, name: 'one'}, extra: 'x'}),
+ (2, {outer: {inner: 20, name: 'two'}, extra: 'y'}),
+ (3, {outer: {inner: 30, name: 'three'}, extra: 'z'})
+) TO 'test_files/scratch/projection_pushdown/nested.parquet'
+STORED AS PARQUET;
+
+# Create table for nested struct tests
+statement ok
+CREATE EXTERNAL TABLE nested_struct STORED AS PARQUET
+LOCATION 'test_files/scratch/projection_pushdown/nested.parquet';
+
+# Create parquet file with nullable struct column
+statement ok
+COPY (
+ SELECT
+ column1 as id,
+ column2 as s
+ FROM VALUES
+ (1, {value: 100, label: 'alpha'}),
+ (2, NULL),
+ (3, {value: 150, label: 'gamma'}),
+ (4, NULL),
+ (5, {value: 250, label: 'epsilon'})
+) TO 'test_files/scratch/projection_pushdown/nullable.parquet'
+STORED AS PARQUET;
+
+# Create table for nullable struct tests
+statement ok
+CREATE EXTERNAL TABLE nullable_struct STORED AS PARQUET
+LOCATION 'test_files/scratch/projection_pushdown/nullable.parquet';
+
+
+#####################
+# Section 2: Basic get_field Pushdown (Projection above scan)
+#####################
+
+###
+# Test 2.1: Simple s['value'] - pushed into DataSourceExec
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM simple_struct;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+02)--TableScan: simple_struct projection=[id, s]
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value]],
file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM simple_struct ORDER BY id;
+----
+1 100
+2 200
+3 150
+4 300
+5 250
+
+###
+# Test 2.2: Multiple get_field expressions - all pushed
+###
+
+query TT
+EXPLAIN SELECT id, s['value'], s['label'] FROM simple_struct;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")),
get_field(simple_struct.s, Utf8("label"))
+02)--TableScan: simple_struct projection=[id, s]
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value],
get_field(s@1, label) as simple_struct.s[label]], file_type=parquet
+
+# Verify correctness
+query IIT
+SELECT id, s['value'], s['label'] FROM simple_struct ORDER BY id;
+----
+1 100 alpha
+2 200 beta
+3 150 gamma
+4 300 delta
+5 250 epsilon
+
+###
+# Test 2.3: Nested s['outer']['inner'] - pushed
+###
+
+query TT
+EXPLAIN SELECT id, nested['outer']['inner'] FROM nested_struct;
+----
+logical_plan
+01)Projection: nested_struct.id, get_field(nested_struct.nested,
Utf8("outer"), Utf8("inner"))
+02)--TableScan: nested_struct projection=[id, nested]
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/nested.parquet]]},
projection=[id, get_field(nested@1, outer, inner) as
nested_struct.nested[outer][inner]], file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, nested['outer']['inner'] FROM nested_struct ORDER BY id;
+----
+1 10
+2 20
+3 30
+
+###
+# Test 2.4: s['value'] + 1 - entire expression pushed (directly above scan)
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] + 1 FROM simple_struct;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) +
Int64(1)
+02)--TableScan: simple_struct projection=[id, s]
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) + 1 as simple_struct.s[value] +
Int64(1)], file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, s['value'] + 1 FROM simple_struct ORDER BY id;
+----
+1 101
+2 201
+3 151
+4 301
+5 251
+
+###
+# Test 2.5: s['label'] || '_suffix' - pushed (directly above scan)
+###
+
+query TT
+EXPLAIN SELECT id, s['label'] || '_suffix' FROM simple_struct;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("label")) ||
Utf8("_suffix")
+02)--TableScan: simple_struct projection=[id, s]
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, label) || _suffix as simple_struct.s[label] ||
Utf8("_suffix")], file_type=parquet
+
+# Verify correctness
+query IT
+SELECT id, s['label'] || '_suffix' FROM simple_struct ORDER BY id;
+----
+1 alpha_suffix
+2 beta_suffix
+3 gamma_suffix
+4 delta_suffix
+5 epsilon_suffix
+
+
+#####################
+# Section 3: Projection Through Filter
+#####################
+
+###
+# Test 3.1: Simple get_field through Filter - pushed
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM simple_struct WHERE id > 2;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+02)--Filter: simple_struct.id > Int64(2)
+03)----TableScan: simple_struct projection=[id, s],
partial_filters=[simple_struct.id > Int64(2)]
+physical_plan
+01)FilterExec: id@0 > 2
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value]],
file_type=parquet, predicate=id@0 > 2, pruning_predicate=id_null_count@1 !=
row_count@2 AND id_max@0 > 2, required_guarantees=[]
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM simple_struct WHERE id > 2 ORDER BY id;
+----
+3 150
+4 300
+5 250
+
+###
+# Test 3.2: s['value'] + 1 through Filter - get_field extracted and pushed
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] + 1 FROM simple_struct WHERE id > 2;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) +
Int64(1)
+02)--Filter: simple_struct.id > Int64(2)
+03)----TableScan: simple_struct projection=[id, s],
partial_filters=[simple_struct.id > Int64(2)]
+physical_plan
+01)ProjectionExec: expr=[id@1 as id, __extracted_1@0 + 1 as
simple_struct.s[value] + Int64(1)]
+02)--FilterExec: id@1 > 2
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __extracted_1, id], file_type=parquet,
predicate=id@0 > 2, pruning_predicate=id_null_count@1 != row_count@2 AND
id_max@0 > 2, required_guarantees=[]
+
+# Verify correctness
+query II
+SELECT id, s['value'] + 1 FROM simple_struct WHERE id > 2 ORDER BY id;
+----
+3 151
+4 301
+5 251
+
+###
+# Test 3.3: Filter on get_field expression
+###
+
+query TT
+EXPLAIN SELECT id, s['label'] FROM simple_struct WHERE s['value'] > 150;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("label"))
+02)--Filter: get_field(simple_struct.s, Utf8("value")) > Int64(150)
+03)----TableScan: simple_struct projection=[id, s],
partial_filters=[get_field(simple_struct.s, Utf8("value")) > Int64(150)]
+physical_plan
+01)ProjectionExec: expr=[id@0 as id, get_field(s@1, label) as
simple_struct.s[label]]
+02)--FilterExec: get_field(s@1, value) > 150
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, s], file_type=parquet
+
+# Verify correctness
+query IT
+SELECT id, s['label'] FROM simple_struct WHERE s['value'] > 150 ORDER BY id;
+----
+2 beta
+4 delta
+5 epsilon
+
+
+#####################
+# Section 4: Projection Through Sort (no LIMIT)
+#####################
+
+###
+# Test 4.1: Simple get_field through Sort - pushed
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM simple_struct ORDER BY id;
+----
+logical_plan
+01)Sort: simple_struct.id ASC NULLS LAST
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+03)----TableScan: simple_struct projection=[id, s]
+physical_plan
+01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value]],
file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM simple_struct ORDER BY id;
+----
+1 100
+2 200
+3 150
+4 300
+5 250
+
+###
+# Test 4.2: s['value'] + 1 through Sort - split projection
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] + 1 FROM simple_struct ORDER BY id;
+----
+logical_plan
+01)Sort: simple_struct.id ASC NULLS LAST
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) +
Int64(1)
+03)----TableScan: simple_struct projection=[id, s]
+physical_plan
+01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) + 1 as simple_struct.s[value] +
Int64(1)], file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, s['value'] + 1 FROM simple_struct ORDER BY id;
+----
+1 101
+2 201
+3 151
+4 301
+5 251
+
+###
+# Test 4.3: Sort by get_field expression
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM simple_struct ORDER BY s['value'];
+----
+logical_plan
+01)Sort: simple_struct.s[value] ASC NULLS LAST
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+03)----TableScan: simple_struct projection=[id, s]
+physical_plan
+01)SortExec: expr=[simple_struct.s[value]@1 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value]],
file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM simple_struct ORDER BY s['value'];
+----
+1 100
+3 150
+2 200
+5 250
+4 300
+
+
+#####################
+# Section 5: Projection Through TopK (ORDER BY + LIMIT)
+#####################
+
+###
+# Test 5.1: Simple get_field through TopK - pushed (trivial)
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM simple_struct ORDER BY id LIMIT 3;
+----
+logical_plan
+01)Sort: simple_struct.id ASC NULLS LAST, fetch=3
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+03)----TableScan: simple_struct projection=[id, s]
+physical_plan
+01)SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value]],
file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM simple_struct ORDER BY id LIMIT 3;
+----
+1 100
+2 200
+3 150
+
+###
+# Test 5.2: s['value'] + 1 through TopK - pushed (same column count, but the struct column is replaced by a narrower value)
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] + 1 FROM simple_struct ORDER BY id LIMIT 3;
+----
+logical_plan
+01)Sort: simple_struct.id ASC NULLS LAST, fetch=3
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) +
Int64(1)
+03)----TableScan: simple_struct projection=[id, s]
+physical_plan
+01)SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) + 1 as simple_struct.s[value] +
Int64(1)], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# Verify correctness
+query II
+SELECT id, s['value'] + 1 FROM simple_struct ORDER BY id LIMIT 3;
+----
+1 101
+2 201
+3 151
+
+###
+# Test 5.3: Multiple get_field through TopK - all pushed
+###
+
+query TT
+EXPLAIN SELECT id, s['value'], s['label'] FROM simple_struct ORDER BY id LIMIT
3;
+----
+logical_plan
+01)Sort: simple_struct.id ASC NULLS LAST, fetch=3
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")),
get_field(simple_struct.s, Utf8("label"))
+03)----TableScan: simple_struct projection=[id, s]
+physical_plan
+01)SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value],
get_field(s@1, label) as simple_struct.s[label]], file_type=parquet,
predicate=DynamicFilter [ empty ]
+
+# Verify correctness
+query IIT
+SELECT id, s['value'], s['label'] FROM simple_struct ORDER BY id LIMIT 3;
+----
+1 100 alpha
+2 200 beta
+3 150 gamma
+
+###
+# Test 5.4: Nested get_field through TopK - pushed
+###
+
+query TT
+EXPLAIN SELECT id, nested['outer']['inner'] FROM nested_struct ORDER BY id
LIMIT 2;
+----
+logical_plan
+01)Sort: nested_struct.id ASC NULLS LAST, fetch=2
+02)--Projection: nested_struct.id, get_field(nested_struct.nested,
Utf8("outer"), Utf8("inner"))
+03)----TableScan: nested_struct projection=[id, nested]
+physical_plan
+01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/nested.parquet]]},
projection=[id, get_field(nested@1, outer, inner) as
nested_struct.nested[outer][inner]], file_type=parquet, predicate=DynamicFilter
[ empty ]
+
+# Verify correctness
+query II
+SELECT id, nested['outer']['inner'] FROM nested_struct ORDER BY id LIMIT 2;
+----
+1 10
+2 20
+
+###
+# Test 5.5: String concat through TopK - pushed (narrows schema)
+###
+
+query TT
+EXPLAIN SELECT id, s['label'] || '_suffix' FROM simple_struct ORDER BY id
LIMIT 3;
+----
+logical_plan
+01)Sort: simple_struct.id ASC NULLS LAST, fetch=3
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("label"))
|| Utf8("_suffix")
+03)----TableScan: simple_struct projection=[id, s]
+physical_plan
+01)SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, label) || _suffix as simple_struct.s[label] ||
Utf8("_suffix")], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# Verify correctness
+query IT
+SELECT id, s['label'] || '_suffix' FROM simple_struct ORDER BY id LIMIT 3;
+----
+1 alpha_suffix
+2 beta_suffix
+3 gamma_suffix
+
+
+#####################
+# Section 6: Combined Operators
+#####################
+
+###
+# Test 6.1: Filter + Sort + get_field
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM simple_struct WHERE id > 1 ORDER BY
s['value'];
+----
+logical_plan
+01)Sort: simple_struct.s[value] ASC NULLS LAST
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+03)----Filter: simple_struct.id > Int64(1)
+04)------TableScan: simple_struct projection=[id, s],
partial_filters=[simple_struct.id > Int64(1)]
+physical_plan
+01)SortExec: expr=[simple_struct.s[value]@1 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--FilterExec: id@0 > 1
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value]],
file_type=parquet, predicate=id@0 > 1, pruning_predicate=id_null_count@1 !=
row_count@2 AND id_max@0 > 1, required_guarantees=[]
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM simple_struct WHERE id > 1 ORDER BY s['value'];
+----
+3 150
+2 200
+5 250
+4 300
+
+###
+# Test 6.2: Filter + TopK + get_field
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM simple_struct WHERE id > 1 ORDER BY
s['value'] LIMIT 2;
+----
+logical_plan
+01)Sort: simple_struct.s[value] ASC NULLS LAST, fetch=2
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+03)----Filter: simple_struct.id > Int64(1)
+04)------TableScan: simple_struct projection=[id, s],
partial_filters=[simple_struct.id > Int64(1)]
+physical_plan
+01)SortExec: TopK(fetch=2), expr=[simple_struct.s[value]@1 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--FilterExec: id@0 > 1
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value]],
file_type=parquet, predicate=id@0 > 1, pruning_predicate=id_null_count@1 !=
row_count@2 AND id_max@0 > 1, required_guarantees=[]
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM simple_struct WHERE id > 1 ORDER BY s['value']
LIMIT 2;
+----
+3 150
+2 200
+
+###
+# Test 6.3: Filter + TopK + get_field with arithmetic
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] + 1 FROM simple_struct WHERE id > 1 ORDER BY id
LIMIT 2;
+----
+logical_plan
+01)Sort: simple_struct.id ASC NULLS LAST, fetch=2
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) +
Int64(1)
+03)----Filter: simple_struct.id > Int64(1)
+04)------TableScan: simple_struct projection=[id, s],
partial_filters=[simple_struct.id > Int64(1)]
+physical_plan
+01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
+02)--ProjectionExec: expr=[id@1 as id, __extracted_1@0 + 1 as
simple_struct.s[value] + Int64(1)]
+03)----FilterExec: id@1 > 1
+04)------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __extracted_1, id], file_type=parquet,
predicate=id@0 > 1 AND DynamicFilter [ empty ],
pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1,
required_guarantees=[]
+
+# Verify correctness
+query II
+SELECT id, s['value'] + 1 FROM simple_struct WHERE id > 1 ORDER BY id LIMIT 2;
+----
+2 201
+3 151
+
+
+#####################
+# Section 7: Multi-Partition Tests
+#####################
+
+# Set target_partitions = 4 for parallel execution
+statement ok
+SET datafusion.execution.target_partitions = 4;
+
+# Create 5 parquet files (more than partitions) for parallel tests
+statement ok
+COPY (SELECT 1 as id, {value: 100, label: 'alpha'} as s)
+TO 'test_files/scratch/projection_pushdown/multi/part1.parquet'
+STORED AS PARQUET;
+
+statement ok
+COPY (SELECT 2 as id, {value: 200, label: 'beta'} as s)
+TO 'test_files/scratch/projection_pushdown/multi/part2.parquet'
+STORED AS PARQUET;
+
+statement ok
+COPY (SELECT 3 as id, {value: 150, label: 'gamma'} as s)
+TO 'test_files/scratch/projection_pushdown/multi/part3.parquet'
+STORED AS PARQUET;
+
+statement ok
+COPY (SELECT 4 as id, {value: 300, label: 'delta'} as s)
+TO 'test_files/scratch/projection_pushdown/multi/part4.parquet'
+STORED AS PARQUET;
+
+statement ok
+COPY (SELECT 5 as id, {value: 250, label: 'epsilon'} as s)
+TO 'test_files/scratch/projection_pushdown/multi/part5.parquet'
+STORED AS PARQUET;
+
+# Create table from multiple parquet files
+statement ok
+CREATE EXTERNAL TABLE multi_struct STORED AS PARQUET
+LOCATION 'test_files/scratch/projection_pushdown/multi/';
+
+###
+# Test 7.1: Multi-partition Sort with get_field
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM multi_struct ORDER BY id;
+----
+logical_plan
+01)Sort: multi_struct.id ASC NULLS LAST
+02)--Projection: multi_struct.id, get_field(multi_struct.s, Utf8("value"))
+03)----TableScan: multi_struct projection=[id, s]
+physical_plan
+01)SortPreservingMergeExec: [id@0 ASC NULLS LAST]
+02)--SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
+03)----DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]},
projection=[id, get_field(s@1, value) as multi_struct.s[value]],
file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM multi_struct ORDER BY id;
+----
+1 100
+2 200
+3 150
+4 300
+5 250
+
+###
+# Test 7.2: Multi-partition TopK with get_field
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM multi_struct ORDER BY id LIMIT 3;
+----
+logical_plan
+01)Sort: multi_struct.id ASC NULLS LAST, fetch=3
+02)--Projection: multi_struct.id, get_field(multi_struct.s, Utf8("value"))
+03)----TableScan: multi_struct projection=[id, s]
+physical_plan
+01)SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=3
+02)--SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[true]
+03)----DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]},
projection=[id, get_field(s@1, value) as multi_struct.s[value]],
file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM multi_struct ORDER BY id LIMIT 3;
+----
+1 100
+2 200
+3 150
+
+###
+# Test 7.3: Multi-partition TopK with arithmetic (non-trivial stays above
merge)
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] + 1 FROM multi_struct ORDER BY id LIMIT 3;
+----
+logical_plan
+01)Sort: multi_struct.id ASC NULLS LAST, fetch=3
+02)--Projection: multi_struct.id, get_field(multi_struct.s, Utf8("value")) +
Int64(1)
+03)----TableScan: multi_struct projection=[id, s]
+physical_plan
+01)SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=3
+02)--SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[true]
+03)----DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]},
projection=[id, get_field(s@1, value) + 1 as multi_struct.s[value] +
Int64(1)], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# Verify correctness
+query II
+SELECT id, s['value'] + 1 FROM multi_struct ORDER BY id LIMIT 3;
+----
+1 101
+2 201
+3 151
+
+###
+# Test 7.4: Multi-partition Filter with get_field
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM multi_struct WHERE id > 2 ORDER BY id;
+----
+logical_plan
+01)Sort: multi_struct.id ASC NULLS LAST
+02)--Projection: multi_struct.id, get_field(multi_struct.s, Utf8("value"))
+03)----Filter: multi_struct.id > Int64(2)
+04)------TableScan: multi_struct projection=[id, s],
partial_filters=[multi_struct.id > Int64(2)]
+physical_plan
+01)SortPreservingMergeExec: [id@0 ASC NULLS LAST]
+02)--SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
+03)----FilterExec: id@0 > 2
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
+05)--------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]},
projection=[id, get_field(s@1, value) as multi_struct.s[value]],
file_type=parquet, predicate=id@0 > 2, pruning_predicate=id_null_count@1 !=
row_count@2 AND id_max@0 > 2, required_guarantees=[]
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM multi_struct WHERE id > 2 ORDER BY id;
+----
+3 150
+4 300
+5 250
+
+###
+# Test 7.5: Aggregation with get_field (CoalescePartitions)
+###
+
+query TT
+EXPLAIN SELECT s['label'], SUM(s['value']) FROM multi_struct GROUP BY
s['label'];
+----
+logical_plan
+01)Aggregate: groupBy=[[get_field(multi_struct.s, Utf8("label"))]],
aggr=[[sum(get_field(multi_struct.s, Utf8("value")))]]
+02)--TableScan: multi_struct projection=[s]
+physical_plan
+01)AggregateExec: mode=FinalPartitioned, gby=[multi_struct.s[label]@0 as
multi_struct.s[label]], aggr=[sum(multi_struct.s[value])]
+02)--RepartitionExec: partitioning=Hash([multi_struct.s[label]@0], 4),
input_partitions=3
+03)----AggregateExec: mode=Partial, gby=[get_field(s@0, label) as
multi_struct.s[label]], aggr=[sum(multi_struct.s[value])]
+04)------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]},
projection=[s], file_type=parquet
Review Comment:
Why are `s['label']` and `s['value']` NOT pushed into the scan here? It
seems like they could be?
##########
datafusion/sqllogictest/test_files/projection_pushdown.slt:
##########
@@ -0,0 +1,1001 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for projection pushdown behavior with get_field expressions
+#
+# This file tests the ExtractTrivialProjections optimizer rule and
+# physical projection pushdown for:
+# - get_field expressions (struct field access like s['foo'])
+# - Pushdown through Filter, Sort, and TopK operators
+# - Multi-partition scenarios with SortPreservingMergeExec
+##########
+
+#####################
+# Section 1: Setup - Single Partition Tests
+#####################
+
+# Set target_partitions = 1 for deterministic plan output
+statement ok
+SET datafusion.execution.target_partitions = 1;
+
+# Create parquet file with struct column containing value and label fields
+statement ok
+COPY (
+ SELECT
+ column1 as id,
+ column2 as s
+ FROM VALUES
+ (1, {value: 100, label: 'alpha'}),
+ (2, {value: 200, label: 'beta'}),
+ (3, {value: 150, label: 'gamma'}),
+ (4, {value: 300, label: 'delta'}),
+ (5, {value: 250, label: 'epsilon'})
+) TO 'test_files/scratch/projection_pushdown/simple.parquet'
+STORED AS PARQUET;
+
+# Create table for simple struct tests
+statement ok
+CREATE EXTERNAL TABLE simple_struct STORED AS PARQUET
+LOCATION 'test_files/scratch/projection_pushdown/simple.parquet';
+
+# Create parquet file with nested struct column
+statement ok
+COPY (
+ SELECT
+ column1 as id,
+ column2 as nested
+ FROM VALUES
+ (1, {outer: {inner: 10, name: 'one'}, extra: 'x'}),
+ (2, {outer: {inner: 20, name: 'two'}, extra: 'y'}),
+ (3, {outer: {inner: 30, name: 'three'}, extra: 'z'})
+) TO 'test_files/scratch/projection_pushdown/nested.parquet'
+STORED AS PARQUET;
+
+# Create table for nested struct tests
+statement ok
+CREATE EXTERNAL TABLE nested_struct STORED AS PARQUET
+LOCATION 'test_files/scratch/projection_pushdown/nested.parquet';
+
+# Create parquet file with nullable struct column
+statement ok
+COPY (
+ SELECT
+ column1 as id,
+ column2 as s
+ FROM VALUES
+ (1, {value: 100, label: 'alpha'}),
+ (2, NULL),
+ (3, {value: 150, label: 'gamma'}),
+ (4, NULL),
+ (5, {value: 250, label: 'epsilon'})
+) TO 'test_files/scratch/projection_pushdown/nullable.parquet'
+STORED AS PARQUET;
+
+# Create table for nullable struct tests
+statement ok
+CREATE EXTERNAL TABLE nullable_struct STORED AS PARQUET
+LOCATION 'test_files/scratch/projection_pushdown/nullable.parquet';
+
+
+#####################
+# Section 2: Basic get_field Pushdown (Projection above scan)
+#####################
+
+###
+# Test 2.1: Simple s['value'] - pushed into DataSourceExec
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] FROM simple_struct;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+02)--TableScan: simple_struct projection=[id, s]
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value]],
file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, s['value'] FROM simple_struct ORDER BY id;
+----
+1 100
+2 200
+3 150
+4 300
+5 250
+
+###
+# Test 2.2: Multiple get_field expressions - all pushed
+###
+
+query TT
+EXPLAIN SELECT id, s['value'], s['label'] FROM simple_struct;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")),
get_field(simple_struct.s, Utf8("label"))
+02)--TableScan: simple_struct projection=[id, s]
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as simple_struct.s[value],
get_field(s@1, label) as simple_struct.s[label]], file_type=parquet
+
+# Verify correctness
+query IIT
+SELECT id, s['value'], s['label'] FROM simple_struct ORDER BY id;
+----
+1 100 alpha
+2 200 beta
+3 150 gamma
+4 300 delta
+5 250 epsilon
+
+###
+# Test 2.3: Nested s['outer']['inner'] - pushed
+###
+
+query TT
+EXPLAIN SELECT id, nested['outer']['inner'] FROM nested_struct;
+----
+logical_plan
+01)Projection: nested_struct.id, get_field(nested_struct.nested,
Utf8("outer"), Utf8("inner"))
+02)--TableScan: nested_struct projection=[id, nested]
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/nested.parquet]]},
projection=[id, get_field(nested@1, outer, inner) as
nested_struct.nested[outer][inner]], file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, nested['outer']['inner'] FROM nested_struct ORDER BY id;
+----
+1 10
+2 20
+3 30
+
+###
+# Test 2.4: s['value'] + 1 - entire expression pushed (directly above scan)
+###
+
+query TT
+EXPLAIN SELECT id, s['value'] + 1 FROM simple_struct;
+----
+logical_plan
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) +
Int64(1)
+02)--TableScan: simple_struct projection=[id, s]
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) + 1 as simple_struct.s[value] +
Int64(1)], file_type=parquet
+
+# Verify correctness
+query II
+SELECT id, s['value'] + 1 FROM simple_struct ORDER BY id;
+----
+1 101
+2 201
+3 151
+4 301
+5 251
+
+###
+# Test 2.5: s['label'] || '_suffix' - pushed (directly above scan)
Review Comment:
It looks to me like the optimizer has pushed down both the field access and
the `||` operator -- I would have expected it to push down only the
`get_field` 🤔
##########
datafusion/expr-common/src/triviality.rs:
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Triviality classification for expressions and function arguments.
+
+/// Classification of argument triviality for scalar functions.
+///
+/// This enum is used by [`ScalarUDFImpl::triviality`] to allow
+/// functions to make context-dependent decisions about whether they are
+/// trivial based on the nature of their arguments.
+///
+/// For example, `get_field(struct_col, 'field_name')` is trivial (static field
+/// lookup), but `get_field(struct_col, key_col)` is not (dynamic per-row
lookup).
+///
+/// [`ScalarUDFImpl::triviality`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/trait.ScalarUDFImpl.html#tymethod.triviality
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ArgTriviality {
+ /// Argument is a literal constant value or an expression that can be
+ /// evaluated to a constant at planning time.
+ Literal,
+ /// Argument is a simple column reference.
+ Column,
+ /// Argument is a complex expression that declares itself trivial.
+ /// For example, if `get_field(struct_col, 'field_name')` is implemented as a
+ /// trivial expression, then it would return this variant.
+ /// Then `other_trivial_function(get_field(...), 42)` could also be classified as
+ /// a trivial expression using the knowledge that `get_field(...)` is trivial.
+ TrivialExpr,
+ /// Argument is a complex expression that declares itself non-trivial.
+ /// For example, `min(col1 + col2)` is non-trivial because it requires per-row computation.
+ NonTrivial,
+}
Review Comment:
I think we should lean into the properties / what will be done with this
classification.
Maybe we could highlight that the point of this pushdown is to push
expressions that can be efficiently evaluated *in* table scans? If so, perhaps
something like "PushToScan" could make the use case clearest -- see the
strawman below.
If we are only planning to push down column extraction, we could use a term
like "FieldAccess" or "SubfieldAccess" (though what about Casts?)
##########
datafusion/physical-optimizer/src/output_requirements.rs:
##########
@@ -256,18 +256,13 @@ impl ExecutionPlan for OutputRequirementExec {
&self,
projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
- // If the projection does not narrow the schema, we should not try to
push it down:
- let proj_exprs = projection.expr();
- if proj_exprs.len() >= projection.input().schema().fields().len() {
- return Ok(None);
- }
Review Comment:
> It seems to me that OutputRequirementExec is a temporary marker used by
the physical optimizer and should not have any bearing on where projections
are placed
Yes, I think that is correct. I think OutputRequirementExec is always
supposed to be the root node of a query plan and is used to communicate what
the output requirements are (for example, for `INSERT AS SELECT (...)` the
desired distribution for the insert is communicated by the
OutputRequirementExec).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]