wiedld commented on code in PR #14919:
URL: https://github.com/apache/datafusion/pull/14919#discussion_r1975887998
##########
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:
##########
@@ -3346,3 +3351,62 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
     Ok(())
 }
+
+#[tokio::test]
+async fn test_preserve_needed_coalesce() -> Result<()> {
+    // Input to EnforceSorting, from our test case.
+    let plan = projection_exec_with_alias(
+        union_exec(vec![parquet_exec_with_stats(); 2]),
+        vec![
+            ("a".to_string(), "a".to_string()),
+            ("b".to_string(), "value".to_string()),
+        ],
+    );
+    let plan = Arc::new(CoalescePartitionsExec::new(plan));
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }]);
+    let plan: Arc<dyn ExecutionPlan> =
+        single_partitioned_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
+    let plan = sort_exec(sort_key, plan);
+
+    // Starting plan: as in our test case.
+    assert_eq!(
+        get_plan_string(&plan),
+        vec![
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",

Review Comment:
Thank you for taking the time, @berkaysynnada. The test case merged here is a reproducer. The `mode=SinglePartitioned` comes from the actual plans we have running, and failing, in prod: https://github.com/influxdata/arrow-datafusion/pull/58#issuecomment-2654249703

A simplified view of it is:
```
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
"  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
"    CoalescePartitionsExec",
"      ProjectionExec: expr=[a@0 as a, b@1 as value]",
"        UnionExec",
"          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
"          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet"
```

The `CoalescePartitionsExec` gets removed when the aggregate is in `mode=SinglePartitioned`. Do you think the `AggregateExec` is using the wrong mode?
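To make the concern concrete, here is a minimal std-only Rust toy model. It is hypothetical and does not use DataFusion's API. It assumes, as a simplification, that a `SinglePartitioned` aggregate groups each input partition independently, and that both `DataSourceExec` nodes in the simplified plan return the same rows because both scan file `x`. With `gby=[a@0 as a1], aggr=[]` the aggregate is effectively a DISTINCT on `a`. The functions `aggregate_per_partition`, `coalesce`, and `sort_single`, and the sample values, are illustrative only.

```rust
// Hypothetical toy model, NOT DataFusion's API: each partition is a plain
// Vec<i64> holding values of column `a`.
use std::collections::BTreeSet;

/// Models `AggregateExec: gby=[a], aggr=[]` (a DISTINCT on `a`) under the
/// assumption that it groups each partition independently, with no
/// cross-partition merge.
fn aggregate_per_partition(partitions: &[Vec<i64>]) -> Vec<Vec<i64>> {
    partitions
        .iter()
        .map(|p| {
            p.iter()
                .copied()
                .collect::<BTreeSet<i64>>()
                .into_iter()
                .collect::<Vec<i64>>()
        })
        .collect()
}

/// Models `CoalescePartitionsExec`: merges all partitions into one.
fn coalesce(partitions: &[Vec<i64>]) -> Vec<Vec<i64>> {
    vec![partitions.concat()]
}

/// Models `SortExec` with `preserve_partitioning=[false]`: one sorted output.
fn sort_single(partitions: &[Vec<i64>]) -> Vec<i64> {
    let mut rows = partitions.concat();
    rows.sort();
    rows
}

fn main() {
    // Both scans read the same file `x`, so the two union inputs hold the same rows.
    let file_x: Vec<i64> = vec![3, 1, 2, 1];
    let union = vec![file_x.clone(), file_x];

    let with_coalesce = sort_single(&aggregate_per_partition(&coalesce(&union)));
    let without_coalesce = sort_single(&aggregate_per_partition(&union));

    println!("with coalesce:    {:?}", with_coalesce); // [1, 2, 3]
    println!("without coalesce: {:?}", without_coalesce); // [1, 1, 2, 2, 3, 3]
}
```

Under these assumptions, keeping the coalesce yields each key once, while dropping it yields each key once per partition, because per-partition grouping cannot merge keys that occur in both union inputs.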
Review Comment:
If you want the full history of the exact plan that is failing for us (rather than the reproducer), including how it was mutated by EnforceDistribution and EnforceSorting, I have outlined it here: https://github.com/influxdata/arrow-datafusion/pull/58#issuecomment-2654249703