This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b9759b9810 Consolidate LogicalPlan tree node walking/rewriting code
into one module (#10034)
b9759b9810 is described below
commit b9759b9810a05b7993c0357a5346197395cfd4cc
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Apr 10 17:31:41 2024 -0400
Consolidate LogicalPlan tree node walking/rewriting code into one module
(#10034)
---
datafusion/expr/src/logical_plan/plan.rs | 515 +-------------------------
datafusion/expr/src/logical_plan/tree_node.rs | 511 ++++++++++++++++++++++++-
2 files changed, 515 insertions(+), 511 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 7bad034a11..d16dfb1403 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -25,9 +25,7 @@ use std::sync::Arc;
use super::dml::CopyTo;
use super::DdlStatement;
use crate::builder::change_redundant_column;
-use crate::expr::{
- Alias, Exists, InSubquery, Placeholder, Sort as SortExpr, WindowFunction,
-};
+use crate::expr::{Alias, Placeholder, Sort as SortExpr, WindowFunction};
use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols};
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
@@ -44,19 +42,16 @@ use crate::{
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::tree_node::{
- Transformed, TransformedResult, TreeNode, TreeNodeIterator,
TreeNodeRecursion,
- TreeNodeRewriter, TreeNodeVisitor,
+ Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{
- aggregate_functional_dependencies, internal_err,
map_until_stop_and_collect,
- plan_err, Column, Constraints, DFSchema, DFSchemaRef, DataFusionError,
Dependency,
- FunctionalDependence, FunctionalDependencies, ParamValues, Result,
TableReference,
- UnnestOptions,
+ aggregate_functional_dependencies, internal_err, plan_err, Column,
Constraints,
+ DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
+ FunctionalDependencies, ParamValues, Result, TableReference, UnnestOptions,
};
// backwards compatibility
use crate::display::PgJsonVisitor;
-use crate::tree_node::transform_option_vec;
pub use datafusion_common::display::{PlanType, StringifiedPlan,
ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};
@@ -315,314 +310,6 @@ impl LogicalPlan {
err
}
- /// Calls `f` on all expressions in the current `LogicalPlan` node.
- ///
- /// Note this does not include expressions in child `LogicalPlan` nodes.
- pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
- &self,
- mut f: F,
- ) -> Result<TreeNodeRecursion> {
- match self {
- LogicalPlan::Projection(Projection { expr, .. }) => {
- expr.iter().apply_until_stop(f)
- }
- LogicalPlan::Values(Values { values, .. }) => values
- .iter()
- .apply_until_stop(|value| value.iter().apply_until_stop(&mut
f)),
- LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
- LogicalPlan::Repartition(Repartition {
- partitioning_scheme,
- ..
- }) => match partitioning_scheme {
- Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr)
=> {
- expr.iter().apply_until_stop(f)
- }
- Partitioning::RoundRobinBatch(_) =>
Ok(TreeNodeRecursion::Continue),
- },
- LogicalPlan::Window(Window { window_expr, .. }) => {
- window_expr.iter().apply_until_stop(f)
- }
- LogicalPlan::Aggregate(Aggregate {
- group_expr,
- aggr_expr,
- ..
- }) => group_expr
- .iter()
- .chain(aggr_expr.iter())
- .apply_until_stop(f),
- // There are two part of expression for join, equijoin(on) and
non-equijoin(filter).
- // 1. the first part is `on.len()` equijoin expressions, and the
struct of each expr is `left-on = right-on`.
- // 2. the second part is non-equijoin(filter).
- LogicalPlan::Join(Join { on, filter, .. }) => {
- on.iter()
- // TODO: why we need to create an `Expr::eq`? Cloning
`Expr` is costly...
- // it not ideal to create an expr here to analyze them,
but could cache it on the Join itself
- .map(|(l, r)| Expr::eq(l.clone(), r.clone()))
- .apply_until_stop(|e| f(&e))?
- .visit_sibling(|| filter.iter().apply_until_stop(f))
- }
- LogicalPlan::Sort(Sort { expr, .. }) =>
expr.iter().apply_until_stop(f),
- LogicalPlan::Extension(extension) => {
- // would be nice to avoid this copy -- maybe can
- // update extension to just observer Exprs
- extension.node.expressions().iter().apply_until_stop(f)
- }
- LogicalPlan::TableScan(TableScan { filters, .. }) => {
- filters.iter().apply_until_stop(f)
- }
- LogicalPlan::Unnest(Unnest { column, .. }) => {
- f(&Expr::Column(column.clone()))
- }
- LogicalPlan::Distinct(Distinct::On(DistinctOn {
- on_expr,
- select_expr,
- sort_expr,
- ..
- })) => on_expr
- .iter()
- .chain(select_expr.iter())
- .chain(sort_expr.iter().flatten())
- .apply_until_stop(f),
- // plans without expressions
- LogicalPlan::EmptyRelation(_)
- | LogicalPlan::RecursiveQuery(_)
- | LogicalPlan::Subquery(_)
- | LogicalPlan::SubqueryAlias(_)
- | LogicalPlan::Limit(_)
- | LogicalPlan::Statement(_)
- | LogicalPlan::CrossJoin(_)
- | LogicalPlan::Analyze(_)
- | LogicalPlan::Explain(_)
- | LogicalPlan::Union(_)
- | LogicalPlan::Distinct(Distinct::All(_))
- | LogicalPlan::Dml(_)
- | LogicalPlan::Ddl(_)
- | LogicalPlan::Copy(_)
- | LogicalPlan::DescribeTable(_)
- | LogicalPlan::Prepare(_) => Ok(TreeNodeRecursion::Continue),
- }
- }
-
- /// Rewrites all expressions in the current `LogicalPlan` node using `f`.
- ///
- /// Returns the current node.
- ///
- /// Note this does not include expressions in child `LogicalPlan` nodes.
- pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
- self,
- mut f: F,
- ) -> Result<Transformed<Self>> {
- Ok(match self {
- LogicalPlan::Projection(Projection {
- expr,
- input,
- schema,
- }) => expr
- .into_iter()
- .map_until_stop_and_collect(f)?
- .update_data(|expr| {
- LogicalPlan::Projection(Projection {
- expr,
- input,
- schema,
- })
- }),
- LogicalPlan::Values(Values { schema, values }) => values
- .into_iter()
- .map_until_stop_and_collect(|value| {
- value.into_iter().map_until_stop_and_collect(&mut f)
- })?
- .update_data(|values| LogicalPlan::Values(Values { schema,
values })),
- LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)?
- .update_data(|predicate| {
- LogicalPlan::Filter(Filter { predicate, input })
- }),
- LogicalPlan::Repartition(Repartition {
- input,
- partitioning_scheme,
- }) => match partitioning_scheme {
- Partitioning::Hash(expr, usize) => expr
- .into_iter()
- .map_until_stop_and_collect(f)?
- .update_data(|expr| Partitioning::Hash(expr, usize)),
- Partitioning::DistributeBy(expr) => expr
- .into_iter()
- .map_until_stop_and_collect(f)?
- .update_data(Partitioning::DistributeBy),
- Partitioning::RoundRobinBatch(_) =>
Transformed::no(partitioning_scheme),
- }
- .update_data(|partitioning_scheme| {
- LogicalPlan::Repartition(Repartition {
- input,
- partitioning_scheme,
- })
- }),
- LogicalPlan::Window(Window {
- input,
- window_expr,
- schema,
- }) => window_expr
- .into_iter()
- .map_until_stop_and_collect(f)?
- .update_data(|window_expr| {
- LogicalPlan::Window(Window {
- input,
- window_expr,
- schema,
- })
- }),
- LogicalPlan::Aggregate(Aggregate {
- input,
- group_expr,
- aggr_expr,
- schema,
- }) => map_until_stop_and_collect!(
- group_expr.into_iter().map_until_stop_and_collect(&mut f),
- aggr_expr,
- aggr_expr.into_iter().map_until_stop_and_collect(&mut f)
- )?
- .update_data(|(group_expr, aggr_expr)| {
- LogicalPlan::Aggregate(Aggregate {
- input,
- group_expr,
- aggr_expr,
- schema,
- })
- }),
-
- // There are two part of expression for join, equijoin(on) and
non-equijoin(filter).
- // 1. the first part is `on.len()` equijoin expressions, and the
struct of each expr is `left-on = right-on`.
- // 2. the second part is non-equijoin(filter).
- LogicalPlan::Join(Join {
- left,
- right,
- on,
- filter,
- join_type,
- join_constraint,
- schema,
- null_equals_null,
- }) => map_until_stop_and_collect!(
- on.into_iter().map_until_stop_and_collect(
- |on| map_until_stop_and_collect!(f(on.0), on.1, f(on.1))
- ),
- filter,
- filter.map_or(Ok::<_, DataFusionError>(Transformed::no(None)),
|e| {
- Ok(f(e)?.update_data(Some))
- })
- )?
- .update_data(|(on, filter)| {
- LogicalPlan::Join(Join {
- left,
- right,
- on,
- filter,
- join_type,
- join_constraint,
- schema,
- null_equals_null,
- })
- }),
- LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
- .into_iter()
- .map_until_stop_and_collect(f)?
- .update_data(|expr| LogicalPlan::Sort(Sort { expr, input,
fetch })),
- LogicalPlan::Extension(Extension { node }) => {
- // would be nice to avoid this copy -- maybe can
- // update extension to just observer Exprs
- node.expressions()
- .into_iter()
- .map_until_stop_and_collect(f)?
- .update_data(|exprs| {
- LogicalPlan::Extension(Extension {
- node: UserDefinedLogicalNode::from_template(
- node.as_ref(),
- exprs.as_slice(),
- node.inputs()
- .into_iter()
- .cloned()
- .collect::<Vec<_>>()
- .as_slice(),
- ),
- })
- })
- }
- LogicalPlan::TableScan(TableScan {
- table_name,
- source,
- projection,
- projected_schema,
- filters,
- fetch,
- }) => filters
- .into_iter()
- .map_until_stop_and_collect(f)?
- .update_data(|filters| {
- LogicalPlan::TableScan(TableScan {
- table_name,
- source,
- projection,
- projected_schema,
- filters,
- fetch,
- })
- }),
- LogicalPlan::Unnest(Unnest {
- input,
- column,
- schema,
- options,
- }) => f(Expr::Column(column))?.map_data(|column| match column {
- Expr::Column(column) => Ok(LogicalPlan::Unnest(Unnest {
- input,
- column,
- schema,
- options,
- })),
- _ => internal_err!("Transformation should return Column"),
- })?,
- LogicalPlan::Distinct(Distinct::On(DistinctOn {
- on_expr,
- select_expr,
- sort_expr,
- input,
- schema,
- })) => map_until_stop_and_collect!(
- on_expr.into_iter().map_until_stop_and_collect(&mut f),
- select_expr,
- select_expr.into_iter().map_until_stop_and_collect(&mut f),
- sort_expr,
- transform_option_vec(sort_expr, &mut f)
- )?
- .update_data(|(on_expr, select_expr, sort_expr)| {
- LogicalPlan::Distinct(Distinct::On(DistinctOn {
- on_expr,
- select_expr,
- sort_expr,
- input,
- schema,
- }))
- }),
- // plans without expressions
- LogicalPlan::EmptyRelation(_)
- | LogicalPlan::RecursiveQuery(_)
- | LogicalPlan::Subquery(_)
- | LogicalPlan::SubqueryAlias(_)
- | LogicalPlan::Limit(_)
- | LogicalPlan::Statement(_)
- | LogicalPlan::CrossJoin(_)
- | LogicalPlan::Analyze(_)
- | LogicalPlan::Explain(_)
- | LogicalPlan::Union(_)
- | LogicalPlan::Distinct(Distinct::All(_))
- | LogicalPlan::Dml(_)
- | LogicalPlan::Ddl(_)
- | LogicalPlan::Copy(_)
- | LogicalPlan::DescribeTable(_)
- | LogicalPlan::Prepare(_) => Transformed::no(self),
- })
- }
-
/// Returns all inputs / children of this `LogicalPlan` node.
///
/// Note does not include inputs to inputs, or subqueries.
@@ -1354,192 +1041,7 @@ impl LogicalPlan {
}
}
-/// This macro is used to determine continuation during combined transforming
-/// traversals.
-macro_rules! handle_transform_recursion {
- ($F_DOWN:expr, $F_CHILD:expr, $F_UP:expr) => {{
- $F_DOWN?
- .transform_children(|n| n.map_subqueries($F_CHILD))?
- .transform_sibling(|n| n.map_children($F_CHILD))?
- .transform_parent($F_UP)
- }};
-}
-
-macro_rules! handle_transform_recursion_down {
- ($F_DOWN:expr, $F_CHILD:expr) => {{
- $F_DOWN?
- .transform_children(|n| n.map_subqueries($F_CHILD))?
- .transform_sibling(|n| n.map_children($F_CHILD))
- }};
-}
-
-macro_rules! handle_transform_recursion_up {
- ($SELF:expr, $F_CHILD:expr, $F_UP:expr) => {{
- $SELF
- .map_subqueries($F_CHILD)?
- .transform_sibling(|n| n.map_children($F_CHILD))?
- .transform_parent(|n| $F_UP(n))
- }};
-}
-
impl LogicalPlan {
- /// Visits a plan similarly to [`Self::visit`], but including embedded
subqueries.
- pub fn visit_with_subqueries<V: TreeNodeVisitor<Node = Self>>(
- &self,
- visitor: &mut V,
- ) -> Result<TreeNodeRecursion> {
- visitor
- .f_down(self)?
- .visit_children(|| {
- self.apply_subqueries(|c| c.visit_with_subqueries(visitor))
- })?
- .visit_sibling(|| self.apply_children(|c|
c.visit_with_subqueries(visitor)))?
- .visit_parent(|| visitor.f_up(self))
- }
-
- /// Rewrites a plan similarly t [`Self::visit`], but including embedded
subqueries.
- pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
- self,
- rewriter: &mut R,
- ) -> Result<Transformed<Self>> {
- handle_transform_recursion!(
- rewriter.f_down(self),
- |c| c.rewrite_with_subqueries(rewriter),
- |n| rewriter.f_up(n)
- )
- }
-
- /// Calls `f` recursively on all children of the `LogicalPlan` node.
- ///
- /// Unlike [`Self::apply`], this method *does* includes `LogicalPlan`s that
- /// are referenced in `Expr`s
- pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
- &self,
- f: &mut F,
- ) -> Result<TreeNodeRecursion> {
- f(self)?
- .visit_children(|| self.apply_subqueries(|c|
c.apply_with_subqueries(f)))?
- .visit_sibling(|| self.apply_children(|c|
c.apply_with_subqueries(f)))
- }
-
- pub fn transform_with_subqueries<F: Fn(Self) -> Result<Transformed<Self>>>(
- self,
- f: &F,
- ) -> Result<Transformed<Self>> {
- self.transform_up_with_subqueries(f)
- }
-
- pub fn transform_down_with_subqueries<F: Fn(Self) ->
Result<Transformed<Self>>>(
- self,
- f: &F,
- ) -> Result<Transformed<Self>> {
- handle_transform_recursion_down!(f(self), |c|
c.transform_down_with_subqueries(f))
- }
-
- pub fn transform_down_mut_with_subqueries<
- F: FnMut(Self) -> Result<Transformed<Self>>,
- >(
- self,
- f: &mut F,
- ) -> Result<Transformed<Self>> {
- handle_transform_recursion_down!(f(self), |c| c
- .transform_down_mut_with_subqueries(f))
- }
-
- pub fn transform_up_with_subqueries<F: Fn(Self) ->
Result<Transformed<Self>>>(
- self,
- f: &F,
- ) -> Result<Transformed<Self>> {
- handle_transform_recursion_up!(self, |c|
c.transform_up_with_subqueries(f), f)
- }
-
- pub fn transform_up_mut_with_subqueries<
- F: FnMut(Self) -> Result<Transformed<Self>>,
- >(
- self,
- f: &mut F,
- ) -> Result<Transformed<Self>> {
- handle_transform_recursion_up!(self, |c|
c.transform_up_mut_with_subqueries(f), f)
- }
-
- pub fn transform_down_up_with_subqueries<
- FD: FnMut(Self) -> Result<Transformed<Self>>,
- FU: FnMut(Self) -> Result<Transformed<Self>>,
- >(
- self,
- f_down: &mut FD,
- f_up: &mut FU,
- ) -> Result<Transformed<Self>> {
- handle_transform_recursion!(
- f_down(self),
- |c| c.transform_down_up_with_subqueries(f_down, f_up),
- f_up
- )
- }
-
- /// Calls `f` on all subqueries referenced in expressions of the current
- /// `LogicalPlan` node.
- pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
- &self,
- mut f: F,
- ) -> Result<TreeNodeRecursion> {
- self.apply_expressions(|expr| {
- expr.apply(&mut |expr| match expr {
- Expr::Exists(Exists { subquery, .. })
- | Expr::InSubquery(InSubquery { subquery, .. })
- | Expr::ScalarSubquery(subquery) => {
- // use a synthetic plan so the collector sees a
- // LogicalPlan::Subquery (even though it is
- // actually a Subquery alias)
- f(&LogicalPlan::Subquery(subquery.clone()))
- }
- _ => Ok(TreeNodeRecursion::Continue),
- })
- })
- }
-
- /// Rewrites all subquery `LogicalPlan` in the current `LogicalPlan` node
- /// using `f`.
- ///
- /// Returns the current node.
- pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
- self,
- mut f: F,
- ) -> Result<Transformed<Self>> {
- self.map_expressions(|expr| {
- expr.transform_down_mut(&mut |expr| match expr {
- Expr::Exists(Exists { subquery, negated }) => {
- f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
- LogicalPlan::Subquery(subquery) => {
- Ok(Expr::Exists(Exists { subquery, negated }))
- }
- _ => internal_err!("Transformation should return
Subquery"),
- })
- }
- Expr::InSubquery(InSubquery {
- expr,
- subquery,
- negated,
- }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s
{
- LogicalPlan::Subquery(subquery) =>
Ok(Expr::InSubquery(InSubquery {
- expr,
- subquery,
- negated,
- })),
- _ => internal_err!("Transformation should return
Subquery"),
- }),
- Expr::ScalarSubquery(subquery) =>
f(LogicalPlan::Subquery(subquery))?
- .map_data(|s| match s {
- LogicalPlan::Subquery(subquery) => {
- Ok(Expr::ScalarSubquery(subquery))
- }
- _ => internal_err!("Transformation should return
Subquery"),
- }),
- _ => Ok(Transformed::no(expr)),
- })
- })
- }
-
/// Return a `LogicalPlan` with all placeholders (e.g $1 $2,
/// ...) replaced with corresponding values provided in
/// `params_values`
@@ -1623,10 +1125,11 @@ impl LogicalPlan {
})
.data()
}
-}
-// Various implementations for printing out LogicalPlans
-impl LogicalPlan {
+ // ------------
+ // Various implementations for printing out LogicalPlans
+ // ------------
+
/// Return a `format`able structure that produces a single line
/// per node.
///
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs
b/datafusion/expr/src/logical_plan/tree_node.rs
index 415343f886..1eb9d50277 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -38,16 +38,22 @@
//! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
use crate::{
dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, CrossJoin,
- DdlStatement, Distinct, DistinctOn, DmlStatement, Explain, Extension,
Filter, Join,
- Limit, LogicalPlan, Prepare, Projection, RecursiveQuery, Repartition,
Sort, Subquery,
- SubqueryAlias, Union, Unnest, Window,
+ DdlStatement, Distinct, DistinctOn, DmlStatement, Explain, Expr,
Extension, Filter,
+ Join, Limit, LogicalPlan, Partitioning, Prepare, Projection,
RecursiveQuery,
+ Repartition, Sort, Subquery, SubqueryAlias, TableScan, Union, Unnest,
+ UserDefinedLogicalNode, Values, Window,
};
use std::sync::Arc;
+use crate::expr::{Exists, InSubquery};
+use crate::tree_node::transform_option_vec;
use datafusion_common::tree_node::{
- Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion,
+ Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion,
TreeNodeRewriter,
+ TreeNodeVisitor,
+};
+use datafusion_common::{
+ internal_err, map_until_stop_and_collect, DataFusionError, Result,
};
-use datafusion_common::{map_until_stop_and_collect, Result};
impl TreeNode for LogicalPlan {
fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
@@ -413,3 +419,498 @@ where
})
})
}
+
+/// This macro is used to determine continuation during combined transforming
+/// traversals.
+macro_rules! handle_transform_recursion {
+ ($F_DOWN:expr, $F_CHILD:expr, $F_UP:expr) => {{
+ $F_DOWN?
+ .transform_children(|n| n.map_subqueries($F_CHILD))?
+ .transform_sibling(|n| n.map_children($F_CHILD))?
+ .transform_parent($F_UP)
+ }};
+}
+
+macro_rules! handle_transform_recursion_down {
+ ($F_DOWN:expr, $F_CHILD:expr) => {{
+ $F_DOWN?
+ .transform_children(|n| n.map_subqueries($F_CHILD))?
+ .transform_sibling(|n| n.map_children($F_CHILD))
+ }};
+}
+
+macro_rules! handle_transform_recursion_up {
+ ($SELF:expr, $F_CHILD:expr, $F_UP:expr) => {{
+ $SELF
+ .map_subqueries($F_CHILD)?
+ .transform_sibling(|n| n.map_children($F_CHILD))?
+ .transform_parent(|n| $F_UP(n))
+ }};
+}
+
+impl LogicalPlan {
+ /// Calls `f` on all expressions in the current `LogicalPlan` node.
+ ///
+ /// Note this does not include expressions in child `LogicalPlan` nodes.
+ pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
+ &self,
+ mut f: F,
+ ) -> Result<TreeNodeRecursion> {
+ match self {
+ LogicalPlan::Projection(Projection { expr, .. }) => {
+ expr.iter().apply_until_stop(f)
+ }
+ LogicalPlan::Values(Values { values, .. }) => values
+ .iter()
+ .apply_until_stop(|value| value.iter().apply_until_stop(&mut
f)),
+ LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
+ LogicalPlan::Repartition(Repartition {
+ partitioning_scheme,
+ ..
+ }) => match partitioning_scheme {
+ Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr)
=> {
+ expr.iter().apply_until_stop(f)
+ }
+ Partitioning::RoundRobinBatch(_) =>
Ok(TreeNodeRecursion::Continue),
+ },
+ LogicalPlan::Window(Window { window_expr, .. }) => {
+ window_expr.iter().apply_until_stop(f)
+ }
+ LogicalPlan::Aggregate(Aggregate {
+ group_expr,
+ aggr_expr,
+ ..
+ }) => group_expr
+ .iter()
+ .chain(aggr_expr.iter())
+ .apply_until_stop(f),
+ // There are two part of expression for join, equijoin(on) and
non-equijoin(filter).
+ // 1. the first part is `on.len()` equijoin expressions, and the
struct of each expr is `left-on = right-on`.
+ // 2. the second part is non-equijoin(filter).
+ LogicalPlan::Join(Join { on, filter, .. }) => {
+ on.iter()
+ // TODO: why we need to create an `Expr::eq`? Cloning
`Expr` is costly...
+ // it not ideal to create an expr here to analyze them,
but could cache it on the Join itself
+ .map(|(l, r)| Expr::eq(l.clone(), r.clone()))
+ .apply_until_stop(|e| f(&e))?
+ .visit_sibling(|| filter.iter().apply_until_stop(f))
+ }
+ LogicalPlan::Sort(Sort { expr, .. }) =>
expr.iter().apply_until_stop(f),
+ LogicalPlan::Extension(extension) => {
+ // would be nice to avoid this copy -- maybe can
+ // update extension to just observer Exprs
+ extension.node.expressions().iter().apply_until_stop(f)
+ }
+ LogicalPlan::TableScan(TableScan { filters, .. }) => {
+ filters.iter().apply_until_stop(f)
+ }
+ LogicalPlan::Unnest(Unnest { column, .. }) => {
+ f(&Expr::Column(column.clone()))
+ }
+ LogicalPlan::Distinct(Distinct::On(DistinctOn {
+ on_expr,
+ select_expr,
+ sort_expr,
+ ..
+ })) => on_expr
+ .iter()
+ .chain(select_expr.iter())
+ .chain(sort_expr.iter().flatten())
+ .apply_until_stop(f),
+ // plans without expressions
+ LogicalPlan::EmptyRelation(_)
+ | LogicalPlan::RecursiveQuery(_)
+ | LogicalPlan::Subquery(_)
+ | LogicalPlan::SubqueryAlias(_)
+ | LogicalPlan::Limit(_)
+ | LogicalPlan::Statement(_)
+ | LogicalPlan::CrossJoin(_)
+ | LogicalPlan::Analyze(_)
+ | LogicalPlan::Explain(_)
+ | LogicalPlan::Union(_)
+ | LogicalPlan::Distinct(Distinct::All(_))
+ | LogicalPlan::Dml(_)
+ | LogicalPlan::Ddl(_)
+ | LogicalPlan::Copy(_)
+ | LogicalPlan::DescribeTable(_)
+ | LogicalPlan::Prepare(_) => Ok(TreeNodeRecursion::Continue),
+ }
+ }
+
+ /// Rewrites all expressions in the current `LogicalPlan` node using `f`.
+ ///
+ /// Returns the current node.
+ ///
+ /// Note this does not include expressions in child `LogicalPlan` nodes.
+ pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
+ self,
+ mut f: F,
+ ) -> Result<Transformed<Self>> {
+ Ok(match self {
+ LogicalPlan::Projection(Projection {
+ expr,
+ input,
+ schema,
+ }) => expr
+ .into_iter()
+ .map_until_stop_and_collect(f)?
+ .update_data(|expr| {
+ LogicalPlan::Projection(Projection {
+ expr,
+ input,
+ schema,
+ })
+ }),
+ LogicalPlan::Values(Values { schema, values }) => values
+ .into_iter()
+ .map_until_stop_and_collect(|value| {
+ value.into_iter().map_until_stop_and_collect(&mut f)
+ })?
+ .update_data(|values| LogicalPlan::Values(Values { schema,
values })),
+ LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)?
+ .update_data(|predicate| {
+ LogicalPlan::Filter(Filter { predicate, input })
+ }),
+ LogicalPlan::Repartition(Repartition {
+ input,
+ partitioning_scheme,
+ }) => match partitioning_scheme {
+ Partitioning::Hash(expr, usize) => expr
+ .into_iter()
+ .map_until_stop_and_collect(f)?
+ .update_data(|expr| Partitioning::Hash(expr, usize)),
+ Partitioning::DistributeBy(expr) => expr
+ .into_iter()
+ .map_until_stop_and_collect(f)?
+ .update_data(Partitioning::DistributeBy),
+ Partitioning::RoundRobinBatch(_) =>
Transformed::no(partitioning_scheme),
+ }
+ .update_data(|partitioning_scheme| {
+ LogicalPlan::Repartition(Repartition {
+ input,
+ partitioning_scheme,
+ })
+ }),
+ LogicalPlan::Window(Window {
+ input,
+ window_expr,
+ schema,
+ }) => window_expr
+ .into_iter()
+ .map_until_stop_and_collect(f)?
+ .update_data(|window_expr| {
+ LogicalPlan::Window(Window {
+ input,
+ window_expr,
+ schema,
+ })
+ }),
+ LogicalPlan::Aggregate(Aggregate {
+ input,
+ group_expr,
+ aggr_expr,
+ schema,
+ }) => map_until_stop_and_collect!(
+ group_expr.into_iter().map_until_stop_and_collect(&mut f),
+ aggr_expr,
+ aggr_expr.into_iter().map_until_stop_and_collect(&mut f)
+ )?
+ .update_data(|(group_expr, aggr_expr)| {
+ LogicalPlan::Aggregate(Aggregate {
+ input,
+ group_expr,
+ aggr_expr,
+ schema,
+ })
+ }),
+
+ // There are two part of expression for join, equijoin(on) and
non-equijoin(filter).
+ // 1. the first part is `on.len()` equijoin expressions, and the
struct of each expr is `left-on = right-on`.
+ // 2. the second part is non-equijoin(filter).
+ LogicalPlan::Join(Join {
+ left,
+ right,
+ on,
+ filter,
+ join_type,
+ join_constraint,
+ schema,
+ null_equals_null,
+ }) => map_until_stop_and_collect!(
+ on.into_iter().map_until_stop_and_collect(
+ |on| map_until_stop_and_collect!(f(on.0), on.1, f(on.1))
+ ),
+ filter,
+ filter.map_or(Ok::<_, DataFusionError>(Transformed::no(None)),
|e| {
+ Ok(f(e)?.update_data(Some))
+ })
+ )?
+ .update_data(|(on, filter)| {
+ LogicalPlan::Join(Join {
+ left,
+ right,
+ on,
+ filter,
+ join_type,
+ join_constraint,
+ schema,
+ null_equals_null,
+ })
+ }),
+ LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
+ .into_iter()
+ .map_until_stop_and_collect(f)?
+ .update_data(|expr| LogicalPlan::Sort(Sort { expr, input,
fetch })),
+ LogicalPlan::Extension(Extension { node }) => {
+ // would be nice to avoid this copy -- maybe can
+ // update extension to just observer Exprs
+ node.expressions()
+ .into_iter()
+ .map_until_stop_and_collect(f)?
+ .update_data(|exprs| {
+ LogicalPlan::Extension(Extension {
+ node: UserDefinedLogicalNode::from_template(
+ node.as_ref(),
+ exprs.as_slice(),
+ node.inputs()
+ .into_iter()
+ .cloned()
+ .collect::<Vec<_>>()
+ .as_slice(),
+ ),
+ })
+ })
+ }
+ LogicalPlan::TableScan(TableScan {
+ table_name,
+ source,
+ projection,
+ projected_schema,
+ filters,
+ fetch,
+ }) => filters
+ .into_iter()
+ .map_until_stop_and_collect(f)?
+ .update_data(|filters| {
+ LogicalPlan::TableScan(TableScan {
+ table_name,
+ source,
+ projection,
+ projected_schema,
+ filters,
+ fetch,
+ })
+ }),
+ LogicalPlan::Unnest(Unnest {
+ input,
+ column,
+ schema,
+ options,
+ }) => f(Expr::Column(column))?.map_data(|column| match column {
+ Expr::Column(column) => Ok(LogicalPlan::Unnest(Unnest {
+ input,
+ column,
+ schema,
+ options,
+ })),
+ _ => internal_err!("Transformation should return Column"),
+ })?,
+ LogicalPlan::Distinct(Distinct::On(DistinctOn {
+ on_expr,
+ select_expr,
+ sort_expr,
+ input,
+ schema,
+ })) => map_until_stop_and_collect!(
+ on_expr.into_iter().map_until_stop_and_collect(&mut f),
+ select_expr,
+ select_expr.into_iter().map_until_stop_and_collect(&mut f),
+ sort_expr,
+ transform_option_vec(sort_expr, &mut f)
+ )?
+ .update_data(|(on_expr, select_expr, sort_expr)| {
+ LogicalPlan::Distinct(Distinct::On(DistinctOn {
+ on_expr,
+ select_expr,
+ sort_expr,
+ input,
+ schema,
+ }))
+ }),
+ // plans without expressions
+ LogicalPlan::EmptyRelation(_)
+ | LogicalPlan::RecursiveQuery(_)
+ | LogicalPlan::Subquery(_)
+ | LogicalPlan::SubqueryAlias(_)
+ | LogicalPlan::Limit(_)
+ | LogicalPlan::Statement(_)
+ | LogicalPlan::CrossJoin(_)
+ | LogicalPlan::Analyze(_)
+ | LogicalPlan::Explain(_)
+ | LogicalPlan::Union(_)
+ | LogicalPlan::Distinct(Distinct::All(_))
+ | LogicalPlan::Dml(_)
+ | LogicalPlan::Ddl(_)
+ | LogicalPlan::Copy(_)
+ | LogicalPlan::DescribeTable(_)
+ | LogicalPlan::Prepare(_) => Transformed::no(self),
+ })
+ }
+
+ /// Visits a plan similarly to [`Self::visit`], but including embedded
subqueries.
+ pub fn visit_with_subqueries<V: TreeNodeVisitor<Node = Self>>(
+ &self,
+ visitor: &mut V,
+ ) -> Result<TreeNodeRecursion> {
+ visitor
+ .f_down(self)?
+ .visit_children(|| {
+ self.apply_subqueries(|c| c.visit_with_subqueries(visitor))
+ })?
+ .visit_sibling(|| self.apply_children(|c|
c.visit_with_subqueries(visitor)))?
+ .visit_parent(|| visitor.f_up(self))
+ }
+
+ /// Rewrites a plan similarly t [`Self::visit`], but including embedded
subqueries.
+ pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
+ self,
+ rewriter: &mut R,
+ ) -> Result<Transformed<Self>> {
+ handle_transform_recursion!(
+ rewriter.f_down(self),
+ |c| c.rewrite_with_subqueries(rewriter),
+ |n| rewriter.f_up(n)
+ )
+ }
+
+ /// Calls `f` recursively on all children of the `LogicalPlan` node.
+ ///
+ /// Unlike [`Self::apply`], this method *does* includes `LogicalPlan`s that
+ /// are referenced in `Expr`s
+ pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
+ &self,
+ f: &mut F,
+ ) -> Result<TreeNodeRecursion> {
+ f(self)?
+ .visit_children(|| self.apply_subqueries(|c|
c.apply_with_subqueries(f)))?
+ .visit_sibling(|| self.apply_children(|c|
c.apply_with_subqueries(f)))
+ }
+
+ pub fn transform_with_subqueries<F: Fn(Self) -> Result<Transformed<Self>>>(
+ self,
+ f: &F,
+ ) -> Result<Transformed<Self>> {
+ self.transform_up_with_subqueries(f)
+ }
+
+ pub fn transform_down_with_subqueries<F: Fn(Self) ->
Result<Transformed<Self>>>(
+ self,
+ f: &F,
+ ) -> Result<Transformed<Self>> {
+ handle_transform_recursion_down!(f(self), |c|
c.transform_down_with_subqueries(f))
+ }
+
+ pub fn transform_down_mut_with_subqueries<
+ F: FnMut(Self) -> Result<Transformed<Self>>,
+ >(
+ self,
+ f: &mut F,
+ ) -> Result<Transformed<Self>> {
+ handle_transform_recursion_down!(f(self), |c| c
+ .transform_down_mut_with_subqueries(f))
+ }
+
+ pub fn transform_up_with_subqueries<F: Fn(Self) ->
Result<Transformed<Self>>>(
+ self,
+ f: &F,
+ ) -> Result<Transformed<Self>> {
+ handle_transform_recursion_up!(self, |c|
c.transform_up_with_subqueries(f), f)
+ }
+
+ pub fn transform_up_mut_with_subqueries<
+ F: FnMut(Self) -> Result<Transformed<Self>>,
+ >(
+ self,
+ f: &mut F,
+ ) -> Result<Transformed<Self>> {
+ handle_transform_recursion_up!(self, |c|
c.transform_up_mut_with_subqueries(f), f)
+ }
+
+ pub fn transform_down_up_with_subqueries<
+ FD: FnMut(Self) -> Result<Transformed<Self>>,
+ FU: FnMut(Self) -> Result<Transformed<Self>>,
+ >(
+ self,
+ f_down: &mut FD,
+ f_up: &mut FU,
+ ) -> Result<Transformed<Self>> {
+ handle_transform_recursion!(
+ f_down(self),
+ |c| c.transform_down_up_with_subqueries(f_down, f_up),
+ f_up
+ )
+ }
+
+ /// Calls `f` on all subqueries referenced in expressions of the current
+ /// `LogicalPlan` node.
+ pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
+ &self,
+ mut f: F,
+ ) -> Result<TreeNodeRecursion> {
+ self.apply_expressions(|expr| {
+ expr.apply(&mut |expr| match expr {
+ Expr::Exists(Exists { subquery, .. })
+ | Expr::InSubquery(InSubquery { subquery, .. })
+ | Expr::ScalarSubquery(subquery) => {
+ // use a synthetic plan so the collector sees a
+ // LogicalPlan::Subquery (even though it is
+ // actually a Subquery alias)
+ f(&LogicalPlan::Subquery(subquery.clone()))
+ }
+ _ => Ok(TreeNodeRecursion::Continue),
+ })
+ })
+ }
+
+ /// Rewrites all subquery `LogicalPlan` in the current `LogicalPlan` node
+ /// using `f`.
+ ///
+ /// Returns the current node.
+ pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
+ self,
+ mut f: F,
+ ) -> Result<Transformed<Self>> {
+ self.map_expressions(|expr| {
+ expr.transform_down_mut(&mut |expr| match expr {
+ Expr::Exists(Exists { subquery, negated }) => {
+ f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
+ LogicalPlan::Subquery(subquery) => {
+ Ok(Expr::Exists(Exists { subquery, negated }))
+ }
+ _ => internal_err!("Transformation should return
Subquery"),
+ })
+ }
+ Expr::InSubquery(InSubquery {
+ expr,
+ subquery,
+ negated,
+ }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s
{
+ LogicalPlan::Subquery(subquery) =>
Ok(Expr::InSubquery(InSubquery {
+ expr,
+ subquery,
+ negated,
+ })),
+ _ => internal_err!("Transformation should return
Subquery"),
+ }),
+ Expr::ScalarSubquery(subquery) =>
f(LogicalPlan::Subquery(subquery))?
+ .map_data(|s| match s {
+ LogicalPlan::Subquery(subquery) => {
+ Ok(Expr::ScalarSubquery(subquery))
+ }
+ _ => internal_err!("Transformation should return
Subquery"),
+ }),
+ _ => Ok(Transformed::no(expr)),
+ })
+ })
+ }
+}