gabotechs commented on code in PR #21059:
URL: https://github.com/apache/datafusion/pull/21059#discussion_r2962569154
##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -1473,11 +1473,12 @@ impl ExecutionPlan for AggregateExec {
// This optimization is NOT safe for filters on aggregated columns
(like filtering on
// the result of SUM or COUNT), as those require computing all groups
first.
- let grouping_columns: HashSet<_> = self
- .group_by
- .expr()
- .iter()
- .flat_map(|(expr, _)| collect_columns(expr))
+ // Build grouping columns using output indices because parent filters
+ // reference the AggregateExec's output schema, in which the grouping
+ // columns appear first. The grouping expressions reference
+ // input columns which may not match the output schema.
+ let output_schema = self.schema();
+ let grouping_columns: HashSet<_> = (0..self.group_by.expr().len())
+ .map(|i| Column::new(output_schema.field(i).name(), i))
 Review Comment:
🤔 This assumes that the output schema always has the grouping
expressions as its first columns.
Is this assumption correct? Maybe add a comment referencing where this is
enforced?
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -3145,6 +3147,425 @@ fn test_pushdown_with_empty_group_by() {
);
}
+#[test]
+fn test_pushdown_through_aggregate_with_reordered_input_columns() {
+ let scan = TestScanBuilder::new(schema()).with_support(true).build();
+
+ // Reorder scan output from (a, b, c) to (c, a, b)
+ let reordered_schema = Arc::new(Schema::new(vec![
+ Field::new("c", DataType::Float64, false),
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ ]));
+ let projection = Arc::new(
+ ProjectionExec::try_new(
+ vec![
+ (col("c", &schema()).unwrap(), "c".to_string()),
+ (col("a", &schema()).unwrap(), "a".to_string()),
+ (col("b", &schema()).unwrap(), "b".to_string()),
+ ],
+ scan,
+ )
+ .unwrap(),
+ );
+
+ let aggregate_expr = vec![
+ AggregateExprBuilder::new(
+ count_udaf(),
+ vec![col("c", &reordered_schema).unwrap()],
+ )
+ .schema(reordered_schema.clone())
+ .alias("cnt")
+ .build()
+ .map(Arc::new)
+ .unwrap(),
+ ];
+
+ // Group by a@1, b@2 (input indices in reordered schema)
+ let group_by = PhysicalGroupBy::new_single(vec![
+ (col("a", &reordered_schema).unwrap(), "a".to_string()),
+ (col("b", &reordered_schema).unwrap(), "b".to_string()),
+ ]);
+
+ let aggregate = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Final,
+ group_by,
+ aggregate_expr,
+ vec![None],
+ projection,
+ reordered_schema,
+ )
+ .unwrap(),
+ );
+
+ // Filter on b@1 in aggregate's output schema (a@0, b@1, cnt@2)
+ // The grouping expr for b references input index 2, but output index is 1.
+ let agg_output_schema = aggregate.schema();
+ let predicate = col_lit_predicate("b", "bar", &agg_output_schema);
+ let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
+
+ // The filter should be pushed down
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, FilterPushdown::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: b@1 = bar
+ - AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt],
ordering_mode=PartiallySorted([1])
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 =
bar
+ "
+ );
+}
+
+#[test]
+fn
test_pushdown_through_aggregate_with_reordered_input_no_pushdown_on_agg_result()
{
+ let scan = TestScanBuilder::new(schema()).with_support(true).build();
+
+ let reordered_schema = Arc::new(Schema::new(vec![
+ Field::new("c", DataType::Float64, false),
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ ]));
+ let projection = Arc::new(
+ ProjectionExec::try_new(
+ vec![
+ (col("c", &schema()).unwrap(), "c".to_string()),
+ (col("a", &schema()).unwrap(), "a".to_string()),
+ (col("b", &schema()).unwrap(), "b".to_string()),
+ ],
+ scan,
+ )
+ .unwrap(),
+ );
+
+ let aggregate_expr = vec![
+ AggregateExprBuilder::new(
+ count_udaf(),
+ vec![col("c", &reordered_schema).unwrap()],
+ )
+ .schema(reordered_schema.clone())
+ .alias("cnt")
+ .build()
+ .map(Arc::new)
+ .unwrap(),
+ ];
+
+ let group_by = PhysicalGroupBy::new_single(vec![
+ (col("a", &reordered_schema).unwrap(), "a".to_string()),
+ (col("b", &reordered_schema).unwrap(), "b".to_string()),
+ ]);
+
+ let aggregate = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Final,
+ group_by,
+ aggregate_expr,
+ vec![None],
+ projection,
+ reordered_schema,
+ )
+ .unwrap(),
+ );
+
+ // Filter on cnt@2 (aggregate result, not a grouping column)
+ let agg_output_schema = aggregate.schema();
+ let predicate = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new_with_schema("cnt", &agg_output_schema).unwrap()),
+ Operator::Gt,
+ Arc::new(Literal::new(ScalarValue::Int64(Some(5)))),
+ )) as Arc<dyn PhysicalExpr>;
+ let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
+
+ // The filter is not pushed down.
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, FilterPushdown::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: cnt@2 > 5
+ - AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - FilterExec: cnt@2 > 5
+ - AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ "
+ );
+}
+
+#[test]
+fn test_pushdown_through_aggregate_grouping_sets_with_reordered_input() {
+ let scan = TestScanBuilder::new(schema()).with_support(true).build();
+
+ let reordered_schema = Arc::new(Schema::new(vec![
+ Field::new("c", DataType::Float64, false),
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ ]));
+ let projection = Arc::new(
+ ProjectionExec::try_new(
+ vec![
+ (col("c", &schema()).unwrap(), "c".to_string()),
+ (col("a", &schema()).unwrap(), "a".to_string()),
+ (col("b", &schema()).unwrap(), "b".to_string()),
+ ],
+ scan,
+ )
+ .unwrap(),
+ );
+
+ let aggregate_expr = vec![
+ AggregateExprBuilder::new(
+ count_udaf(),
+ vec![col("c", &reordered_schema).unwrap()],
+ )
+ .schema(reordered_schema.clone())
+ .alias("cnt")
+ .build()
+ .map(Arc::new)
+ .unwrap(),
+ ];
+
+ // Use grouping sets (a, b) and (b).
+ let group_by = PhysicalGroupBy::new(
+ vec![
+ (col("a", &reordered_schema).unwrap(), "a".to_string()),
+ (col("b", &reordered_schema).unwrap(), "b".to_string()),
+ ],
+ vec![
+ (
+ Arc::new(Literal::new(ScalarValue::Utf8(None))),
+ "a".to_string(),
+ ),
+ (
+ Arc::new(Literal::new(ScalarValue::Utf8(None))),
+ "b".to_string(),
+ ),
+ ],
+ vec![vec![false, false], vec![true, false]],
+ true,
+ );
+
+ let aggregate = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Final,
+ group_by,
+ aggregate_expr,
+ vec![None],
+ projection,
+ reordered_schema,
+ )
+ .unwrap(),
+ );
+
+ let agg_output_schema = aggregate.schema();
+
+ // Filter on b (present in all grouping sets) should be pushed down
+ let predicate = col_lit_predicate("b", "bar", &agg_output_schema);
+ let plan = Arc::new(FilterExec::try_new(predicate,
aggregate.clone()).unwrap());
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, FilterPushdown::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: b@1 = bar
+ - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a,
b@2 as b)], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a,
b@2 as b)], aggr=[cnt], ordering_mode=PartiallySorted([1])
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 =
bar
+ "
+ );
+
+ // Filter on a (missing from second grouping set) should not be pushed down
+ let predicate = col_lit_predicate("a", "foo", &agg_output_schema);
+ let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, FilterPushdown::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: a@0 = foo
+ - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a,
b@2 as b)], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - FilterExec: a@0 = foo
+ - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as
a, b@2 as b)], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ "
+ );
+}
+
+/// Regression test for https://github.com/apache/datafusion/issues/21065.
+///
+/// Given a plan similar to the following, ensure that the filter is pushed
down
+/// through an AggregateExec whose input columns are reordered by a
ProjectionExec.
+#[tokio::test]
 Review Comment:
I see this is the only test referencing the issue — are the other tests also
contributing to reproducing the bug?
If you manage to reduce the amount of test code necessary to reproduce the
issue, that would be awesome. Otherwise, if you think all four tests contribute
distinct ways of reproducing the issue, let's keep them all.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]