alamb commented on code in PR #8817:
URL: https://github.com/apache/arrow-datafusion/pull/8817#discussion_r1466696309


##########
datafusion/common/src/tree_node.rs:
##########
@@ -33,27 +46,17 @@ use crate::Result;
 /// [`PhysicalExpr`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.PhysicalExpr.html
 /// [`LogicalPlan`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/enum.LogicalPlan.html
 /// [`Expr`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/expr/enum.Expr.html
-pub trait TreeNode: Sized + Clone {
-    /// Returns all children of the TreeNode
-    fn children_nodes(&self) -> Vec<Cow<Self>>;
-
+pub trait TreeNode: Sized {
     /// Use preorder to iterate the node on the tree so that we can
     /// stop fast for some cases.
     ///
     /// The `op` closure can be used to collect some info from the
     /// tree node or do some checking for the tree node.

Review Comment:
   Perhaps this documentation can be updated, something like
   
   ```suggestion
    /// Applies `op` to the node and its children, with traversal controlled by
    /// [`VisitRecursion`]
       ///
       /// The `op` closure can be used to collect some info from the
       /// tree node or do some checking for the tree node.
   ```
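
   To make the suggested wording concrete, here is a minimal self-contained sketch of how `apply` and `VisitRecursion` interact. The types below are simplified stand-ins for illustration only, not DataFusion's actual definitions in `tree_node.rs`:

   ```rust
   #[derive(Debug, PartialEq)]
   pub enum VisitRecursion {
       /// Continue into this node's children.
       Continue,
       /// Skip this node's children, but keep visiting its siblings.
       Skip,
       /// Abort the whole traversal.
       Stop,
   }

   pub struct Node {
       pub value: i32,
       pub children: Vec<Node>,
   }

   impl Node {
       /// Pre-order traversal: `op` sees the node before its children, and
       /// its return value controls whether the traversal descends or stops.
       pub fn apply<F: FnMut(&Node) -> VisitRecursion>(&self, op: &mut F) -> VisitRecursion {
           match op(self) {
               VisitRecursion::Continue => {}
               VisitRecursion::Skip => return VisitRecursion::Continue,
               VisitRecursion::Stop => return VisitRecursion::Stop,
           }
           for child in &self.children {
               if child.apply(op) == VisitRecursion::Stop {
                   return VisitRecursion::Stop;
               }
           }
           VisitRecursion::Continue
       }
   }
   ```

   The early return on `Stop` is what lets callers "stop fast" once they have collected what they need.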



##########
datafusion/common/src/tree_node.rs:
##########
@@ -18,11 +18,24 @@
 //! This module provides common traits for visiting or rewriting tree
 //! data structures easily.
 
-use std::borrow::Cow;
 use std::sync::Arc;
 
 use crate::Result;
 
+#[macro_export]
+macro_rules! handle_tree_recursion {

Review Comment:
   Perhaps we can document what this macro does (aka `return`s from the enclosing function if Skip or Stop)
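
   For illustration, a hedged sketch of the macro with such documentation, using simplified stand-in types (the real macro operates on the visitor closures in `tree_node.rs`; this is an assumption about its shape, not the final code):

   ```rust
   #[derive(Debug, PartialEq)]
   enum VisitRecursion { Continue, Skip, Stop }

   /// Handles the traversal control flow shared by the `TreeNode` visit
   /// methods: on `Continue` it falls through; on `Skip` or `Stop` it
   /// `return`s early from the *enclosing function* (not just the macro),
   /// so it must only be used inside functions returning
   /// `Result<VisitRecursion, _>`.
   macro_rules! handle_tree_recursion {
       ($EXPR:expr) => {
           match $EXPR {
               VisitRecursion::Continue => {}
               // Skip: do not descend further, but let the traversal continue.
               VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
               // Stop: abort the whole traversal.
               VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
           }
       };
   }

   fn visit_all(values: &[i32], stop_at: i32) -> Result<VisitRecursion, String> {
       for &v in values {
           // Simulate a visitor that requests Stop once `stop_at` is seen.
           handle_tree_recursion!(if v == stop_at {
               VisitRecursion::Stop
           } else {
               VisitRecursion::Continue
           });
       }
       Ok(VisitRecursion::Continue)
   }
   ```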



##########
datafusion/common/src/tree_node.rs:
##########
@@ -366,12 +350,47 @@ impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
     {
         let children = self.arc_children();
         if !children.is_empty() {
-            let new_children: Result<Vec<_>> =
-                children.into_iter().map(transform).collect();
+            let new_children =
+                children.into_iter().map(transform).collect::<Result<_>>()?;
             let arc_self = Arc::clone(&self);
-            self.with_new_arc_children(arc_self, new_children?)
+            self.with_new_arc_children(arc_self, new_children)
         } else {
             Ok(self)
         }
     }
 }
+
+pub trait ConcreteTreeNode: Sized {

Review Comment:
   I think we should document this trait, focusing on what it should be used for, perhaps using some of the great description from this PR?
   
   I think documenting what the functions do and their expectations, `take_children` in particular, would be valuable as well.
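
   A sketch of what such documentation might look like, with simplified signatures (the real trait returns `Result`s; the `Plan` type below is a hypothetical stand-in for the test):

   ```rust
   /// A tree whose nodes own their children by value, unlike `DynTreeNode`
   /// where children sit behind `Arc`s. Rewrites follow a
   /// detach/rewrite/attach cycle so children can be transformed without
   /// cloning the parent.
   trait ConcreteTreeNode: Sized {
       /// Detaches and returns this node's children, leaving the node
       /// temporarily childless; callers must re-attach the (possibly
       /// rewritten) children via `with_new_children` before using the
       /// node again.
       fn take_children(self) -> (Self, Vec<Self>);

       /// Re-attaches `children` to the node, restoring a consistent tree.
       fn with_new_children(self, children: Vec<Self>) -> Self;
   }

   #[derive(Debug, PartialEq)]
   struct Plan {
       name: &'static str,
       children: Vec<Plan>,
   }

   impl ConcreteTreeNode for Plan {
       fn take_children(mut self) -> (Self, Vec<Self>) {
           let children = std::mem::take(&mut self.children);
           (self, children)
       }
       fn with_new_children(mut self, children: Vec<Self>) -> Self {
           self.children = children;
           self
       }
   }
   ```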



##########
datafusion/core/src/physical_optimizer/enforce_distribution.rs:
##########
@@ -1286,151 +1214,78 @@ fn ensure_distribution(
     )
     .collect::<Result<Vec<_>>>()?;
 
-    let new_distribution_context = DistributionContext {
-        plan: if plan.as_any().is::<UnionExec>()
-            && can_interleave(children_nodes.iter().map(|c| c.plan.clone()))
-        {
-            // Add a special case for [`UnionExec`] since we want to "bubble up"
-            // hash-partitioned data. So instead of
-            //
-            // Agg:
-            //   Repartition (hash):
-            //     Union:
-            //       - Agg:
-            //           Repartition (hash):
-            //             Data
-            //       - Agg:
-            //           Repartition (hash):
-            //             Data
-            //
-            // we can use:
-            //
-            // Agg:
-            //   Interleave:
-            //     - Agg:
-            //         Repartition (hash):
-            //           Data
-            //     - Agg:
-            //         Repartition (hash):
-            //           Data
-            Arc::new(InterleaveExec::try_new(
-                children_nodes.iter().map(|c| c.plan.clone()).collect(),
-            )?)
-        } else {
-            plan.with_new_children(
-                children_nodes.iter().map(|c| c.plan.clone()).collect(),
-            )?
-        },
-        distribution_connection,
-        children_nodes,
+    let children_plans = children.iter().map(|c| c.plan.clone()).collect::<Vec<_>>();
+    plan = if plan.as_any().is::<UnionExec>() && can_interleave(children_plans.iter()) {
+        // Add a special case for [`UnionExec`] since we want to "bubble up"
+        // hash-partitioned data. So instead of
+        //
+        // Agg:
+        //   Repartition (hash):
+        //     Union:
+        //       - Agg:
+        //           Repartition (hash):
+        //             Data
+        //       - Agg:
+        //           Repartition (hash):
+        //             Data
+        //
+        // we can use:
+        //
+        // Agg:
+        //   Interleave:
+        //     - Agg:
+        //         Repartition (hash):
+        //           Data
+        //     - Agg:
+        //         Repartition (hash):
+        //           Data
+        Arc::new(InterleaveExec::try_new(children_plans)?)
+    } else {
+        plan.with_new_children(children_plans)?
     };
 
-    Ok(Transformed::Yes(new_distribution_context))
-}
-
-/// A struct to keep track of distribution changing operators
-/// (`RepartitionExec`, `SortPreservingMergeExec`, `CoalescePartitionsExec`),
-/// and their associated parents inside `plan`. Using this information,
-/// we can optimize distribution of the plan if/when necessary.
-#[derive(Debug, Clone)]
-struct DistributionContext {
-    plan: Arc<dyn ExecutionPlan>,
-    /// Indicates whether this plan is connected to a distribution-changing
-    /// operator.
-    distribution_connection: bool,
-    children_nodes: Vec<Self>,
-}
-
-impl DistributionContext {

Review Comment:
   it is really cool to see this same pattern extracted out



##########
datafusion/core/src/physical_optimizer/join_selection.rs:
##########
@@ -840,26 +828,51 @@ mod tests_statistical {
         (big, medium, small)
     }
 
+    pub(crate) fn crosscheck_plans(plan: Arc<dyn ExecutionPlan>) -> Result<()> {
+        let pipeline = PipelineStatePropagator::new_default(plan);
+        let subrules: Vec<Box<PipelineFixerSubrule>> = vec![
+            Box::new(hash_join_convert_symmetric_subrule),
+            Box::new(hash_join_swap_subrule),
+        ];
+        let state = pipeline
            .transform_up(&|p| apply_subrules(p, &subrules, &ConfigOptions::new()))
+            .and_then(check_integrity)?;
+        // TODO: End state payloads will be checked here.

Review Comment:
   Is this still TODO?



##########
datafusion/core/src/physical_optimizer/enforce_sorting.rs:
##########
@@ -812,6 +634,50 @@ mod tests {
             let session_ctx = SessionContext::new_with_config(config);
             let state = session_ctx.state();
 
+            // This file has 4 rules that use tree node, apply these rules as in the
+            // EnforceSorting::optimize implementation.
+            // After these operations tree nodes should be in a consistent state.
+            // This code block makes sure that these rules don't violate tree node integrity.
+            {
+                let plan_requirements = PlanWithCorrespondingSort::new_default($PLAN.clone());
+                let adjusted = plan_requirements
+                    .transform_up(&ensure_sorting)
+                    .and_then(check_integrity)?;
+                // TODO: End state payloads will be checked here.

Review Comment:
   Are these still TODO items? Did you intend to complete them as part of this PR? If not, perhaps we can file a ticket to track their completion.



##########
datafusion/physical-plan/src/tree_node.rs:
##########
@@ -35,3 +38,58 @@ impl DynTreeNode for dyn ExecutionPlan {
         with_new_children_if_necessary(arc_self, new_children).map(Transformed::into)
     }
 }
+
+#[derive(Debug)]
+pub struct PlanContext<T: Sized> {

Review Comment:
   Could we also document what this is used for? It seems it is used for rewriting execution plans where some additional information needs to be stored in addition to the plan itself.
   
   Maybe a more descriptive name would be `PlanAndContext`? Though perhaps that is too verbose.
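
   To sketch what the documented struct might look like: the stand-in below pairs a node (in DataFusion, an `Arc<dyn ExecutionPlan>`; a plain string here for illustration) with a rule-specific payload `T`, mirroring the node's children as child contexts so a rule can propagate state up or down the tree while rewriting it. Names and signatures are assumptions for illustration, not the PR's final API:

   ```rust
   /// Simplified stand-in for `PlanContext<T>`: a node plus a
   /// rule-specific payload and one context per child of the node.
   #[derive(Debug)]
   struct NodeContext<T> {
       /// The wrapped node; a plain string here for illustration.
       node: &'static str,
       /// Rule-specific payload, e.g. "is this node connected to a
       /// distribution-changing operator?" in `enforce_distribution`.
       data: T,
       /// One context per child of `node`, kept in sync with the tree.
       children: Vec<NodeContext<T>>,
   }

   impl<T: Default> NodeContext<T> {
       /// Wraps a node with the payload's default value, analogous to the
       /// PR's `new_default` constructors.
       fn new_default(node: &'static str, children: Vec<NodeContext<T>>) -> Self {
           Self { node, data: T::default(), children }
       }
   }
   ```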



##########
datafusion/core/src/physical_optimizer/test_utils.rs:
##########
@@ -361,3 +364,17 @@ pub fn sort_exec(
     let sort_exprs = sort_exprs.into_iter().collect();
     Arc::new(SortExec::new(sort_exprs, input))
 }
+
+pub fn check_integrity<T: Clone>(context: PlanContext<T>) -> Result<PlanContext<T>> {
+    context.transform_up(&|node| {
+        let children_plans = node.plan.children();
+        assert_eq!(node.children.len(), children_plans.len());
+        for (child_plan, child_node) in children_plans.iter().zip(node.children.iter()) {
+            assert_eq!(

Review Comment:
   Can you instead simply compare the plans directly rather than converting them to strings?
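
   One way this could work, sketched with stand-in types: since `dyn ExecutionPlan` does not implement `PartialEq`, and the integrity check only needs to verify that the context holds the *same* plan instance as the parent's child, pointer identity via `Arc::ptr_eq` suffices and avoids formatting both plans to strings. This assumes instance identity is the invariant being checked; structural equality of distinct instances would need a different approach:

   ```rust
   use std::sync::Arc;

   // Stand-in for `dyn ExecutionPlan`, which does not implement `PartialEq`.
   trait Plan: std::fmt::Debug {}

   #[derive(Debug)]
   struct Scan;
   impl Plan for Scan {}

   /// True only when `a` and `b` point at the same plan instance.
   fn same_plan(a: &Arc<dyn Plan>, b: &Arc<dyn Plan>) -> bool {
       Arc::ptr_eq(a, b)
   }
   ```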



##########
datafusion/physical-expr/src/tree_node.rs:
##########
@@ -35,3 +38,57 @@ impl DynTreeNode for dyn PhysicalExpr {
         with_new_children_if_necessary(arc_self, new_children)
     }
 }
+
+#[derive(Debug)]
+pub struct ExprContext<T: Sized> {

Review Comment:
   Likewise documentation here would be awesome



##########
datafusion/core/src/physical_optimizer/test_utils.rs:
##########
@@ -361,3 +364,17 @@ pub fn sort_exec(
     let sort_exprs = sort_exprs.into_iter().collect();
     Arc::new(SortExec::new(sort_exprs, input))
 }
+
+pub fn check_integrity<T: Clone>(context: PlanContext<T>) -> Result<PlanContext<T>> {

Review Comment:
   Can you document what this is doing? 
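
   One possible wording, sketched from the function body shown in the hunk above (to be adjusted by the author):

   ```rust
   /// Checks the integrity of a `PlanContext<T>` tree after a transformation:
   /// recursively asserts that each context node's `children` vector matches
   /// the wrapped plan's actual children (same length, same plans, in the
   /// same order). Returns the context unchanged on success, so it can be
   /// chained after `transform_up` with `and_then` in tests.
   ```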



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to