This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c931c89886 ensure dynamic filters are correctly pushed down through
aggregations (#21059)
c931c89886 is described below
commit c931c89886997bf486b148c19f33e3e33a933349
Author: Jayant Shrivastava <[email protected]>
AuthorDate: Thu Apr 2 09:52:46 2026 -0400
ensure dynamic filters are correctly pushed down through aggregations
(#21059)
## Which issue does this PR close?
- Closes https://github.com/apache/datafusion/issues/21065.
## Rationale for this change
In plans such as the following (where `contexts` is another table with a column `a`),
HashJoin dynamic filters are not pushed down through the aggregation:
```
CREATE TABLE data (a VARCHAR, ts TIMESTAMP, value DOUBLE)
AS VALUES
('h1', '2024-01-01T00:05:00', 1.0),
('h1', '2024-01-01T00:15:00', 2.0),
('h2', '2024-01-01T00:25:00', 3.0),
('h3', '2024-01-01T00:35:00', 4.0);
SELECT * FROM contexts c
INNER JOIN (
  SELECT a, date_bin(interval '1 hour', ts) AS bucket, min(value) AS min_val
  FROM (SELECT value, a, ts FROM data)
  GROUP BY a, date_bin(interval '1 hour', ts)
) agg ON c.a = agg.a;
```
```
HashJoinExec: mode=Auto, join_type=Inner, on=[(a@0, a@0)]
  DataSourceExec: partitions=1
  ProjectionExec: [a@0, date_bin(1h, ts)@1 as bucket, min(value)@2 as min_val]
    AggregateExec: mode=FinalPartitioned, gby=[a@0, date_bin(1h, ts)@1], aggr=[min(value)]
      AggregateExec: mode=Partial, gby=[a@1, date_bin(1h, ts@2)], aggr=[min(value)]
        ProjectionExec: [value@2, a@0, ts@1] ← reorders columns
          DataSourceExec: partitions=1
```
## What changes are included in this PR?
`AggregateExec::gather_filters_for_pushdown` compared parent filter
columns (output schema indices) against grouping expression columns
(input schema indices). When a `ProjectionExec` below the aggregate
reorders columns, the index mismatch causes filters (such as HashJoin
dynamic filters) to be incorrectly blocked.
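This change fixes the column index mapping in
`AggregateExec::gather_filters_for_pushdown`: the set of grouping columns is now
built from the aggregate's output schema (whose leading fields are the group-by
columns) instead of from the input columns referenced by the grouping expressions.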
## Are these changes tested?
- `test_pushdown_through_aggregate_with_reordered_input_columns` —
filter on grouping column with reordered input is pushed down
- `test_pushdown_through_aggregate_with_reordered_input_no_pushdown_on_agg_result`
  — filter on aggregate result column is not pushed down
- `test_pushdown_through_aggregate_grouping_sets_with_reordered_input` —
GROUPING SETS: filter on common column pushed, filter on missing column
blocked
- `test_hashjoin_dynamic_filter_pushdown_through_aggregate_with_reordered_input`
  — HashJoin dynamic filter pushes through aggregate with reordered input
  and is populated with values after execution
- All tests verified to fail without the fix
## Are there any user-facing changes?
No.
---
.../tests/physical_optimizer/filter_pushdown.rs | 342 +++++++++++++++++++++
datafusion/physical-plan/src/aggregates/mod.rs | 23 +-
.../test_files/push_down_filter_regression.slt | 39 +++
3 files changed, 395 insertions(+), 9 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
index 9f3dffd230..6fe77c5e89 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
@@ -34,6 +34,7 @@ use datafusion::{
scalar::ScalarValue,
};
use datafusion_catalog::memory::DataSourceExec;
+use datafusion_common::JoinType;
use datafusion_common::config::ConfigOptions;
use datafusion_datasource::{
PartitionedFile, file_groups::FileGroup,
file_scan_config::FileScanConfigBuilder,
@@ -53,6 +54,7 @@ use datafusion_physical_expr::{
use datafusion_physical_optimizer::{
PhysicalOptimizerRule, filter_pushdown::FilterPushdown,
};
+use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion_physical_plan::{
ExecutionPlan,
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -3489,6 +3491,346 @@ fn test_pushdown_with_empty_group_by() {
);
}
+#[test]
+fn test_pushdown_through_aggregate_with_reordered_input_columns() {
+ let scan = TestScanBuilder::new(schema()).with_support(true).build();
+
+ // Reorder scan output from (a, b, c) to (c, a, b) so input and output indices differ
+ let reordered_schema = Arc::new(Schema::new(vec![
+ Field::new("c", DataType::Float64, false),
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ ]));
+ let projection = Arc::new(
+ ProjectionExec::try_new(
+ vec![
+ (col("c", &schema()).unwrap(), "c".to_string()),
+ (col("a", &schema()).unwrap(), "a".to_string()),
+ (col("b", &schema()).unwrap(), "b".to_string()),
+ ],
+ scan,
+ )
+ .unwrap(),
+ );
+
+ let aggregate_expr = vec![
+ AggregateExprBuilder::new(
+ count_udaf(),
+ vec![col("c", &reordered_schema).unwrap()],
+ )
+ .schema(reordered_schema.clone())
+ .alias("cnt")
+ .build()
+ .map(Arc::new)
+ .unwrap(),
+ ];
+
+ // Group by a@1, b@2 (indices into the reordered input schema)
+ let group_by = PhysicalGroupBy::new_single(vec![
+ (col("a", &reordered_schema).unwrap(), "a".to_string()),
+ (col("b", &reordered_schema).unwrap(), "b".to_string()),
+ ]);
+
+ let aggregate = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Final,
+ group_by,
+ aggregate_expr,
+ vec![None],
+ projection,
+ reordered_schema,
+ )
+ .unwrap(),
+ );
+
+ // Filter on b@1 in aggregate's output schema (a@0, b@1, cnt@2)
+ // The grouping expr for b references input index 2, but output index is 1.
+ let agg_output_schema = aggregate.schema();
+ let predicate = col_lit_predicate("b", "bar", &agg_output_schema);
+ let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
+
+ // The filter on grouping column b should be pushed all the way down to the scan
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, FilterPushdown::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: b@1 = bar
+ - AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt],
ordering_mode=PartiallySorted([1])
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 =
bar
+ "
+ );
+}
+
+#[test]
+fn test_pushdown_through_aggregate_grouping_sets_with_reordered_input() {
+ let scan = TestScanBuilder::new(schema()).with_support(true).build();
+
+ let reordered_schema = Arc::new(Schema::new(vec![
+ Field::new("c", DataType::Float64, false),
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ ]));
+ let projection = Arc::new(
+ ProjectionExec::try_new(
+ vec![
+ (col("c", &schema()).unwrap(), "c".to_string()),
+ (col("a", &schema()).unwrap(), "a".to_string()),
+ (col("b", &schema()).unwrap(), "b".to_string()),
+ ],
+ scan,
+ )
+ .unwrap(),
+ );
+
+ let aggregate_expr = vec![
+ AggregateExprBuilder::new(
+ count_udaf(),
+ vec![col("c", &reordered_schema).unwrap()],
+ )
+ .schema(reordered_schema.clone())
+ .alias("cnt")
+ .build()
+ .map(Arc::new)
+ .unwrap(),
+ ];
+
+ // Use grouping sets (a, b) and (b); `a` is replaced by NULL in the second set.
+ let group_by = PhysicalGroupBy::new(
+ vec![
+ (col("a", &reordered_schema).unwrap(), "a".to_string()),
+ (col("b", &reordered_schema).unwrap(), "b".to_string()),
+ ],
+ vec![
+ (
+ Arc::new(Literal::new(ScalarValue::Utf8(None))),
+ "a".to_string(),
+ ),
+ (
+ Arc::new(Literal::new(ScalarValue::Utf8(None))),
+ "b".to_string(),
+ ),
+ ],
+ vec![vec![false, false], vec![true, false]],
+ true,
+ );
+
+ let aggregate = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Final,
+ group_by,
+ aggregate_expr,
+ vec![None],
+ projection,
+ reordered_schema,
+ )
+ .unwrap(),
+ );
+
+ let agg_output_schema = aggregate.schema();
+
+ // Filter on b (present in all grouping sets) should be pushed down to the scan
+ let predicate = col_lit_predicate("b", "bar", &agg_output_schema);
+ let plan = Arc::new(FilterExec::try_new(predicate,
aggregate.clone()).unwrap());
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, FilterPushdown::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: b@1 = bar
+ - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a,
b@2 as b)], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a,
b@2 as b)], aggr=[cnt], ordering_mode=PartiallySorted([1])
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 =
bar
+ "
+ );
+
+ // Filter on a (NULL in the second grouping set) must stay above the aggregate
+ let predicate = col_lit_predicate("a", "foo", &agg_output_schema);
+ let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, FilterPushdown::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: a@0 = foo
+ - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a,
b@2 as b)], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - FilterExec: a@0 = foo
+ - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as
a, b@2 as b)], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ "
+ );
+}
+
+/// Regression test for https://github.com/apache/datafusion/issues/21065.
+///
+/// Ensure that a HashJoin's dynamic filter on the join key is pushed
down
+/// through an AggregateExec whose input columns are reordered by a
ProjectionExec, and that it is populated with build-side keys after execution.
+#[tokio::test]
+async fn
test_hashjoin_dynamic_filter_pushdown_through_aggregate_with_reordered_input() {
+ // Build side: join keys h1 and h2
+ let build_batches = vec![record_batch!(("a", Utf8, ["h1",
"h2"])).unwrap()];
+ let build_schema =
+ Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
+ let build_scan = TestScanBuilder::new(Arc::clone(&build_schema))
+ .with_support(true)
+ .with_batches(build_batches)
+ .build();
+
+ // Probe side: keys h1..h4; only h1 and h2 also appear on the build side
+ let probe_batches = vec![
+ record_batch!(
+ ("a", Utf8, ["h1", "h2", "h3", "h4"]),
+ ("value", Float64, [1.0, 2.0, 3.0, 4.0])
+ )
+ .unwrap(),
+ ];
+ let probe_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("value", DataType::Float64, false),
+ ]));
+ let probe_scan = TestScanBuilder::new(Arc::clone(&probe_schema))
+ .with_support(true)
+ .with_batches(probe_batches)
+ .build();
+
+ // ProjectionExec reorders (a, value) → (value, a)
+ let reordered_schema = Arc::new(Schema::new(vec![
+ Field::new("value", DataType::Float64, false),
+ Field::new("a", DataType::Utf8, false),
+ ]));
+ let projection = Arc::new(
+ ProjectionExec::try_new(
+ vec![
+ (col("value", &probe_schema).unwrap(), "value".to_string()),
+ (col("a", &probe_schema).unwrap(), "a".to_string()),
+ ],
+ probe_scan,
+ )
+ .unwrap(),
+ );
+
+ // AggregateExec: GROUP BY a@1, min(value@0)
+ let aggregate_expr = vec![
+ AggregateExprBuilder::new(
+ min_udaf(),
+ vec![col("value", &reordered_schema).unwrap()],
+ )
+ .schema(reordered_schema.clone())
+ .alias("min_value")
+ .build()
+ .map(Arc::new)
+ .unwrap(),
+ ];
+ let group_by = PhysicalGroupBy::new_single(vec![(
+ col("a", &reordered_schema).unwrap(), // a@1 in input
+ "a".to_string(),
+ )]);
+
+ let aggregate = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Single,
+ group_by,
+ aggregate_expr,
+ vec![None],
+ projection,
+ reordered_schema,
+ )
+ .unwrap(),
+ );
+
+ // Aggregate output schema: (a@0, min_value@1)
+ let agg_output_schema = aggregate.schema();
+
+ // Join the build scan (left) with the aggregate (right) on `a`
+ let plan = Arc::new(
+ HashJoinExec::try_new(
+ build_scan,
+ aggregate,
+ vec![(
+ col("a", &build_schema).unwrap(),
+ col("a", &agg_output_schema).unwrap(),
+ )],
+ None,
+ &JoinType::Inner,
+ None,
+ PartitionMode::CollectLeft,
+ datafusion_common::NullEquality::NullEqualsNothing,
+ false,
+ )
+ .unwrap(),
+ ) as Arc<dyn ExecutionPlan>;
+
+ // The HashJoin's dynamic filter on `a` should push
+ // through the aggregate and reach the probe-side DataSource.
+ insta::assert_snapshot!(
+ OptimizationTest::new(Arc::clone(&plan),
FilterPushdown::new_post_optimization(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a], file_type=test, pushdown_supported=true
+ - AggregateExec: mode=Single, gby=[a@1 as a], aggr=[min_value]
+ - ProjectionExec: expr=[value@1 as value, a@0 as a]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, value], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a], file_type=test, pushdown_supported=true
+ - AggregateExec: mode=Single, gby=[a@1 as a], aggr=[min_value]
+ - ProjectionExec: expr=[value@1 as value, a@0 as a]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, value], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ empty ]
+ "
+ );
+
+ // Execute the optimized plan so the join's build side populates the dynamic filter
+ let mut config = ConfigOptions::default();
+ config.execution.parquet.pushdown_filters = true;
+ let plan = FilterPushdown::new_post_optimization()
+ .optimize(plan, &config)
+ .unwrap();
+
+ let session_config = SessionConfig::new().with_batch_size(10);
+ let session_ctx = SessionContext::new_with_config(session_config);
+ session_ctx.register_object_store(
+ ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+ Arc::new(InMemory::new()),
+ );
+ let state = session_ctx.state();
+ let task_ctx = state.task_ctx();
+ let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+ stream.next().await.unwrap().unwrap();
+
+ // After reading one batch, the dynamic filter should hold the build-side keys (h1, h2)
+ insta::assert_snapshot!(
+ format!("{}", format_plan_for_test(&plan)),
+ @r"
+ - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a], file_type=test, pushdown_supported=true
+ - AggregateExec: mode=Single, gby=[a@1 as a], aggr=[min_value]
+ - ProjectionExec: expr=[value@1 as value, a@0 as a]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, value], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ a@0 >= h1 AND a@0 <= h2 AND a@0 IN (SET) ([h1, h2]) ]
+ "
+ );
+}
+
#[test]
fn test_pushdown_with_computed_grouping_key() {
// Test filter pushdown with computed grouping expression
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs
b/datafusion/physical-plan/src/aggregates/mod.rs
index 2b9d869818..fb2d64da83 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -640,7 +640,8 @@ pub struct AggregateExec {
limit_options: Option<LimitOptions>,
/// Input plan, could be a partial aggregate or the input to the aggregate
pub input: Arc<dyn ExecutionPlan>,
- /// Schema after the aggregate is applied
+ /// Schema after the aggregate is applied. Contains the group by columns
followed by the
+ /// aggregate outputs.
schema: SchemaRef,
/// Input schema before any aggregation is applied. For partial aggregate
this will be the
/// same as input.schema() but for the final aggregate it will be the same
as the input
@@ -1473,11 +1474,15 @@ impl ExecutionPlan for AggregateExec {
// This optimization is NOT safe for filters on aggregated columns
(like filtering on
// the result of SUM or COUNT), as those require computing all groups
first.
- let grouping_columns: HashSet<_> = self
- .group_by
- .expr()
- .iter()
- .flat_map(|(expr, _)| collect_columns(expr))
+ // Build grouping columns using output indices because parent filters
reference the
+ // AggregateExec's output schema, where the grouping columns come first. The
+ // grouping expressions reference input columns which may not match
the output schema.
+ //
+ // It is safe to assume that the output_schema contains group by
columns in the same order
+ // as the group by expression. See [`create_schema`] and
[`AggregateExec`].
+ let output_schema = self.schema();
+ let grouping_columns: HashSet<_> = (0..self.group_by.expr().len())
+ .map(|i| Column::new(output_schema.field(i).name(), i))
.collect();
// Analyze each filter separately to determine if it can be pushed down
@@ -1502,9 +1507,7 @@ impl ExecutionPlan for AggregateExec {
let filter_column_indices: Vec<usize> = filter_columns
.iter()
.filter_map(|filter_col| {
- self.group_by.expr().iter().position(|(expr, _)| {
- collect_columns(expr).contains(filter_col)
- })
+ grouping_columns.get(filter_col).map(|col| col.index())
})
.collect();
@@ -1600,6 +1603,8 @@ impl ExecutionPlan for AggregateExec {
}
}
+/// Creates the output schema for an [`AggregateExec`] containing the group by
columns followed
+/// by the aggregate columns.
fn create_schema(
input_schema: &Schema,
group_by: &PhysicalGroupBy,
diff --git a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
index ea75ac32b3..cfc564fa2b 100644
--- a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
+++ b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
@@ -207,6 +207,45 @@ reset datafusion.optimizer.enable_dynamic_filter_pushdown;
statement ok
reset datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown;
+# Regression test for https://github.com/apache/datafusion/issues/21065
+# Ensure filter pushdown through AggregateExec still works when a
ProjectionExec
+# reorders aggregate input columns.
+
+statement ok
+create external table agg_reordered_pushdown stored as parquet location
'../../parquet-testing/data/alltypes_plain.parquet';
+
+query TBI rowsort
+select a, b, cnt
+from (
+ select a, b, count(c) as cnt
+ from (
+ select id as c, cast(string_col as varchar) as a, bool_col as b
+ from agg_reordered_pushdown
+ ) t
+ group by a, b
+) q
+where b = true;
+----
+0 true 4
+
+query TBI rowsort
+select a, b, cnt
+from (
+ select a, b, count(c) as cnt
+ from (
+ select id as c, cast(string_col as varchar) as a, bool_col as b
+ from agg_reordered_pushdown
+ ) t
+ group by a, b
+) q
+where cnt > 1;
+----
+0 true 4
+1 false 4
+
+statement ok
+drop table agg_reordered_pushdown;
+
statement ok
drop table agg_dyn_test;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]