geoffreyclaude commented on code in PR #22711:
URL: https://github.com/apache/datafusion/pull/22711#discussion_r3340068885


##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -1209,15 +1222,31 @@ pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
     Ok(())
 }
 
-/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
-/// especially for the distributed engine to judge whether need to deal with shuffling.
-/// Currently, there are 3 kinds of execution plan which needs data exchange
-///     1. RepartitionExec for changing the partition number between two `ExecutionPlan`s
-///     2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
-///     3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
+/// Indicate whether a data exchange is needed for the input of `plan`.
+///
+/// This identifies physical operators that redistribute child partitions or
+/// gather multiple child partitions into one output partition:
+///
+/// 1. RepartitionExec for non-round-robin repartitioning
+/// 2. CoalescePartitionsExec for collapsing multiple partitions into one without ordering guarantee
+/// 3. SortPreservingMergeExec for collapsing multiple sorted partitions into one with ordering guarantee
 #[expect(clippy::needless_pass_by_value)]
 pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
-    plan.properties().evaluation_type == EvaluationType::Eager
+    if let Some(repartition) = plan.downcast_ref::<RepartitionExec>() {
+        !matches!(repartition.partitioning(), Partitioning::RoundRobinBatch(_))
+    } else if let Some(coalesce) = plan.downcast_ref::<CoalescePartitionsExec>() {
+        coalesce.input().output_partitioning().partition_count() > 1
+    } else if let Some(sort_preserving_merge) =
+        plan.downcast_ref::<SortPreservingMergeExec>()
+    {
+        sort_preserving_merge
+            .input()
+            .output_partitioning()
+            .partition_count()
+            > 1
+    } else {
+        false
+    }

Review Comment:
   So `need_data_exchange` was pretty broken before this change. As discussed
privately, we should look into deprecating and then deleting this method,
since probably no one is using it (at least not correctly).
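
For illustration only, here is a minimal sketch of what the "deprecate first, delete later" step could look like with Rust's built-in `#[deprecated]` attribute. The `Operator` enum and the `exchanges_data` helper are hypothetical stand-ins, not DataFusion's real types, and the `since` version and `note` text are placeholders. The actual deprecation would go on the real function in `execution_plan.rs`.

```rust
// Simplified stand-in for a physical operator. These variants are
// hypothetical and are NOT DataFusion's real `ExecutionPlan` types.
#[derive(Debug, Clone, Copy)]
pub enum Operator {
    Repartition { round_robin: bool },
    CoalescePartitions { input_partitions: usize },
    SortPreservingMerge { input_partitions: usize },
    Other,
}

// Mirrors the decision logic from the diff above on the simplified enum:
// non-round-robin repartitioning, or gathering more than one input
// partition into a single output partition, counts as a data exchange.
pub fn exchanges_data(op: Operator) -> bool {
    match op {
        Operator::Repartition { round_robin } => !round_robin,
        Operator::CoalescePartitions { input_partitions }
        | Operator::SortPreservingMerge { input_partitions } => input_partitions > 1,
        Operator::Other => false,
    }
}

// The deprecated entry point keeps working, but every caller now gets a
// compiler warning carrying the note. The version and note are placeholders.
#[deprecated(
    since = "0.0.0",
    note = "placeholder: scheduled for removal; match on the operator type directly"
)]
pub fn need_data_exchange(op: Operator) -> bool {
    exchanges_data(op)
}
```

Any remaining caller would see the warning at compile time (or would have to opt out with `#[allow(deprecated)]`), which should surface real usage before the function is deleted.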



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

