pepijnve commented on code in PR #19757:
URL: https://github.com/apache/datafusion/pull/19757#discussion_r2683270708


##########
datafusion/physical-optimizer/src/ensure_coop.rs:
##########
@@ -67,23 +67,44 @@ impl PhysicalOptimizerRule for EnsureCooperative {
         plan: Arc<dyn ExecutionPlan>,
         _config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        plan.transform_up(|plan| {
-            let is_leaf = plan.children().is_empty();
-            let is_exchange = plan.properties().evaluation_type == 
EvaluationType::Eager;
-            if (is_leaf || is_exchange)
-                && plan.properties().scheduling_type != 
SchedulingType::Cooperative
-            {
-                // Wrap non-cooperative leaves or eager evaluation roots in a 
cooperative exec to
-                // ensure the plans they participate in are properly 
cooperative.
-                Ok(Transformed::new(
-                    Arc::new(CooperativeExec::new(Arc::clone(&plan))),
-                    true,
-                    TreeNodeRecursion::Continue,
-                ))
-            } else {
+        use std::cell::Cell;
+
+        // Track depth: 0 means not under any CooperativeExec
+        // Using Cell to allow interior mutability from multiple closures
+        let coop_depth = Cell::new(0usize);
+
+        plan.transform_down_up(
+            // Down phase: Track when entering CooperativeExec subtrees
+            |plan| {
+                if plan.as_any().downcast_ref::<CooperativeExec>().is_some() {
+                    coop_depth.set(coop_depth.get() + 1);
+                }
                 Ok(Transformed::no(plan))
-            }
-        })
+            },
+            // Up phase: Wrap nodes with CooperativeExec if needed, then 
restore depth
+            |plan| {
+                let is_cooperative =
+                    plan.properties().scheduling_type == 
SchedulingType::Cooperative;
+                let is_leaf = plan.children().is_empty();
+                let is_exchange =
+                    plan.properties().evaluation_type == EvaluationType::Eager;
+
+                // Wrap if:
+                // 1. Node is a leaf or exchange point
+                // 2. Node is not already cooperative
+                // 3. Not under any CooperativeExec (depth == 0)
+                if (is_leaf || is_exchange) && !is_cooperative && 
coop_depth.get() == 0 {

Review Comment:
   The library doesn't currently contain any non-cooperative exchange operators
   that you could use to test this, but I'm not sure this is 100% correct if
   someone ever implements one (i.e., an exchange operator that doesn't use a
   Tokio `mpsc::channel`). I'll see if I can come up with a test case for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to