jayshrivastava commented on code in PR #21059:
URL: https://github.com/apache/datafusion/pull/21059#discussion_r2997267091
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -3145,6 +3147,425 @@ fn test_pushdown_with_empty_group_by() {
);
}
+#[test]
+fn test_pushdown_through_aggregate_with_reordered_input_columns() {
+ let scan = TestScanBuilder::new(schema()).with_support(true).build();
+
+ // Reorder scan output from (a, b, c) to (c, a, b)
+ let reordered_schema = Arc::new(Schema::new(vec![
+ Field::new("c", DataType::Float64, false),
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ ]));
+ let projection = Arc::new(
+ ProjectionExec::try_new(
+ vec![
+ (col("c", &schema()).unwrap(), "c".to_string()),
+ (col("a", &schema()).unwrap(), "a".to_string()),
+ (col("b", &schema()).unwrap(), "b".to_string()),
+ ],
+ scan,
+ )
+ .unwrap(),
+ );
+
+ let aggregate_expr = vec![
+ AggregateExprBuilder::new(
+ count_udaf(),
+ vec![col("c", &reordered_schema).unwrap()],
+ )
+ .schema(reordered_schema.clone())
+ .alias("cnt")
+ .build()
+ .map(Arc::new)
+ .unwrap(),
+ ];
+
+ // Group by a@1, b@2 (input indices in reordered schema)
+ let group_by = PhysicalGroupBy::new_single(vec![
+ (col("a", &reordered_schema).unwrap(), "a".to_string()),
+ (col("b", &reordered_schema).unwrap(), "b".to_string()),
+ ]);
+
+ let aggregate = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Final,
+ group_by,
+ aggregate_expr,
+ vec![None],
+ projection,
+ reordered_schema,
+ )
+ .unwrap(),
+ );
+
+ // Filter on b@1 in aggregate's output schema (a@0, b@1, cnt@2)
+ // The grouping expr for b references input index 2, but output index is 1.
+ let agg_output_schema = aggregate.schema();
+ let predicate = col_lit_predicate("b", "bar", &agg_output_schema);
+ let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
+
+ // The filter should be pushed down
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, FilterPushdown::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: b@1 = bar
+ - AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt],
ordering_mode=PartiallySorted([1])
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 =
bar
+ "
+ );
+}
+
+#[test]
+fn
test_pushdown_through_aggregate_with_reordered_input_no_pushdown_on_agg_result()
{
+ let scan = TestScanBuilder::new(schema()).with_support(true).build();
+
+ let reordered_schema = Arc::new(Schema::new(vec![
+ Field::new("c", DataType::Float64, false),
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ ]));
+ let projection = Arc::new(
+ ProjectionExec::try_new(
+ vec![
+ (col("c", &schema()).unwrap(), "c".to_string()),
+ (col("a", &schema()).unwrap(), "a".to_string()),
+ (col("b", &schema()).unwrap(), "b".to_string()),
+ ],
+ scan,
+ )
+ .unwrap(),
+ );
+
+ let aggregate_expr = vec![
+ AggregateExprBuilder::new(
+ count_udaf(),
+ vec![col("c", &reordered_schema).unwrap()],
+ )
+ .schema(reordered_schema.clone())
+ .alias("cnt")
+ .build()
+ .map(Arc::new)
+ .unwrap(),
+ ];
+
+ let group_by = PhysicalGroupBy::new_single(vec![
+ (col("a", &reordered_schema).unwrap(), "a".to_string()),
+ (col("b", &reordered_schema).unwrap(), "b".to_string()),
+ ]);
+
+ let aggregate = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Final,
+ group_by,
+ aggregate_expr,
+ vec![None],
+ projection,
+ reordered_schema,
+ )
+ .unwrap(),
+ );
+
+ // Filter on cnt@2 (aggregate result, not a grouping column)
+ let agg_output_schema = aggregate.schema();
+ let predicate = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new_with_schema("cnt", &agg_output_schema).unwrap()),
+ Operator::Gt,
+ Arc::new(Literal::new(ScalarValue::Int64(Some(5)))),
+ )) as Arc<dyn PhysicalExpr>;
+ let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
+
+ // The filter is not pushed down.
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, FilterPushdown::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: cnt@2 > 5
+ - AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - FilterExec: cnt@2 > 5
+ - AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ "
+ );
+}
+
+#[test]
+fn test_pushdown_through_aggregate_grouping_sets_with_reordered_input() {
+ let scan = TestScanBuilder::new(schema()).with_support(true).build();
+
+ let reordered_schema = Arc::new(Schema::new(vec![
+ Field::new("c", DataType::Float64, false),
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ ]));
+ let projection = Arc::new(
+ ProjectionExec::try_new(
+ vec![
+ (col("c", &schema()).unwrap(), "c".to_string()),
+ (col("a", &schema()).unwrap(), "a".to_string()),
+ (col("b", &schema()).unwrap(), "b".to_string()),
+ ],
+ scan,
+ )
+ .unwrap(),
+ );
+
+ let aggregate_expr = vec![
+ AggregateExprBuilder::new(
+ count_udaf(),
+ vec![col("c", &reordered_schema).unwrap()],
+ )
+ .schema(reordered_schema.clone())
+ .alias("cnt")
+ .build()
+ .map(Arc::new)
+ .unwrap(),
+ ];
+
+ // Use grouping sets (a, b) and (b).
+ let group_by = PhysicalGroupBy::new(
+ vec![
+ (col("a", &reordered_schema).unwrap(), "a".to_string()),
+ (col("b", &reordered_schema).unwrap(), "b".to_string()),
+ ],
+ vec![
+ (
+ Arc::new(Literal::new(ScalarValue::Utf8(None))),
+ "a".to_string(),
+ ),
+ (
+ Arc::new(Literal::new(ScalarValue::Utf8(None))),
+ "b".to_string(),
+ ),
+ ],
+ vec![vec![false, false], vec![true, false]],
+ true,
+ );
+
+ let aggregate = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Final,
+ group_by,
+ aggregate_expr,
+ vec![None],
+ projection,
+ reordered_schema,
+ )
+ .unwrap(),
+ );
+
+ let agg_output_schema = aggregate.schema();
+
+ // Filter on b (present in all grouping sets) should be pushed down
+ let predicate = col_lit_predicate("b", "bar", &agg_output_schema);
+ let plan = Arc::new(FilterExec::try_new(predicate,
aggregate.clone()).unwrap());
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, FilterPushdown::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: b@1 = bar
+ - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a,
b@2 as b)], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a,
b@2 as b)], aggr=[cnt], ordering_mode=PartiallySorted([1])
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 =
bar
+ "
+ );
+
+ // Filter on a (missing from second grouping set) should not be pushed down
+ let predicate = col_lit_predicate("a", "foo", &agg_output_schema);
+ let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, FilterPushdown::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: a@0 = foo
+ - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a,
b@2 as b)], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - FilterExec: a@0 = foo
+ - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as
a, b@2 as b)], aggr=[cnt]
+ - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ "
+ );
+}
+
+/// Regression test for https://github.com/apache/datafusion/issues/21065.
+///
+/// Given a plan similar to the following, ensure that the filter is pushed
down
+/// through an AggregateExec whose input columns are reordered by a
ProjectionExec.
+#[tokio::test]
Review Comment:
The other tests are relevant regression tests IMO I just didn't tag them
with this issue bc the issue talks about dynamic filters specifically.
I did remove 1 test bc it is really similar to another one. The other ones
fail without the fix.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]