alamb commented on code in PR #9913:
URL: https://github.com/apache/arrow-datafusion/pull/9913#discussion_r1548533715


##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -370,6 +373,221 @@ impl LogicalPlan {
         }
     }
 
+    pub fn transform_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(

Review Comment:
   Can we please add some comments explaining what this does (namely, that it 
applies a function to rewrite the expressions of the LogicalPlan node in 
place)? It is a very cool API to see, as it should save a lot of copying.
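
   For the doc comment, something like the following might illustrate the 
intent. This is only a minimal sketch with hypothetical stand-in types (`Expr`, 
`Plan`, and a simplified `Transformed`, none of which are the real DataFusion 
definitions), and it ignores the early-stop handling that 
`map_until_stop_and_collect` performs:

```rust
// Hypothetical stand-ins for illustration only; not the DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
}

#[derive(Debug, PartialEq)]
struct Transformed<T> {
    data: T,
    transformed: bool,
}

impl<T> Transformed<T> {
    fn yes(data: T) -> Self {
        Self { data, transformed: true }
    }
    fn no(data: T) -> Self {
        Self { data, transformed: false }
    }
    /// Maps the payload while preserving the `transformed` flag.
    fn update_data<U, F: FnOnce(T) -> U>(self, f: F) -> Transformed<U> {
        Transformed { data: f(self.data), transformed: self.transformed }
    }
}

#[derive(Debug, PartialEq)]
enum Plan {
    Filter { predicate: Expr, input: Box<Plan> },
    Projection { expr: Vec<Expr>, input: Box<Plan> },
    Scan,
}

impl Plan {
    /// Rewrites the expressions of this node (not of its inputs) by value.
    /// The node is destructured, each owned `Expr` is passed to `f`, and the
    /// node is rebuilt from the results, so no expression is cloned.
    fn transform_expressions<F>(self, mut f: F) -> Result<Transformed<Plan>, String>
    where
        F: FnMut(Expr) -> Result<Transformed<Expr>, String>,
    {
        Ok(match self {
            Plan::Filter { predicate, input } => {
                f(predicate)?.update_data(|predicate| Plan::Filter { predicate, input })
            }
            Plan::Projection { expr, input } => {
                let mut changed = false;
                let mut out = Vec::with_capacity(expr.len());
                for e in expr {
                    let t = f(e)?;
                    changed |= t.transformed;
                    out.push(t.data);
                }
                Transformed {
                    data: Plan::Projection { expr: out, input },
                    transformed: changed,
                }
            }
            Plan::Scan => Transformed::no(Plan::Scan),
        })
    }
}

/// Example rewrite: negate every literal, leave everything else untouched.
fn negate_literals(e: Expr) -> Result<Transformed<Expr>, String> {
    Ok(match e {
        Expr::Literal(v) => Transformed::yes(Expr::Literal(-v)),
        other => Transformed::no(other),
    })
}
```

   The point worth documenting is that `self` is consumed and the untouched 
fields (`input` here) are moved into the rebuilt node rather than cloned.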



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1080,8 +1298,72 @@ impl LogicalPlan {
 }
 
 impl LogicalPlan {
+    pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
+        &self,
+        f: &mut F,
+    ) -> Result<TreeNodeRecursion> {
+        // Compared to the default implementation, we need to invoke
+        // [`Self::apply_subqueries`] before visiting its children
+        handle_visit_recursion!(f(self)?, DOWN);
+        self.apply_subqueries(f)?;
+        self.apply_children(&mut |n| n.apply_with_subqueries(f))
+    }
+
+    pub fn visit_with_subqueries<V: TreeNodeVisitor<Node = Self>>(
+        &self,
+        visitor: &mut V,
+    ) -> Result<TreeNodeRecursion> {
+        // Compared to the default implementation, we need to invoke
+        // [`Self::visit_subqueries`] before visiting its children
+        match visitor.f_down(self)? {
+            TreeNodeRecursion::Continue => {
+                self.visit_subqueries(visitor)?;
+                handle_visit_recursion!(
+                    self.apply_children(&mut |n| n.visit_with_subqueries(visitor))?,
+                    UP
+                );
+                visitor.f_up(self)
+            }
+            TreeNodeRecursion::Jump => {
+                self.visit_subqueries(visitor)?;

Review Comment:
   Does this treat subqueries as though they were siblings of the node? That 
is, if `TreeNodeRecursion::Jump` is returned, will the subqueries still be 
visited?
   
   If so, I would find that unexpected; I would expect subqueries to be 
treated like additional children.
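
   To make the two possible behaviors concrete, here is a minimal sketch on a 
toy tree. `Node`, `Recursion`, `walk`, and the `subqueries_as_children` flag are 
all hypothetical names for illustration; this is not the DataFusion `TreeNode` 
API, and it only models the pre-order (`f_down`) half of the traversal:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Recursion {
    Continue,
    Jump,
    Stop,
}

struct Node {
    name: &'static str,
    children: Vec<Node>,
    subqueries: Vec<Node>,
}

fn leaf(name: &'static str) -> Node {
    Node { name, children: vec![], subqueries: vec![] }
}

/// Pre-order walk. With `subqueries_as_children == false`, a `Jump` skips the
/// children but still descends into the subqueries (the behavior asked about).
/// With `true`, a `Jump` skips subqueries and children alike.
fn walk<F>(node: &Node, subqueries_as_children: bool, f: &mut F) -> Recursion
where
    F: FnMut(&Node) -> Recursion,
{
    match f(node) {
        Recursion::Stop => return Recursion::Stop,
        Recursion::Jump => {
            if !subqueries_as_children {
                for sq in &node.subqueries {
                    if walk(sq, subqueries_as_children, f) == Recursion::Stop {
                        return Recursion::Stop;
                    }
                }
            }
            return Recursion::Continue;
        }
        Recursion::Continue => {}
    }
    // Subqueries are visited before the regular children.
    for n in node.subqueries.iter().chain(node.children.iter()) {
        if walk(n, subqueries_as_children, f) == Recursion::Stop {
            return Recursion::Stop;
        }
    }
    Recursion::Continue
}

/// Records the visit order when the callback returns `Jump` at `jump_at`.
fn visit_order(root: &Node, subqueries_as_children: bool, jump_at: &str) -> Vec<&'static str> {
    let mut seen = Vec::new();
    walk(root, subqueries_as_children, &mut |n: &Node| {
        seen.push(n.name);
        if n.name == jump_at {
            Recursion::Jump
        } else {
            Recursion::Continue
        }
    });
    seen
}

/// A filter node with one input (`scan`) and one subquery.
fn sample() -> Node {
    Node {
        name: "filter",
        children: vec![leaf("scan")],
        subqueries: vec![leaf("subquery")],
    }
}
```

   With a `Jump` at `filter`, the first variant still visits `subquery`, while 
the second visits only `filter`. The second is what I would expect.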



##########
datafusion/optimizer/src/analyzer/mod.rs:
##########
@@ -155,7 +155,7 @@ impl Analyzer {
 
 /// Do necessary check and fail the invalid plan
 fn check_plan(plan: &LogicalPlan) -> Result<()> {
-    plan.apply(&mut |plan: &LogicalPlan| {
+    plan.apply_with_subqueries(&mut |plan: &LogicalPlan| {

Review Comment:
   It is interesting that this didn't change any plans / tests. 



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1080,8 +1298,72 @@ impl LogicalPlan {
 }
 
 impl LogicalPlan {
+    pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(

Review Comment:
   Likewise here, can we please add docstrings?



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -370,6 +373,221 @@ impl LogicalPlan {
         }
     }
 
+    pub fn transform_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
+        self,
+        mut f: F,
+    ) -> Result<Transformed<Self>> {
+        Ok(match self {
+            LogicalPlan::Projection(Projection {
+                expr,
+                input,
+                schema,
+            }) => expr
+                .into_iter()
+                .map_until_stop_and_collect(f)?
+                .update_data(|expr| {
+                    LogicalPlan::Projection(Projection {
+                        expr,
+                        input,
+                        schema,
+                    })
+                }),
+            LogicalPlan::Values(Values { schema, values }) => values
+                .into_iter()
+                .map_until_stop_and_collect(|value| {
+                    value.into_iter().map_until_stop_and_collect(&mut f)
+                })?
+                .update_data(|values| LogicalPlan::Values(Values { schema, values })),
+            LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)?
+                .update_data(|predicate| {
+                    LogicalPlan::Filter(Filter { predicate, input })
+                }),
+            LogicalPlan::Repartition(Repartition {
+                input,
+                partitioning_scheme,
+            }) => match partitioning_scheme {
+                Partitioning::Hash(expr, usize) => expr
+                    .into_iter()
+                    .map_until_stop_and_collect(f)?
+                    .update_data(|expr| Partitioning::Hash(expr, usize)),
+                Partitioning::DistributeBy(expr) => expr
+                    .into_iter()
+                    .map_until_stop_and_collect(f)?
+                    .update_data(Partitioning::DistributeBy),
+                Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme),
+            }
+            .update_data(|partitioning_scheme| {
+                LogicalPlan::Repartition(Repartition {
+                    input,
+                    partitioning_scheme,
+                })
+            }),
+            LogicalPlan::Window(Window {
+                input,
+                window_expr,
+                schema,
+            }) => window_expr
+                .into_iter()
+                .map_until_stop_and_collect(f)?
+                .update_data(|window_expr| {
+                    LogicalPlan::Window(Window {
+                        input,
+                        window_expr,
+                        schema,
+                    })
+                }),
+            LogicalPlan::Aggregate(Aggregate {
+                input,
+                group_expr,
+                aggr_expr,
+                schema,
+            }) => map_until_stop_and_collect!(
+                group_expr.into_iter().map_until_stop_and_collect(&mut f),
+                aggr_expr,
+                aggr_expr.into_iter().map_until_stop_and_collect(&mut f)
+            )?
+            .update_data(|(group_expr, aggr_expr)| {
+                LogicalPlan::Aggregate(Aggregate {
+                    input,
+                    group_expr,
+                    aggr_expr,
+                    schema,
+                })
+            }),
+
+            // There are two part of expression for join, equijoin(on) and non-equijoin(filter).
+            // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`.
+            // 2. the second part is non-equijoin(filter).
+            LogicalPlan::Join(Join {
+                left,
+                right,
+                on,
+                filter,
+                join_type,
+                join_constraint,
+                schema,
+                null_equals_null,
+            }) => map_until_stop_and_collect!(
+                on.into_iter().map_until_stop_and_collect(
+                    |on| map_until_stop_and_collect!(f(on.0), on.1, f(on.1))
+                ),
+                filter,
+                filter.map_or(Ok::<_, DataFusionError>(Transformed::no(None)), |e| {

Review Comment:
   Is this the magic required to handle `Option`?
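
   If I am reading it right, the pattern boils down to something like the 
sketch below. `Expr`, `Transformed`, and `transform_option` are hypothetical 
simplified stand-ins (with `String` in place of `DataFusionError`), not the 
real types:

```rust
// Hypothetical stand-ins for illustration only; not the DataFusion types.
#[derive(Debug, PartialEq)]
struct Expr(i64);

#[derive(Debug, PartialEq)]
struct Transformed<T> {
    data: T,
    transformed: bool,
}

impl<T> Transformed<T> {
    fn yes(data: T) -> Self {
        Self { data, transformed: true }
    }
    fn no(data: T) -> Self {
        Self { data, transformed: false }
    }
    /// Maps the payload while preserving the `transformed` flag.
    fn update_data<U, F: FnOnce(T) -> U>(self, f: F) -> Transformed<U> {
        Transformed { data: f(self.data), transformed: self.transformed }
    }
}

/// Applies `f` to the expression inside an `Option`, if there is one.
/// `None` maps to an untransformed `None`; `Some(e)` is rewritten and the
/// result is wrapped back into `Some`. The turbofish on `Ok` pins the error
/// type, mirroring the `Ok::<_, DataFusionError>` in the diff.
fn transform_option<F>(
    filter: Option<Expr>,
    f: F,
) -> Result<Transformed<Option<Expr>>, String>
where
    F: FnOnce(Expr) -> Result<Transformed<Expr>, String>,
{
    filter.map_or(Ok::<_, String>(Transformed::no(None)), |e| {
        Ok(f(e)?.update_data(Some))
    })
}

/// Example rewrite: add one to the wrapped value.
fn bump(e: Expr) -> Result<Transformed<Expr>, String> {
    Ok(Transformed::yes(Expr(e.0 + 1)))
}
```

   If that is the idea, a short comment on the `map_or` line saying so would 
help readers.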



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -370,6 +373,221 @@ impl LogicalPlan {
         }
     }
 
+    pub fn transform_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
+        self,
+        mut f: F,
+    ) -> Result<Transformed<Self>> {
+        Ok(match self {
+            LogicalPlan::Projection(Projection {
+                expr,
+                input,
+                schema,
+            }) => expr
+                .into_iter()
+                .map_until_stop_and_collect(f)?
+                .update_data(|expr| {

Review Comment:
   😍  -- that is quite a neat way to express this pattern and get the ownership 
sorted out 👍 


