This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b9759b9810 Consolidate LogicalPlan tree node walking/rewriting code into one module (#10034)
b9759b9810 is described below

commit b9759b9810a05b7993c0357a5346197395cfd4cc
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Apr 10 17:31:41 2024 -0400

    Consolidate LogicalPlan tree node walking/rewriting code into one module (#10034)
---
 datafusion/expr/src/logical_plan/plan.rs      | 515 +-------------------------
 datafusion/expr/src/logical_plan/tree_node.rs | 511 ++++++++++++++++++++++++-
 2 files changed, 515 insertions(+), 511 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 7bad034a11..d16dfb1403 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -25,9 +25,7 @@ use std::sync::Arc;
 use super::dml::CopyTo;
 use super::DdlStatement;
 use crate::builder::change_redundant_column;
-use crate::expr::{
-    Alias, Exists, InSubquery, Placeholder, Sort as SortExpr, WindowFunction,
-};
+use crate::expr::{Alias, Placeholder, Sort as SortExpr, WindowFunction};
 use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols};
 use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
 use crate::logical_plan::extension::UserDefinedLogicalNode;
@@ -44,19 +42,16 @@ use crate::{
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::tree_node::{
-    Transformed, TransformedResult, TreeNode, TreeNodeIterator, 
TreeNodeRecursion,
-    TreeNodeRewriter, TreeNodeVisitor,
+    Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
 };
 use datafusion_common::{
-    aggregate_functional_dependencies, internal_err, 
map_until_stop_and_collect,
-    plan_err, Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, 
Dependency,
-    FunctionalDependence, FunctionalDependencies, ParamValues, Result, 
TableReference,
-    UnnestOptions,
+    aggregate_functional_dependencies, internal_err, plan_err, Column, 
Constraints,
+    DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
+    FunctionalDependencies, ParamValues, Result, TableReference, UnnestOptions,
 };
 
 // backwards compatibility
 use crate::display::PgJsonVisitor;
-use crate::tree_node::transform_option_vec;
 pub use datafusion_common::display::{PlanType, StringifiedPlan, 
ToStringifiedPlan};
 pub use datafusion_common::{JoinConstraint, JoinType};
 
@@ -315,314 +310,6 @@ impl LogicalPlan {
         err
     }
 
-    /// Calls `f` on all expressions in the current `LogicalPlan` node.
-    ///
-    /// Note this does not include expressions in child `LogicalPlan` nodes.
-    pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
-        &self,
-        mut f: F,
-    ) -> Result<TreeNodeRecursion> {
-        match self {
-            LogicalPlan::Projection(Projection { expr, .. }) => {
-                expr.iter().apply_until_stop(f)
-            }
-            LogicalPlan::Values(Values { values, .. }) => values
-                .iter()
-                .apply_until_stop(|value| value.iter().apply_until_stop(&mut 
f)),
-            LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
-            LogicalPlan::Repartition(Repartition {
-                partitioning_scheme,
-                ..
-            }) => match partitioning_scheme {
-                Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) 
=> {
-                    expr.iter().apply_until_stop(f)
-                }
-                Partitioning::RoundRobinBatch(_) => 
Ok(TreeNodeRecursion::Continue),
-            },
-            LogicalPlan::Window(Window { window_expr, .. }) => {
-                window_expr.iter().apply_until_stop(f)
-            }
-            LogicalPlan::Aggregate(Aggregate {
-                group_expr,
-                aggr_expr,
-                ..
-            }) => group_expr
-                .iter()
-                .chain(aggr_expr.iter())
-                .apply_until_stop(f),
-            // There are two part of expression for join, equijoin(on) and 
non-equijoin(filter).
-            // 1. the first part is `on.len()` equijoin expressions, and the 
struct of each expr is `left-on = right-on`.
-            // 2. the second part is non-equijoin(filter).
-            LogicalPlan::Join(Join { on, filter, .. }) => {
-                on.iter()
-                    // TODO: why we need to create an `Expr::eq`? Cloning 
`Expr` is costly...
-                    // it not ideal to create an expr here to analyze them, 
but could cache it on the Join itself
-                    .map(|(l, r)| Expr::eq(l.clone(), r.clone()))
-                    .apply_until_stop(|e| f(&e))?
-                    .visit_sibling(|| filter.iter().apply_until_stop(f))
-            }
-            LogicalPlan::Sort(Sort { expr, .. }) => 
expr.iter().apply_until_stop(f),
-            LogicalPlan::Extension(extension) => {
-                // would be nice to avoid this copy -- maybe can
-                // update extension to just observer Exprs
-                extension.node.expressions().iter().apply_until_stop(f)
-            }
-            LogicalPlan::TableScan(TableScan { filters, .. }) => {
-                filters.iter().apply_until_stop(f)
-            }
-            LogicalPlan::Unnest(Unnest { column, .. }) => {
-                f(&Expr::Column(column.clone()))
-            }
-            LogicalPlan::Distinct(Distinct::On(DistinctOn {
-                on_expr,
-                select_expr,
-                sort_expr,
-                ..
-            })) => on_expr
-                .iter()
-                .chain(select_expr.iter())
-                .chain(sort_expr.iter().flatten())
-                .apply_until_stop(f),
-            // plans without expressions
-            LogicalPlan::EmptyRelation(_)
-            | LogicalPlan::RecursiveQuery(_)
-            | LogicalPlan::Subquery(_)
-            | LogicalPlan::SubqueryAlias(_)
-            | LogicalPlan::Limit(_)
-            | LogicalPlan::Statement(_)
-            | LogicalPlan::CrossJoin(_)
-            | LogicalPlan::Analyze(_)
-            | LogicalPlan::Explain(_)
-            | LogicalPlan::Union(_)
-            | LogicalPlan::Distinct(Distinct::All(_))
-            | LogicalPlan::Dml(_)
-            | LogicalPlan::Ddl(_)
-            | LogicalPlan::Copy(_)
-            | LogicalPlan::DescribeTable(_)
-            | LogicalPlan::Prepare(_) => Ok(TreeNodeRecursion::Continue),
-        }
-    }
-
-    /// Rewrites all expressions in the current `LogicalPlan` node using `f`.
-    ///
-    /// Returns the current node.
-    ///
-    /// Note this does not include expressions in child `LogicalPlan` nodes.
-    pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
-        self,
-        mut f: F,
-    ) -> Result<Transformed<Self>> {
-        Ok(match self {
-            LogicalPlan::Projection(Projection {
-                expr,
-                input,
-                schema,
-            }) => expr
-                .into_iter()
-                .map_until_stop_and_collect(f)?
-                .update_data(|expr| {
-                    LogicalPlan::Projection(Projection {
-                        expr,
-                        input,
-                        schema,
-                    })
-                }),
-            LogicalPlan::Values(Values { schema, values }) => values
-                .into_iter()
-                .map_until_stop_and_collect(|value| {
-                    value.into_iter().map_until_stop_and_collect(&mut f)
-                })?
-                .update_data(|values| LogicalPlan::Values(Values { schema, 
values })),
-            LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)?
-                .update_data(|predicate| {
-                    LogicalPlan::Filter(Filter { predicate, input })
-                }),
-            LogicalPlan::Repartition(Repartition {
-                input,
-                partitioning_scheme,
-            }) => match partitioning_scheme {
-                Partitioning::Hash(expr, usize) => expr
-                    .into_iter()
-                    .map_until_stop_and_collect(f)?
-                    .update_data(|expr| Partitioning::Hash(expr, usize)),
-                Partitioning::DistributeBy(expr) => expr
-                    .into_iter()
-                    .map_until_stop_and_collect(f)?
-                    .update_data(Partitioning::DistributeBy),
-                Partitioning::RoundRobinBatch(_) => 
Transformed::no(partitioning_scheme),
-            }
-            .update_data(|partitioning_scheme| {
-                LogicalPlan::Repartition(Repartition {
-                    input,
-                    partitioning_scheme,
-                })
-            }),
-            LogicalPlan::Window(Window {
-                input,
-                window_expr,
-                schema,
-            }) => window_expr
-                .into_iter()
-                .map_until_stop_and_collect(f)?
-                .update_data(|window_expr| {
-                    LogicalPlan::Window(Window {
-                        input,
-                        window_expr,
-                        schema,
-                    })
-                }),
-            LogicalPlan::Aggregate(Aggregate {
-                input,
-                group_expr,
-                aggr_expr,
-                schema,
-            }) => map_until_stop_and_collect!(
-                group_expr.into_iter().map_until_stop_and_collect(&mut f),
-                aggr_expr,
-                aggr_expr.into_iter().map_until_stop_and_collect(&mut f)
-            )?
-            .update_data(|(group_expr, aggr_expr)| {
-                LogicalPlan::Aggregate(Aggregate {
-                    input,
-                    group_expr,
-                    aggr_expr,
-                    schema,
-                })
-            }),
-
-            // There are two part of expression for join, equijoin(on) and 
non-equijoin(filter).
-            // 1. the first part is `on.len()` equijoin expressions, and the 
struct of each expr is `left-on = right-on`.
-            // 2. the second part is non-equijoin(filter).
-            LogicalPlan::Join(Join {
-                left,
-                right,
-                on,
-                filter,
-                join_type,
-                join_constraint,
-                schema,
-                null_equals_null,
-            }) => map_until_stop_and_collect!(
-                on.into_iter().map_until_stop_and_collect(
-                    |on| map_until_stop_and_collect!(f(on.0), on.1, f(on.1))
-                ),
-                filter,
-                filter.map_or(Ok::<_, DataFusionError>(Transformed::no(None)), 
|e| {
-                    Ok(f(e)?.update_data(Some))
-                })
-            )?
-            .update_data(|(on, filter)| {
-                LogicalPlan::Join(Join {
-                    left,
-                    right,
-                    on,
-                    filter,
-                    join_type,
-                    join_constraint,
-                    schema,
-                    null_equals_null,
-                })
-            }),
-            LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
-                .into_iter()
-                .map_until_stop_and_collect(f)?
-                .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, 
fetch })),
-            LogicalPlan::Extension(Extension { node }) => {
-                // would be nice to avoid this copy -- maybe can
-                // update extension to just observer Exprs
-                node.expressions()
-                    .into_iter()
-                    .map_until_stop_and_collect(f)?
-                    .update_data(|exprs| {
-                        LogicalPlan::Extension(Extension {
-                            node: UserDefinedLogicalNode::from_template(
-                                node.as_ref(),
-                                exprs.as_slice(),
-                                node.inputs()
-                                    .into_iter()
-                                    .cloned()
-                                    .collect::<Vec<_>>()
-                                    .as_slice(),
-                            ),
-                        })
-                    })
-            }
-            LogicalPlan::TableScan(TableScan {
-                table_name,
-                source,
-                projection,
-                projected_schema,
-                filters,
-                fetch,
-            }) => filters
-                .into_iter()
-                .map_until_stop_and_collect(f)?
-                .update_data(|filters| {
-                    LogicalPlan::TableScan(TableScan {
-                        table_name,
-                        source,
-                        projection,
-                        projected_schema,
-                        filters,
-                        fetch,
-                    })
-                }),
-            LogicalPlan::Unnest(Unnest {
-                input,
-                column,
-                schema,
-                options,
-            }) => f(Expr::Column(column))?.map_data(|column| match column {
-                Expr::Column(column) => Ok(LogicalPlan::Unnest(Unnest {
-                    input,
-                    column,
-                    schema,
-                    options,
-                })),
-                _ => internal_err!("Transformation should return Column"),
-            })?,
-            LogicalPlan::Distinct(Distinct::On(DistinctOn {
-                on_expr,
-                select_expr,
-                sort_expr,
-                input,
-                schema,
-            })) => map_until_stop_and_collect!(
-                on_expr.into_iter().map_until_stop_and_collect(&mut f),
-                select_expr,
-                select_expr.into_iter().map_until_stop_and_collect(&mut f),
-                sort_expr,
-                transform_option_vec(sort_expr, &mut f)
-            )?
-            .update_data(|(on_expr, select_expr, sort_expr)| {
-                LogicalPlan::Distinct(Distinct::On(DistinctOn {
-                    on_expr,
-                    select_expr,
-                    sort_expr,
-                    input,
-                    schema,
-                }))
-            }),
-            // plans without expressions
-            LogicalPlan::EmptyRelation(_)
-            | LogicalPlan::RecursiveQuery(_)
-            | LogicalPlan::Subquery(_)
-            | LogicalPlan::SubqueryAlias(_)
-            | LogicalPlan::Limit(_)
-            | LogicalPlan::Statement(_)
-            | LogicalPlan::CrossJoin(_)
-            | LogicalPlan::Analyze(_)
-            | LogicalPlan::Explain(_)
-            | LogicalPlan::Union(_)
-            | LogicalPlan::Distinct(Distinct::All(_))
-            | LogicalPlan::Dml(_)
-            | LogicalPlan::Ddl(_)
-            | LogicalPlan::Copy(_)
-            | LogicalPlan::DescribeTable(_)
-            | LogicalPlan::Prepare(_) => Transformed::no(self),
-        })
-    }
-
     /// Returns all inputs / children of this `LogicalPlan` node.
     ///
     /// Note does not include inputs to inputs, or subqueries.
@@ -1354,192 +1041,7 @@ impl LogicalPlan {
     }
 }
 
-/// This macro is used to determine continuation during combined transforming
-/// traversals.
-macro_rules! handle_transform_recursion {
-    ($F_DOWN:expr, $F_CHILD:expr, $F_UP:expr) => {{
-        $F_DOWN?
-            .transform_children(|n| n.map_subqueries($F_CHILD))?
-            .transform_sibling(|n| n.map_children($F_CHILD))?
-            .transform_parent($F_UP)
-    }};
-}
-
-macro_rules! handle_transform_recursion_down {
-    ($F_DOWN:expr, $F_CHILD:expr) => {{
-        $F_DOWN?
-            .transform_children(|n| n.map_subqueries($F_CHILD))?
-            .transform_sibling(|n| n.map_children($F_CHILD))
-    }};
-}
-
-macro_rules! handle_transform_recursion_up {
-    ($SELF:expr, $F_CHILD:expr, $F_UP:expr) => {{
-        $SELF
-            .map_subqueries($F_CHILD)?
-            .transform_sibling(|n| n.map_children($F_CHILD))?
-            .transform_parent(|n| $F_UP(n))
-    }};
-}
-
 impl LogicalPlan {
-    /// Visits a plan similarly to [`Self::visit`], but including embedded 
subqueries.
-    pub fn visit_with_subqueries<V: TreeNodeVisitor<Node = Self>>(
-        &self,
-        visitor: &mut V,
-    ) -> Result<TreeNodeRecursion> {
-        visitor
-            .f_down(self)?
-            .visit_children(|| {
-                self.apply_subqueries(|c| c.visit_with_subqueries(visitor))
-            })?
-            .visit_sibling(|| self.apply_children(|c| 
c.visit_with_subqueries(visitor)))?
-            .visit_parent(|| visitor.f_up(self))
-    }
-
-    /// Rewrites a plan similarly t [`Self::visit`], but including embedded 
subqueries.
-    pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
-        self,
-        rewriter: &mut R,
-    ) -> Result<Transformed<Self>> {
-        handle_transform_recursion!(
-            rewriter.f_down(self),
-            |c| c.rewrite_with_subqueries(rewriter),
-            |n| rewriter.f_up(n)
-        )
-    }
-
-    /// Calls `f` recursively on all children of the  `LogicalPlan` node.
-    ///
-    /// Unlike [`Self::apply`], this method *does* includes `LogicalPlan`s that
-    /// are referenced in `Expr`s
-    pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
-        &self,
-        f: &mut F,
-    ) -> Result<TreeNodeRecursion> {
-        f(self)?
-            .visit_children(|| self.apply_subqueries(|c| 
c.apply_with_subqueries(f)))?
-            .visit_sibling(|| self.apply_children(|c| 
c.apply_with_subqueries(f)))
-    }
-
-    pub fn transform_with_subqueries<F: Fn(Self) -> Result<Transformed<Self>>>(
-        self,
-        f: &F,
-    ) -> Result<Transformed<Self>> {
-        self.transform_up_with_subqueries(f)
-    }
-
-    pub fn transform_down_with_subqueries<F: Fn(Self) -> 
Result<Transformed<Self>>>(
-        self,
-        f: &F,
-    ) -> Result<Transformed<Self>> {
-        handle_transform_recursion_down!(f(self), |c| 
c.transform_down_with_subqueries(f))
-    }
-
-    pub fn transform_down_mut_with_subqueries<
-        F: FnMut(Self) -> Result<Transformed<Self>>,
-    >(
-        self,
-        f: &mut F,
-    ) -> Result<Transformed<Self>> {
-        handle_transform_recursion_down!(f(self), |c| c
-            .transform_down_mut_with_subqueries(f))
-    }
-
-    pub fn transform_up_with_subqueries<F: Fn(Self) -> 
Result<Transformed<Self>>>(
-        self,
-        f: &F,
-    ) -> Result<Transformed<Self>> {
-        handle_transform_recursion_up!(self, |c| 
c.transform_up_with_subqueries(f), f)
-    }
-
-    pub fn transform_up_mut_with_subqueries<
-        F: FnMut(Self) -> Result<Transformed<Self>>,
-    >(
-        self,
-        f: &mut F,
-    ) -> Result<Transformed<Self>> {
-        handle_transform_recursion_up!(self, |c| 
c.transform_up_mut_with_subqueries(f), f)
-    }
-
-    pub fn transform_down_up_with_subqueries<
-        FD: FnMut(Self) -> Result<Transformed<Self>>,
-        FU: FnMut(Self) -> Result<Transformed<Self>>,
-    >(
-        self,
-        f_down: &mut FD,
-        f_up: &mut FU,
-    ) -> Result<Transformed<Self>> {
-        handle_transform_recursion!(
-            f_down(self),
-            |c| c.transform_down_up_with_subqueries(f_down, f_up),
-            f_up
-        )
-    }
-
-    /// Calls `f` on all subqueries referenced in expressions of the current
-    /// `LogicalPlan` node.
-    pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
-        &self,
-        mut f: F,
-    ) -> Result<TreeNodeRecursion> {
-        self.apply_expressions(|expr| {
-            expr.apply(&mut |expr| match expr {
-                Expr::Exists(Exists { subquery, .. })
-                | Expr::InSubquery(InSubquery { subquery, .. })
-                | Expr::ScalarSubquery(subquery) => {
-                    // use a synthetic plan so the collector sees a
-                    // LogicalPlan::Subquery (even though it is
-                    // actually a Subquery alias)
-                    f(&LogicalPlan::Subquery(subquery.clone()))
-                }
-                _ => Ok(TreeNodeRecursion::Continue),
-            })
-        })
-    }
-
-    /// Rewrites all subquery `LogicalPlan` in the current `LogicalPlan` node
-    /// using `f`.
-    ///
-    /// Returns the current node.
-    pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
-        self,
-        mut f: F,
-    ) -> Result<Transformed<Self>> {
-        self.map_expressions(|expr| {
-            expr.transform_down_mut(&mut |expr| match expr {
-                Expr::Exists(Exists { subquery, negated }) => {
-                    f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
-                        LogicalPlan::Subquery(subquery) => {
-                            Ok(Expr::Exists(Exists { subquery, negated }))
-                        }
-                        _ => internal_err!("Transformation should return 
Subquery"),
-                    })
-                }
-                Expr::InSubquery(InSubquery {
-                    expr,
-                    subquery,
-                    negated,
-                }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s 
{
-                    LogicalPlan::Subquery(subquery) => 
Ok(Expr::InSubquery(InSubquery {
-                        expr,
-                        subquery,
-                        negated,
-                    })),
-                    _ => internal_err!("Transformation should return 
Subquery"),
-                }),
-                Expr::ScalarSubquery(subquery) => 
f(LogicalPlan::Subquery(subquery))?
-                    .map_data(|s| match s {
-                        LogicalPlan::Subquery(subquery) => {
-                            Ok(Expr::ScalarSubquery(subquery))
-                        }
-                        _ => internal_err!("Transformation should return 
Subquery"),
-                    }),
-                _ => Ok(Transformed::no(expr)),
-            })
-        })
-    }
-
     /// Return a `LogicalPlan` with all placeholders (e.g $1 $2,
     /// ...) replaced with corresponding values provided in
     /// `params_values`
@@ -1623,10 +1125,11 @@ impl LogicalPlan {
         })
         .data()
     }
-}
 
-// Various implementations for printing out LogicalPlans
-impl LogicalPlan {
+    // ------------
+    // Various implementations for printing out LogicalPlans
+    // ------------
+
     /// Return a `format`able structure that produces a single line
     /// per node.
     ///
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index 415343f886..1eb9d50277 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -38,16 +38,22 @@
 //! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
 use crate::{
     dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, CrossJoin,
-    DdlStatement, Distinct, DistinctOn, DmlStatement, Explain, Extension, 
Filter, Join,
-    Limit, LogicalPlan, Prepare, Projection, RecursiveQuery, Repartition, 
Sort, Subquery,
-    SubqueryAlias, Union, Unnest, Window,
+    DdlStatement, Distinct, DistinctOn, DmlStatement, Explain, Expr, 
Extension, Filter,
+    Join, Limit, LogicalPlan, Partitioning, Prepare, Projection, 
RecursiveQuery,
+    Repartition, Sort, Subquery, SubqueryAlias, TableScan, Union, Unnest,
+    UserDefinedLogicalNode, Values, Window,
 };
 use std::sync::Arc;
 
+use crate::expr::{Exists, InSubquery};
+use crate::tree_node::transform_option_vec;
 use datafusion_common::tree_node::{
-    Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion,
+    Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion, 
TreeNodeRewriter,
+    TreeNodeVisitor,
+};
+use datafusion_common::{
+    internal_err, map_until_stop_and_collect, DataFusionError, Result,
 };
-use datafusion_common::{map_until_stop_and_collect, Result};
 
 impl TreeNode for LogicalPlan {
     fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
@@ -413,3 +419,498 @@ where
             })
         })
 }
+
+/// This macro is used to determine continuation during combined transforming
+/// traversals.
+macro_rules! handle_transform_recursion {
+    ($F_DOWN:expr, $F_CHILD:expr, $F_UP:expr) => {{
+        $F_DOWN?
+            .transform_children(|n| n.map_subqueries($F_CHILD))?
+            .transform_sibling(|n| n.map_children($F_CHILD))?
+            .transform_parent($F_UP)
+    }};
+}
+
+macro_rules! handle_transform_recursion_down {
+    ($F_DOWN:expr, $F_CHILD:expr) => {{
+        $F_DOWN?
+            .transform_children(|n| n.map_subqueries($F_CHILD))?
+            .transform_sibling(|n| n.map_children($F_CHILD))
+    }};
+}
+
+macro_rules! handle_transform_recursion_up {
+    ($SELF:expr, $F_CHILD:expr, $F_UP:expr) => {{
+        $SELF
+            .map_subqueries($F_CHILD)?
+            .transform_sibling(|n| n.map_children($F_CHILD))?
+            .transform_parent(|n| $F_UP(n))
+    }};
+}
+
+impl LogicalPlan {
+    /// Calls `f` on all expressions in the current `LogicalPlan` node.
+    ///
+    /// Note this does not include expressions in child `LogicalPlan` nodes.
+    pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
+        &self,
+        mut f: F,
+    ) -> Result<TreeNodeRecursion> {
+        match self {
+            LogicalPlan::Projection(Projection { expr, .. }) => {
+                expr.iter().apply_until_stop(f)
+            }
+            LogicalPlan::Values(Values { values, .. }) => values
+                .iter()
+                .apply_until_stop(|value| value.iter().apply_until_stop(&mut 
f)),
+            LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
+            LogicalPlan::Repartition(Repartition {
+                partitioning_scheme,
+                ..
+            }) => match partitioning_scheme {
+                Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) 
=> {
+                    expr.iter().apply_until_stop(f)
+                }
+                Partitioning::RoundRobinBatch(_) => 
Ok(TreeNodeRecursion::Continue),
+            },
+            LogicalPlan::Window(Window { window_expr, .. }) => {
+                window_expr.iter().apply_until_stop(f)
+            }
+            LogicalPlan::Aggregate(Aggregate {
+                group_expr,
+                aggr_expr,
+                ..
+            }) => group_expr
+                .iter()
+                .chain(aggr_expr.iter())
+                .apply_until_stop(f),
+            // There are two part of expression for join, equijoin(on) and 
non-equijoin(filter).
+            // 1. the first part is `on.len()` equijoin expressions, and the 
struct of each expr is `left-on = right-on`.
+            // 2. the second part is non-equijoin(filter).
+            LogicalPlan::Join(Join { on, filter, .. }) => {
+                on.iter()
+                    // TODO: why we need to create an `Expr::eq`? Cloning 
`Expr` is costly...
+                    // it not ideal to create an expr here to analyze them, 
but could cache it on the Join itself
+                    .map(|(l, r)| Expr::eq(l.clone(), r.clone()))
+                    .apply_until_stop(|e| f(&e))?
+                    .visit_sibling(|| filter.iter().apply_until_stop(f))
+            }
+            LogicalPlan::Sort(Sort { expr, .. }) => 
expr.iter().apply_until_stop(f),
+            LogicalPlan::Extension(extension) => {
+                // would be nice to avoid this copy -- maybe can
+                // update extension to just observer Exprs
+                extension.node.expressions().iter().apply_until_stop(f)
+            }
+            LogicalPlan::TableScan(TableScan { filters, .. }) => {
+                filters.iter().apply_until_stop(f)
+            }
+            LogicalPlan::Unnest(Unnest { column, .. }) => {
+                f(&Expr::Column(column.clone()))
+            }
+            LogicalPlan::Distinct(Distinct::On(DistinctOn {
+                on_expr,
+                select_expr,
+                sort_expr,
+                ..
+            })) => on_expr
+                .iter()
+                .chain(select_expr.iter())
+                .chain(sort_expr.iter().flatten())
+                .apply_until_stop(f),
+            // plans without expressions
+            LogicalPlan::EmptyRelation(_)
+            | LogicalPlan::RecursiveQuery(_)
+            | LogicalPlan::Subquery(_)
+            | LogicalPlan::SubqueryAlias(_)
+            | LogicalPlan::Limit(_)
+            | LogicalPlan::Statement(_)
+            | LogicalPlan::CrossJoin(_)
+            | LogicalPlan::Analyze(_)
+            | LogicalPlan::Explain(_)
+            | LogicalPlan::Union(_)
+            | LogicalPlan::Distinct(Distinct::All(_))
+            | LogicalPlan::Dml(_)
+            | LogicalPlan::Ddl(_)
+            | LogicalPlan::Copy(_)
+            | LogicalPlan::DescribeTable(_)
+            | LogicalPlan::Prepare(_) => Ok(TreeNodeRecursion::Continue),
+        }
+    }
+
+    /// Rewrites all expressions in the current `LogicalPlan` node using `f`.
+    ///
+    /// Returns the current node.
+    ///
+    /// Note this does not include expressions in child `LogicalPlan` nodes.
+    pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
+        self,
+        mut f: F,
+    ) -> Result<Transformed<Self>> {
+        Ok(match self {
+            LogicalPlan::Projection(Projection {
+                expr,
+                input,
+                schema,
+            }) => expr
+                .into_iter()
+                .map_until_stop_and_collect(f)?
+                .update_data(|expr| {
+                    LogicalPlan::Projection(Projection {
+                        expr,
+                        input,
+                        schema,
+                    })
+                }),
+            LogicalPlan::Values(Values { schema, values }) => values
+                .into_iter()
+                .map_until_stop_and_collect(|value| {
+                    value.into_iter().map_until_stop_and_collect(&mut f)
+                })?
+                .update_data(|values| LogicalPlan::Values(Values { schema, 
values })),
+            LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)?
+                .update_data(|predicate| {
+                    LogicalPlan::Filter(Filter { predicate, input })
+                }),
+            LogicalPlan::Repartition(Repartition {
+                input,
+                partitioning_scheme,
+            }) => match partitioning_scheme {
+                Partitioning::Hash(expr, usize) => expr
+                    .into_iter()
+                    .map_until_stop_and_collect(f)?
+                    .update_data(|expr| Partitioning::Hash(expr, usize)),
+                Partitioning::DistributeBy(expr) => expr
+                    .into_iter()
+                    .map_until_stop_and_collect(f)?
+                    .update_data(Partitioning::DistributeBy),
+                Partitioning::RoundRobinBatch(_) => 
Transformed::no(partitioning_scheme),
+            }
+            .update_data(|partitioning_scheme| {
+                LogicalPlan::Repartition(Repartition {
+                    input,
+                    partitioning_scheme,
+                })
+            }),
+            LogicalPlan::Window(Window {
+                input,
+                window_expr,
+                schema,
+            }) => window_expr
+                .into_iter()
+                .map_until_stop_and_collect(f)?
+                .update_data(|window_expr| {
+                    LogicalPlan::Window(Window {
+                        input,
+                        window_expr,
+                        schema,
+                    })
+                }),
+            LogicalPlan::Aggregate(Aggregate {
+                input,
+                group_expr,
+                aggr_expr,
+                schema,
+            }) => map_until_stop_and_collect!(
+                group_expr.into_iter().map_until_stop_and_collect(&mut f),
+                aggr_expr,
+                aggr_expr.into_iter().map_until_stop_and_collect(&mut f)
+            )?
+            .update_data(|(group_expr, aggr_expr)| {
+                LogicalPlan::Aggregate(Aggregate {
+                    input,
+                    group_expr,
+                    aggr_expr,
+                    schema,
+                })
+            }),
+
+            // There are two part of expression for join, equijoin(on) and 
non-equijoin(filter).
+            // 1. the first part is `on.len()` equijoin expressions, and the 
struct of each expr is `left-on = right-on`.
+            // 2. the second part is non-equijoin(filter).
+            LogicalPlan::Join(Join {
+                left,
+                right,
+                on,
+                filter,
+                join_type,
+                join_constraint,
+                schema,
+                null_equals_null,
+            }) => map_until_stop_and_collect!(
+                on.into_iter().map_until_stop_and_collect(
+                    |on| map_until_stop_and_collect!(f(on.0), on.1, f(on.1))
+                ),
+                filter,
+                filter.map_or(Ok::<_, DataFusionError>(Transformed::no(None)), 
|e| {
+                    Ok(f(e)?.update_data(Some))
+                })
+            )?
+            .update_data(|(on, filter)| {
+                LogicalPlan::Join(Join {
+                    left,
+                    right,
+                    on,
+                    filter,
+                    join_type,
+                    join_constraint,
+                    schema,
+                    null_equals_null,
+                })
+            }),
+            LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
+                .into_iter()
+                .map_until_stop_and_collect(f)?
+                .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, 
fetch })),
+            LogicalPlan::Extension(Extension { node }) => {
+                // would be nice to avoid this copy -- maybe can
+                // update extension to just observer Exprs
+                node.expressions()
+                    .into_iter()
+                    .map_until_stop_and_collect(f)?
+                    .update_data(|exprs| {
+                        LogicalPlan::Extension(Extension {
+                            node: UserDefinedLogicalNode::from_template(
+                                node.as_ref(),
+                                exprs.as_slice(),
+                                node.inputs()
+                                    .into_iter()
+                                    .cloned()
+                                    .collect::<Vec<_>>()
+                                    .as_slice(),
+                            ),
+                        })
+                    })
+            }
+            LogicalPlan::TableScan(TableScan {
+                table_name,
+                source,
+                projection,
+                projected_schema,
+                filters,
+                fetch,
+            }) => filters
+                .into_iter()
+                .map_until_stop_and_collect(f)?
+                .update_data(|filters| {
+                    LogicalPlan::TableScan(TableScan {
+                        table_name,
+                        source,
+                        projection,
+                        projected_schema,
+                        filters,
+                        fetch,
+                    })
+                }),
+            LogicalPlan::Unnest(Unnest {
+                input,
+                column,
+                schema,
+                options,
+            }) => f(Expr::Column(column))?.map_data(|column| match column {
+                Expr::Column(column) => Ok(LogicalPlan::Unnest(Unnest {
+                    input,
+                    column,
+                    schema,
+                    options,
+                })),
+                _ => internal_err!("Transformation should return Column"),
+            })?,
+            LogicalPlan::Distinct(Distinct::On(DistinctOn {
+                on_expr,
+                select_expr,
+                sort_expr,
+                input,
+                schema,
+            })) => map_until_stop_and_collect!(
+                on_expr.into_iter().map_until_stop_and_collect(&mut f),
+                select_expr,
+                select_expr.into_iter().map_until_stop_and_collect(&mut f),
+                sort_expr,
+                transform_option_vec(sort_expr, &mut f)
+            )?
+            .update_data(|(on_expr, select_expr, sort_expr)| {
+                LogicalPlan::Distinct(Distinct::On(DistinctOn {
+                    on_expr,
+                    select_expr,
+                    sort_expr,
+                    input,
+                    schema,
+                }))
+            }),
+            // plans without expressions
+            LogicalPlan::EmptyRelation(_)
+            | LogicalPlan::RecursiveQuery(_)
+            | LogicalPlan::Subquery(_)
+            | LogicalPlan::SubqueryAlias(_)
+            | LogicalPlan::Limit(_)
+            | LogicalPlan::Statement(_)
+            | LogicalPlan::CrossJoin(_)
+            | LogicalPlan::Analyze(_)
+            | LogicalPlan::Explain(_)
+            | LogicalPlan::Union(_)
+            | LogicalPlan::Distinct(Distinct::All(_))
+            | LogicalPlan::Dml(_)
+            | LogicalPlan::Ddl(_)
+            | LogicalPlan::Copy(_)
+            | LogicalPlan::DescribeTable(_)
+            | LogicalPlan::Prepare(_) => Transformed::no(self),
+        })
+    }
+
+    /// Visits this plan with `visitor` like [`Self::visit`], but also visits
+    /// the subqueries embedded in each node's expressions.
+    ///
+    /// For every node, `visitor.f_down` is called first. Then the node's
+    /// subqueries are visited, then its children, and finally `visitor.f_up`.
+    /// Each step is gated by the `TreeNodeRecursion` produced by the
+    /// previous one.
+    pub fn visit_with_subqueries<V: TreeNodeVisitor<Node = Self>>(
+        &self,
+        visitor: &mut V,
+    ) -> Result<TreeNodeRecursion> {
+        let recursion = visitor.f_down(self)?;
+        // Subqueries referenced from this node's expressions come first ...
+        let recursion = recursion.visit_children(|| {
+            self.apply_subqueries(|subquery| subquery.visit_with_subqueries(visitor))
+        })?;
+        // ... followed by the regular input plans of this node.
+        let recursion = recursion.visit_sibling(|| {
+            self.apply_children(|child| child.visit_with_subqueries(visitor))
+        })?;
+        recursion.visit_parent(|| visitor.f_up(self))
+    }
+
+    /// Rewrites a plan similarly to [`Self::rewrite`], but including embedded
+    /// subqueries.
+    ///
+    /// `rewriter.f_down` is called on a node before recursing into it, and
+    /// `rewriter.f_up` is called on the (possibly rewritten) node afterwards.
+    pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
+        self,
+        rewriter: &mut R,
+    ) -> Result<Transformed<Self>> {
+        handle_transform_recursion!(
+            rewriter.f_down(self),
+            |c| c.rewrite_with_subqueries(rewriter),
+            |n| rewriter.f_up(n)
+        )
+    }
+
+    /// Calls `f` recursively on this `LogicalPlan` node and all of its
+    /// descendants, similarly to [`Self::apply`].
+    ///
+    /// Unlike [`Self::apply`], this method *does* include `LogicalPlan`s that
+    /// are referenced in `Expr`s (subqueries). For each node, `f` is called on
+    /// the node itself, then on its subqueries, then on its children.
+    pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
+        &self,
+        f: &mut F,
+    ) -> Result<TreeNodeRecursion> {
+        f(self)?
+            .visit_children(|| self.apply_subqueries(|c| c.apply_with_subqueries(f)))?
+            .visit_sibling(|| self.apply_children(|c| c.apply_with_subqueries(f)))
+    }
+
+    /// Similarly to [`Self::transform`], but including embedded subqueries.
+    ///
+    /// This is an alias for [`Self::transform_up_with_subqueries`].
+    pub fn transform_with_subqueries<F: Fn(Self) -> Result<Transformed<Self>>>(
+        self,
+        f: &F,
+    ) -> Result<Transformed<Self>> {
+        self.transform_up_with_subqueries(f)
+    }
+
+    /// Similarly to [`Self::transform_down`], but including embedded subqueries.
+    ///
+    /// `f` is applied to a node before the recursion continues below it.
+    pub fn transform_down_with_subqueries<F: Fn(Self) -> Result<Transformed<Self>>>(
+        self,
+        f: &F,
+    ) -> Result<Transformed<Self>> {
+        handle_transform_recursion_down!(f(self), |c| c.transform_down_with_subqueries(f))
+    }
+
+    /// Similarly to [`Self::transform_down_mut`], but including embedded
+    /// subqueries.
+    ///
+    /// Same as [`Self::transform_down_with_subqueries`], except that `f` may
+    /// be an `FnMut` closure.
+    pub fn transform_down_mut_with_subqueries<
+        F: FnMut(Self) -> Result<Transformed<Self>>,
+    >(
+        self,
+        f: &mut F,
+    ) -> Result<Transformed<Self>> {
+        handle_transform_recursion_down!(f(self), |c| c
+            .transform_down_mut_with_subqueries(f))
+    }
+
+    /// Similarly to [`Self::transform_up`], but including embedded subqueries.
+    ///
+    /// `f` is applied to a node after the recursion below it has completed.
+    pub fn transform_up_with_subqueries<F: Fn(Self) -> Result<Transformed<Self>>>(
+        self,
+        f: &F,
+    ) -> Result<Transformed<Self>> {
+        handle_transform_recursion_up!(self, |c| c.transform_up_with_subqueries(f), f)
+    }
+
+    /// Similarly to [`Self::transform_up_mut`], but including embedded
+    /// subqueries.
+    ///
+    /// Same as [`Self::transform_up_with_subqueries`], except that `f` may be
+    /// an `FnMut` closure.
+    pub fn transform_up_mut_with_subqueries<
+        F: FnMut(Self) -> Result<Transformed<Self>>,
+    >(
+        self,
+        f: &mut F,
+    ) -> Result<Transformed<Self>> {
+        handle_transform_recursion_up!(self, |c| c.transform_up_mut_with_subqueries(f), f)
+    }
+
+    /// Similarly to [`Self::transform_down_up`], but including embedded
+    /// subqueries.
+    ///
+    /// `f_down` is called on a node before recursing into it, and `f_up` is
+    /// called on the (possibly transformed) node afterwards.
+    pub fn transform_down_up_with_subqueries<
+        FD: FnMut(Self) -> Result<Transformed<Self>>,
+        FU: FnMut(Self) -> Result<Transformed<Self>>,
+    >(
+        self,
+        f_down: &mut FD,
+        f_up: &mut FU,
+    ) -> Result<Transformed<Self>> {
+        handle_transform_recursion!(
+            f_down(self),
+            |c| c.transform_down_up_with_subqueries(f_down, f_up),
+            f_up
+        )
+    }
+
+    /// Calls `f` on every subquery referenced by the expressions of the
+    /// current `LogicalPlan` node.
+    ///
+    /// Each expression of this node is walked with `apply`. Whenever an
+    /// `Exists`, `InSubquery` or `ScalarSubquery` expression is found, `f` is
+    /// invoked and its result is handed back to that walk. All other
+    /// expressions continue the walk unchanged.
+    pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
+        &self,
+        mut f: F,
+    ) -> Result<TreeNodeRecursion> {
+        self.apply_expressions(|expr| {
+            expr.apply(&mut |node| {
+                let subquery = match node {
+                    Expr::Exists(Exists { subquery, .. })
+                    | Expr::InSubquery(InSubquery { subquery, .. })
+                    | Expr::ScalarSubquery(subquery) => subquery,
+                    _ => return Ok(TreeNodeRecursion::Continue),
+                };
+                // `f` works on `LogicalPlan`s, so the expression-level
+                // `Subquery` is cloned into a synthetic
+                // `LogicalPlan::Subquery` node before being handed over.
+                f(&LogicalPlan::Subquery(subquery.clone()))
+            })
+        })
+    }
+
+    /// Rewrites every subquery `LogicalPlan` referenced by the expressions of
+    /// the current `LogicalPlan` node using `f`.
+    ///
+    /// The expressions are walked top-down. Each `Exists`, `InSubquery` and
+    /// `ScalarSubquery` has its subquery passed to `f` as a
+    /// `LogicalPlan::Subquery`. `f` must hand back a `LogicalPlan::Subquery`;
+    /// any other variant yields an internal error.
+    ///
+    /// Returns the current node.
+    pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
+        self,
+        mut f: F,
+    ) -> Result<Transformed<Self>> {
+        // Runs `f` on an expression-level subquery (wrapped as a plan) and
+        // unwraps the returned plan back into the expression-level subquery.
+        let mut rewrite_subquery = |subquery| -> Result<Transformed<_>> {
+            f(LogicalPlan::Subquery(subquery))?.map_data(|plan| match plan {
+                LogicalPlan::Subquery(subquery) => Ok(subquery),
+                _ => internal_err!("Transformation should return Subquery"),
+            })
+        };
+        self.map_expressions(|expr| {
+            expr.transform_down_mut(&mut |expr| match expr {
+                Expr::Exists(Exists { subquery, negated }) => rewrite_subquery(subquery)?
+                    .map_data(|subquery| Ok(Expr::Exists(Exists { subquery, negated }))),
+                Expr::InSubquery(InSubquery {
+                    expr,
+                    subquery,
+                    negated,
+                }) => rewrite_subquery(subquery)?.map_data(|subquery| {
+                    Ok(Expr::InSubquery(InSubquery {
+                        expr,
+                        subquery,
+                        negated,
+                    }))
+                }),
+                Expr::ScalarSubquery(subquery) => rewrite_subquery(subquery)?
+                    .map_data(|subquery| Ok(Expr::ScalarSubquery(subquery))),
+                _ => Ok(Transformed::no(expr)),
+            })
+        })
+    }
+}


Reply via email to