danielhumanmod commented on code in PR #19757:
URL: https://github.com/apache/datafusion/pull/19757#discussion_r2685023498


##########
datafusion/physical-optimizer/src/ensure_coop.rs:
##########
@@ -67,23 +67,44 @@ impl PhysicalOptimizerRule for EnsureCooperative {
         plan: Arc<dyn ExecutionPlan>,
         _config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        plan.transform_up(|plan| {
-            let is_leaf = plan.children().is_empty();
-            let is_exchange = plan.properties().evaluation_type == 
EvaluationType::Eager;
-            if (is_leaf || is_exchange)
-                && plan.properties().scheduling_type != 
SchedulingType::Cooperative
-            {
-                // Wrap non-cooperative leaves or eager evaluation roots in a 
cooperative exec to
-                // ensure the plans they participate in are properly 
cooperative.
-                Ok(Transformed::new(
-                    Arc::new(CooperativeExec::new(Arc::clone(&plan))),
-                    true,
-                    TreeNodeRecursion::Continue,
-                ))
-            } else {
+        use std::cell::Cell;
+
+        // Track depth: 0 means not under any CooperativeExec
+        // Using Cell to allow interior mutability from multiple closures
+        let coop_depth = Cell::new(0usize);
+
+        plan.transform_down_up(
+            // Down phase: Track when entering CooperativeExec subtrees
+            |plan| {
+                if plan.as_any().downcast_ref::<CooperativeExec>().is_some() {
+                    coop_depth.set(coop_depth.get() + 1);
+                }
                 Ok(Transformed::no(plan))
-            }
-        })
+            },
+            // Up phase: Wrap nodes with CooperativeExec if needed, then 
restore depth
+            |plan| {
+                let is_cooperative =
+                    plan.properties().scheduling_type == 
SchedulingType::Cooperative;
+                let is_leaf = plan.children().is_empty();
+                let is_exchange =
+                    plan.properties().evaluation_type == EvaluationType::Eager;
+
+                // Wrap if:
+                // 1. Node is a leaf or exchange point
+                // 2. Node is not already cooperative
+                // 3. Not under any CooperativeExec (depth == 0)
+                if (is_leaf || is_exchange) && !is_cooperative && 
coop_depth.get() == 0 {

Review Comment:
   Great catch! Totally missed the case where an Eager node breaks the 
cooperative chain. 
   
   My plan is to maintain an ancestry stack that tracks both SchedulingType and 
EvaluationType. The new logic checks the stack bottom-up: a node is only 
considered 'protected' (and thus skips wrapping) if it encounters a Cooperative 
ancestor before any Eager pipeline breaker.
   
   I have also added a test case to cover this scenario. Thanks for the insight!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to