wiedld commented on code in PR #14919:
URL: https://github.com/apache/datafusion/pull/14919#discussion_r1975963815


##########
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:
##########
@@ -3346,3 +3351,62 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_preserve_needed_coalesce() -> Result<()> {
+    // Input to EnforceSorting, from our test case.
+    let plan = projection_exec_with_alias(
+        union_exec(vec![parquet_exec_with_stats(); 2]),
+        vec![
+            ("a".to_string(), "a".to_string()),
+            ("b".to_string(), "value".to_string()),
+        ],
+    );
+    let plan = Arc::new(CoalescePartitionsExec::new(plan));
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }]);
+    let plan: Arc<dyn ExecutionPlan> =
+        single_partitioned_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
+    let plan = sort_exec(sort_key, plan);
+
+    // Starting plan: as in our test case.
+    assert_eq!(
+        get_plan_string(&plan),
+        vec![
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",

Review Comment:
   > However, in this reproducer, you are giving an invalid plan to the EnforceSorting in terms of distribution.
   
   > However, in your example plan, AggregateExec has a single partition (as it is above CoalescePartitions)
   
   I think this means that inserting the coalesce in the first place (during the enforce distribution pass) is the bug, since the input to the enforce distribution optimizer is:
   
   ```
        ...nodes...
           AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[sum(Value)]
             AggregateExec: mode=Partial, gby=[date_bin_wallclock(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 10000000000 }, time@0, 0) as time], aggr=[sum(Value)]
               SortExec: expr=[time@0 ASC NULLS LAST], preserve_partitioning=[false]
                 ProjectionExec: expr=[time@1 as time, f@0 as Value]
                   UnionExec. **multi-partitions**
                     ...multiple nodes...
   
   ```
   
   Our Union outputs multiple partitions, and the `AggregateExec: mode=Partial` can take multiple partitions. If so, the enforce distribution pass should not have inserted the coalesce, correct?
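   
   To make that reasoning concrete, here is a minimal toy sketch of the check I have in mind. The `InputRequirement` enum and `needs_coalesce` function are hypothetical names invented for illustration; they are not DataFusion's API, and the real enforce distribution rule considers more than this.
   
   ```rust
   // Toy model only: these types are hypothetical and are NOT DataFusion's API.

   /// What an operator requires of the partitioning of its input.
   #[derive(Debug, Clone, Copy, PartialEq)]
   enum InputRequirement {
       /// Any number of input partitions is acceptable
       /// (assumed here for a partial aggregate).
       AnyPartitioning,
       /// The operator must read exactly one partition.
       SinglePartition,
   }

   /// Returns true when a coalesce step would be needed between an input
   /// producing `input_partitions` partitions and an operator with the
   /// given requirement.
   fn needs_coalesce(requirement: InputRequirement, input_partitions: usize) -> bool {
       match requirement {
           // The operator accepts its input as-is, so nothing is inserted.
           InputRequirement::AnyPartitioning => false,
           // Only a multi-partition input has to be merged down to one.
           InputRequirement::SinglePartition => input_partitions > 1,
       }
   }
   ```
   
   Under these assumptions, `needs_coalesce(InputRequirement::AnyPartitioning, 4)` is false: a multi-partition union feeding an operator that accepts any partitioning would not need a coalesce.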
   
   I'll start by updating the reproducer test cases. Thank you. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

