This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c931c89886 ensure dynamic filters are correctly pushed down through 
aggregations (#21059)
c931c89886 is described below

commit c931c89886997bf486b148c19f33e3e33a933349
Author: Jayant Shrivastava <[email protected]>
AuthorDate: Thu Apr 2 09:52:46 2026 -0400

    ensure dynamic filters are correctly pushed down through aggregations 
(#21059)
    
    ## Which issue does this PR close?
    
    - Closes https://github.com/apache/datafusion/issues/21065.
    
    ## Rationale for this change
    
    In plans such as the following, dynamic filters are not pushed down
    through the aggregation
    ```
    CREATE TABLE data (a VARCHAR, ts TIMESTAMP, value DOUBLE)
        AS VALUES
          ('h1', '2024-01-01T00:05:00', 1.0),
          ('h1', '2024-01-01T00:15:00', 2.0),
          ('h2', '2024-01-01T00:25:00', 3.0),
          ('h3', '2024-01-01T00:35:00', 4.0);
    
    SELECT * FROM contexts c
      INNER JOIN (
        SELECT a, date_bin(interval '1 hour', ts) AS bucket, min(value) AS 
min_val
        FROM (SELECT value, a, ts FROM data)
        GROUP BY a, date_bin(interval '1 hour', ts)
      ) agg ON c.a = agg.a;
    ```
    
    ```
        HashJoinExec: mode=Auto, join_type=Inner, on=[(a@0, a@0)]
          DataSourceExec: partitions=1
          ProjectionExec: [a@0, date_bin(1h, ts)@1 as bucket, min(value)@2 as 
min_val]
            AggregateExec: mode=FinalPartitioned, gby=[a@0, date_bin(1h, 
ts)@1], aggr=[min(value)]
              AggregateExec: mode=Partial, gby=[a@1, date_bin(1h, ts@2)], 
aggr=[min(value)]
                ProjectionExec: [value@2, a@0, ts@1]        ← reorders columns
                  DataSourceExec: partitions=1
    ```
    
    ## What changes are included in this PR?
    
    `AggregateExec::gather_filters_for_pushdown` compared parent filter
    columns (output schema indices) against grouping expression columns
    (input schema indices). When a `ProjectionExec` below the aggregate
    reorders columns, the index mismatch causes filters (such as HashJoin
    dynamic filters) to be incorrectly blocked.
    
    This change fixes the column index mapping in
    `AggregateExec::gather_filters_for_pushdown`
    
    ## Are these changes tested?
    
    - `test_pushdown_through_aggregate_with_reordered_input_columns` —
    filter on grouping column with reordered input is pushed down
    -
    
`test_pushdown_through_aggregate_with_reordered_input_no_pushdown_on_agg_result`
    — filter on aggregate result column is not pushed down
    - `test_pushdown_through_aggregate_grouping_sets_with_reordered_input` —
    GROUPING SETS: filter on common column pushed, filter on missing column
    blocked
    -
    
`test_hashjoin_dynamic_filter_pushdown_through_aggregate_with_reordered_input`
    — HashJoin dynamic filter pushes through aggregate with reordered input
    and is populated with values after
       execution
      - All tests verified to fail without the fix
    
    ## Are there any user-facing changes?
    
    No.
---
 .../tests/physical_optimizer/filter_pushdown.rs    | 342 +++++++++++++++++++++
 datafusion/physical-plan/src/aggregates/mod.rs     |  23 +-
 .../test_files/push_down_filter_regression.slt     |  39 +++
 3 files changed, 395 insertions(+), 9 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs 
b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
index 9f3dffd230..6fe77c5e89 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
@@ -34,6 +34,7 @@ use datafusion::{
     scalar::ScalarValue,
 };
 use datafusion_catalog::memory::DataSourceExec;
+use datafusion_common::JoinType;
 use datafusion_common::config::ConfigOptions;
 use datafusion_datasource::{
     PartitionedFile, file_groups::FileGroup, 
file_scan_config::FileScanConfigBuilder,
@@ -53,6 +54,7 @@ use datafusion_physical_expr::{
 use datafusion_physical_optimizer::{
     PhysicalOptimizerRule, filter_pushdown::FilterPushdown,
 };
+use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
 use datafusion_physical_plan::{
     ExecutionPlan,
     aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -3489,6 +3491,346 @@ fn test_pushdown_with_empty_group_by() {
     );
 }
 
+#[test]
+fn test_pushdown_through_aggregate_with_reordered_input_columns() {
+    let scan = TestScanBuilder::new(schema()).with_support(true).build();
+
+    // Reorder scan output from (a, b, c) to (c, a, b)
+    let reordered_schema = Arc::new(Schema::new(vec![
+        Field::new("c", DataType::Float64, false),
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+    ]));
+    let projection = Arc::new(
+        ProjectionExec::try_new(
+            vec![
+                (col("c", &schema()).unwrap(), "c".to_string()),
+                (col("a", &schema()).unwrap(), "a".to_string()),
+                (col("b", &schema()).unwrap(), "b".to_string()),
+            ],
+            scan,
+        )
+        .unwrap(),
+    );
+
+    let aggregate_expr = vec![
+        AggregateExprBuilder::new(
+            count_udaf(),
+            vec![col("c", &reordered_schema).unwrap()],
+        )
+        .schema(reordered_schema.clone())
+        .alias("cnt")
+        .build()
+        .map(Arc::new)
+        .unwrap(),
+    ];
+
+    // Group by a@1, b@2 (input indices in reordered schema)
+    let group_by = PhysicalGroupBy::new_single(vec![
+        (col("a", &reordered_schema).unwrap(), "a".to_string()),
+        (col("b", &reordered_schema).unwrap(), "b".to_string()),
+    ]);
+
+    let aggregate = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Final,
+            group_by,
+            aggregate_expr,
+            vec![None],
+            projection,
+            reordered_schema,
+        )
+        .unwrap(),
+    );
+
+    // Filter on b@1 in aggregate's output schema (a@0, b@1, cnt@2)
+    // The grouping expr for b references input index 2, but output index is 1.
+    let agg_output_schema = aggregate.schema();
+    let predicate = col_lit_predicate("b", "bar", &agg_output_schema);
+    let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
+
+    // The filter should be pushed down
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, FilterPushdown::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: b@1 = bar
+        -   AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt]
+        -     ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+        -       DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt], 
ordering_mode=PartiallySorted([1])
+          -   ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = 
bar
+    "
+    );
+}
+
+#[test]
+fn test_pushdown_through_aggregate_grouping_sets_with_reordered_input() {
+    let scan = TestScanBuilder::new(schema()).with_support(true).build();
+
+    let reordered_schema = Arc::new(Schema::new(vec![
+        Field::new("c", DataType::Float64, false),
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+    ]));
+    let projection = Arc::new(
+        ProjectionExec::try_new(
+            vec![
+                (col("c", &schema()).unwrap(), "c".to_string()),
+                (col("a", &schema()).unwrap(), "a".to_string()),
+                (col("b", &schema()).unwrap(), "b".to_string()),
+            ],
+            scan,
+        )
+        .unwrap(),
+    );
+
+    let aggregate_expr = vec![
+        AggregateExprBuilder::new(
+            count_udaf(),
+            vec![col("c", &reordered_schema).unwrap()],
+        )
+        .schema(reordered_schema.clone())
+        .alias("cnt")
+        .build()
+        .map(Arc::new)
+        .unwrap(),
+    ];
+
+    // Use grouping sets (a, b) and (b).
+    let group_by = PhysicalGroupBy::new(
+        vec![
+            (col("a", &reordered_schema).unwrap(), "a".to_string()),
+            (col("b", &reordered_schema).unwrap(), "b".to_string()),
+        ],
+        vec![
+            (
+                Arc::new(Literal::new(ScalarValue::Utf8(None))),
+                "a".to_string(),
+            ),
+            (
+                Arc::new(Literal::new(ScalarValue::Utf8(None))),
+                "b".to_string(),
+            ),
+        ],
+        vec![vec![false, false], vec![true, false]],
+        true,
+    );
+
+    let aggregate = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Final,
+            group_by,
+            aggregate_expr,
+            vec![None],
+            projection,
+            reordered_schema,
+        )
+        .unwrap(),
+    );
+
+    let agg_output_schema = aggregate.schema();
+
+    // Filter on b (present in all grouping sets) should be pushed down
+    let predicate = col_lit_predicate("b", "bar", &agg_output_schema);
+    let plan = Arc::new(FilterExec::try_new(predicate, 
aggregate.clone()).unwrap());
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, FilterPushdown::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: b@1 = bar
+        -   AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a, 
b@2 as b)], aggr=[cnt]
+        -     ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+        -       DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a, 
b@2 as b)], aggr=[cnt], ordering_mode=PartiallySorted([1])
+          -   ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = 
bar
+    "
+    );
+
+    // Filter on a (missing from second grouping set) should not be pushed down
+    let predicate = col_lit_predicate("a", "foo", &agg_output_schema);
+    let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, FilterPushdown::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        -   AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a, 
b@2 as b)], aggr=[cnt]
+        -     ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+        -       DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - FilterExec: a@0 = foo
+          -   AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as 
a, b@2 as b)], aggr=[cnt]
+          -     ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+          -       DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
+    "
+    );
+}
+
+/// Regression test for https://github.com/apache/datafusion/issues/21065.
+///
+/// Given a plan similar to the following, ensure that the filter is pushed 
down
+/// through an AggregateExec whose input columns are reordered by a 
ProjectionExec.
+#[tokio::test]
+async fn 
test_hashjoin_dynamic_filter_pushdown_through_aggregate_with_reordered_input() {
+    // Build side
+    let build_batches = vec![record_batch!(("a", Utf8, ["h1", 
"h2"])).unwrap()];
+    let build_schema =
+        Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Probe side
+    let probe_batches = vec![
+        record_batch!(
+            ("a", Utf8, ["h1", "h2", "h3", "h4"]),
+            ("value", Float64, [1.0, 2.0, 3.0, 4.0])
+        )
+        .unwrap(),
+    ];
+    let probe_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("value", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // ProjectionExec reorders (a, value) → (value, a)
+    let reordered_schema = Arc::new(Schema::new(vec![
+        Field::new("value", DataType::Float64, false),
+        Field::new("a", DataType::Utf8, false),
+    ]));
+    let projection = Arc::new(
+        ProjectionExec::try_new(
+            vec![
+                (col("value", &probe_schema).unwrap(), "value".to_string()),
+                (col("a", &probe_schema).unwrap(), "a".to_string()),
+            ],
+            probe_scan,
+        )
+        .unwrap(),
+    );
+
+    // AggregateExec: GROUP BY a@1, min(value@0)
+    let aggregate_expr = vec![
+        AggregateExprBuilder::new(
+            min_udaf(),
+            vec![col("value", &reordered_schema).unwrap()],
+        )
+        .schema(reordered_schema.clone())
+        .alias("min_value")
+        .build()
+        .map(Arc::new)
+        .unwrap(),
+    ];
+    let group_by = PhysicalGroupBy::new_single(vec![(
+        col("a", &reordered_schema).unwrap(), // a@1 in input
+        "a".to_string(),
+    )]);
+
+    let aggregate = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Single,
+            group_by,
+            aggregate_expr,
+            vec![None],
+            projection,
+            reordered_schema,
+        )
+        .unwrap(),
+    );
+
+    // Aggregate output schema: (a@0, min_value@1)
+    let agg_output_schema = aggregate.schema();
+
+    // Join the build and probe side
+    let plan = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            aggregate,
+            vec![(
+                col("a", &build_schema).unwrap(),
+                col("a", &agg_output_schema).unwrap(),
+            )],
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::CollectLeft,
+            datafusion_common::NullEquality::NullEqualsNothing,
+            false,
+        )
+        .unwrap(),
+    ) as Arc<dyn ExecutionPlan>;
+
+    // The HashJoin's dynamic filter on `a` should push
+    // through the aggregate and reach the probe-side DataSource.
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), 
FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)]
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a], file_type=test, pushdown_supported=true
+        -   AggregateExec: mode=Single, gby=[a@1 as a], aggr=[min_value]
+        -     ProjectionExec: expr=[value@1 as value, a@0 as a]
+        -       DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, value], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)]
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a], file_type=test, pushdown_supported=true
+          -   AggregateExec: mode=Single, gby=[a@1 as a], aggr=[min_value]
+          -     ProjectionExec: expr=[value@1 as value, a@0 as a]
+          -       DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, value], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ empty ]
+    "
+    );
+
+    // Actually execute the plan to verify the dynamic filter is populated
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+
+    let session_config = SessionConfig::new().with_batch_size(10);
+    let session_ctx = SessionContext::new_with_config(session_config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+    stream.next().await.unwrap().unwrap();
+
+    // After execution, the dynamic filter should be populated with values
+    insta::assert_snapshot!(
+        format!("{}", format_plan_for_test(&plan)),
+        @r"
+    - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)]
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a], file_type=test, pushdown_supported=true
+    -   AggregateExec: mode=Single, gby=[a@1 as a], aggr=[min_value]
+    -     ProjectionExec: expr=[value@1 as value, a@0 as a]
+    -       DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, value], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ a@0 >= h1 AND a@0 <= h2 AND a@0 IN (SET) ([h1, h2]) ]
+    "
+    );
+}
+
 #[test]
 fn test_pushdown_with_computed_grouping_key() {
     // Test filter pushdown with computed grouping expression
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs 
b/datafusion/physical-plan/src/aggregates/mod.rs
index 2b9d869818..fb2d64da83 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -640,7 +640,8 @@ pub struct AggregateExec {
     limit_options: Option<LimitOptions>,
     /// Input plan, could be a partial aggregate or the input to the aggregate
     pub input: Arc<dyn ExecutionPlan>,
-    /// Schema after the aggregate is applied
+    /// Schema after the aggregate is applied. Contains the group by columns 
followed by the
+    /// aggregate outputs.
     schema: SchemaRef,
     /// Input schema before any aggregation is applied. For partial aggregate 
this will be the
     /// same as input.schema() but for the final aggregate it will be the same 
as the input
@@ -1473,11 +1474,15 @@ impl ExecutionPlan for AggregateExec {
         // This optimization is NOT safe for filters on aggregated columns 
(like filtering on
         // the result of SUM or COUNT), as those require computing all groups 
first.
 
-        let grouping_columns: HashSet<_> = self
-            .group_by
-            .expr()
-            .iter()
-            .flat_map(|(expr, _)| collect_columns(expr))
+        // Build grouping columns using output indices because parent filters 
reference the
+        // AggregateExec's output schema where grouping columns in the output 
schema. The
+        // grouping expressions reference input columns which may not match 
the output schema.
+        //
+        // It is safe to assume that the output_schema contains group by 
columns in the same order
+        // as the group by expression. See [`create_schema`] and 
[`AggregateExec`].
+        let output_schema = self.schema();
+        let grouping_columns: HashSet<_> = (0..self.group_by.expr().len())
+            .map(|i| Column::new(output_schema.field(i).name(), i))
             .collect();
 
         // Analyze each filter separately to determine if it can be pushed down
@@ -1502,9 +1507,7 @@ impl ExecutionPlan for AggregateExec {
                 let filter_column_indices: Vec<usize> = filter_columns
                     .iter()
                     .filter_map(|filter_col| {
-                        self.group_by.expr().iter().position(|(expr, _)| {
-                            collect_columns(expr).contains(filter_col)
-                        })
+                        grouping_columns.get(filter_col).map(|col| col.index())
                     })
                     .collect();
 
@@ -1600,6 +1603,8 @@ impl ExecutionPlan for AggregateExec {
     }
 }
 
+/// Creates the output schema for an [`AggregateExec`] containing the group by 
columns followed
+/// by the aggregate columns.
 fn create_schema(
     input_schema: &Schema,
     group_by: &PhysicalGroupBy,
diff --git a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt 
b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
index ea75ac32b3..cfc564fa2b 100644
--- a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
+++ b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
@@ -207,6 +207,45 @@ reset datafusion.optimizer.enable_dynamic_filter_pushdown;
 statement ok
 reset datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown;
 
+# Regression test for https://github.com/apache/datafusion/issues/21065
+# Ensure filter pushdown through AggregateExec still works when a 
ProjectionExec
+# reorders aggregate input columns.
+
+statement ok
+create external table agg_reordered_pushdown stored as parquet location 
'../../parquet-testing/data/alltypes_plain.parquet';
+
+query TBI rowsort
+select a, b, cnt
+from (
+    select a, b, count(c) as cnt
+    from (
+        select id as c, cast(string_col as varchar) as a, bool_col as b
+        from agg_reordered_pushdown
+    ) t
+    group by a, b
+) q
+where b = true;
+----
+0 true 4
+
+query TBI rowsort
+select a, b, cnt
+from (
+    select a, b, count(c) as cnt
+    from (
+        select id as c, cast(string_col as varchar) as a, bool_col as b
+        from agg_reordered_pushdown
+    ) t
+    group by a, b
+) q
+where cnt > 1;
+----
+0 true 4
+1 false 4
+
+statement ok
+drop table agg_reordered_pushdown;
+
 statement ok
 drop table agg_dyn_test;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to