wiedld commented on code in PR #14949:
URL: https://github.com/apache/datafusion/pull/14949#discussion_r1976064464
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3154,3 +3157,204 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> {
     Ok(())
 }
+
+/// Similar to [`macro_rules! assert_optimized`], but with the following:
+/// * only run the EnforceDistribution once.
+/// * then only run the EnforceSorting once.
+/// * does not force round-robin repartitioning to be inserted.
+///
+/// It is also a simplified test case and does not handle JOINs
+/// (it does not run join key reordering, which is a precondition for distribution enforcement).
+fn assert_optimized_without_forced_roundrobin(
+    expected: &[&str],
+    plan: Arc<dyn ExecutionPlan>,
+) -> Arc<dyn ExecutionPlan> {
+    let config = Default::default();
+
+    // Add the ancillary output requirements operator at the start:
+    let optimizer = OutputRequirements::new_add_mode();
+    let optimized = optimizer
+        .optimize(plan, &config)
+        .expect("failed to add output req node");
+
+    let optimizer = EnforceDistribution::new();
+    let optimized = optimizer
+        .optimize(optimized, &config)
+        .expect("failed distribution enforcement");
+
+    let optimizer = EnforceSorting::new();
+    let optimized = optimizer
+        .optimize(optimized, &config)
+        .expect("failed sorting enforcement");
+
+    // Remove the ancillary output requirements operator when done:
+    let optimizer = OutputRequirements::new_remove_mode();
+    let optimized = optimizer
+        .optimize(optimized, &config)
+        .expect("failed to remove output req node");
+
+    // Now format correctly
+    let actual_lines = get_plan_string(&optimized);
+
+    let expected_lines: Vec<&str> = expected.iter().map(|s| *s).collect();
+    assert_eq!(
+        &expected_lines, &actual_lines,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected_lines, actual_lines
+    );
+
+    optimized
+}
+
+fn aggregate_over_sorted_union(
+    input: Vec<Arc<dyn ExecutionPlan>>,
+) -> Arc<dyn ExecutionPlan> {
+    let union = union_exec(input);
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }]);
+    let sort = sort_exec(sort_key, union, false);
+    let plan = aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]);
+
+    // Demonstrate starting plan.
+    assert_eq!(
+        get_plan_string(&plan),
+        vec![
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+            "  AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+            "    SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "      UnionExec",
+            "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+            "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        ],
+    );
+
+    plan
+}
+
+#[test]
+fn repartitions_for_aggregate_after_sorted_union() {
+    let plan = aggregate_over_sorted_union(vec![parquet_exec_with_stats(); 2]);
+
+    // Starting: has expected distribution error
+    let err = "does not satisfy distribution requirements: SinglePartition. Child-0 output partitioning: UnknownPartitioning(2)";
+    assert_sanity_check_err(&plan, err);
+
+    // Outcome:
+    // * adds an SPM->Repartition before the first aggregation stage.
+    // * adds a Repartition->Sort before the second aggregation stage.
+    let expected_after_first_run = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+        "  SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "    RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "      AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+        "          SortPreservingMergeExec: [a@0 ASC]",
+        "            UnionExec",
+        "              SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "                DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "              SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "                DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    let plan = assert_optimized_without_forced_roundrobin(expected_after_first_run, plan);
+
+    // Outcome: is not idempotent.
+    let expected_after_second_run = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+        "  SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "    RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "      AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+        "        SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+        "            UnionExec",
+        "              DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "              DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    assert_optimized_without_forced_roundrobin(expected_after_second_run, plan);
+}
+
+/// Same as [`aggregate_over_sorted_union`], but with a projection between Union->Sort.
+fn aggregate_over_sorted_union_projection(
+    input: Vec<Arc<dyn ExecutionPlan>>,
+) -> Arc<dyn ExecutionPlan> {
+    let union = union_exec(input);
+    let union_projection = projection_exec_with_alias(
+        union,
+        vec![
+            ("a".to_string(), "a".to_string()),
+            ("b".to_string(), "value".to_string()),
+        ],
+    );
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }]);
+    let sort = sort_exec(sort_key, union_projection, false);
+    let plan = aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]);
+
+    // Demonstrate starting plan.
+    // Difference: note the projection added between Union->Sort.
+    assert_eq!(
+        get_plan_string(&plan),
+        vec![
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+            "  AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+            "    SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "      ProjectionExec: expr=[a@0 as a, b@1 as value]",
+            "        UnionExec",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        ],
+    );
+
+    plan
+}
+
+/// Same starting plan as [`repartitions_for_aggregate_after_sorted_union`], but adds a projection
+/// between the union and aggregate. This changes the outcome:
+///
+/// * we no longer get repartitioning, and instead get coalescing.
+#[test]
+fn coalesces_for_aggregate_after_sorted_union_projection() {
+    let plan = aggregate_over_sorted_union_projection(vec![parquet_exec_with_stats(); 2]);
+
+    // Starting: has expected distribution error
+    let err = "does not satisfy distribution requirements: SinglePartition. Child-0 output partitioning: UnknownPartitioning(2)";
+    assert_sanity_check_err(&plan, err);
+
+    // Outcome:
+    // * adds an SPM->Repartition before the first aggregation stage.
+    // * adds a Repartition->Sort before the second aggregation stage.
+    let expected_after_first_run = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+        "  SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "    RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "      AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+        "          SortPreservingMergeExec: [a@0 ASC]",
+        "            SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+        "              ProjectionExec: expr=[a@0 as a, b@1 as value]",
+        "                UnionExec",
+        "                  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "                  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",

Review Comment:
   This no longer inserts the coalesce.
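For readers following the review comment above: "the coalesce" refers to merging the union's two partitions into one (for example with a `CoalescePartitionsExec`) to satisfy the `SinglePartition` requirement, rather than the SortPreservingMerge/Repartition pair the optimizer now inserts. The sketch below is illustrative only and is not part of the PR; it reuses the test helpers from the diff above (`union_exec`, `projection_exec_with_alias`, `parquet_exec_with_stats`), and the helper name `_manually_coalesced_input` is hypothetical.

    use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;

    // Hypothetical sketch (not from the PR): build the same union + projection input
    // used by `aggregate_over_sorted_union_projection`, then satisfy the
    // SinglePartition requirement by coalescing the two partitions into one.
    fn _manually_coalesced_input() -> Arc<dyn ExecutionPlan> {
        let union = union_exec(vec![parquet_exec_with_stats(); 2]);
        let projection = projection_exec_with_alias(
            union,
            vec![
                ("a".to_string(), "a".to_string()),
                ("b".to_string(), "value".to_string()),
            ],
        );
        // CoalescePartitionsExec merges its input partitions into a single partition,
        // which is the "coalesce" that a single-partition SortExec could sit on top of.
        Arc::new(CoalescePartitionsExec::new(projection))
    }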