This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 18aef7c165 Make `LogicalPlan::with_new_exprs`, deprecate `from_plan` 
(#7396)
18aef7c165 is described below

commit 18aef7c1650c64f880fe745a04233125a7fc8770
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Sep 6 12:25:05 2023 -0400

    Make `LogicalPlan::with_new_exprs`, deprecate `from_plan` (#7396)
    
    * Make `LogicalPlan::with_new_exprs`, deprecate `from_plan`
    
    * Avoid cloning
    
    * fix docs
---
 datafusion/expr/src/logical_plan/plan.rs           | 387 ++++++++++++++++++++-
 datafusion/expr/src/utils.rs                       | 340 +-----------------
 datafusion/optimizer/src/analyzer/type_coercion.rs |   5 +-
 datafusion/optimizer/src/push_down_filter.rs       |   6 +-
 .../src/simplify_expressions/simplify_exprs.rs     |   4 +-
 .../optimizer/src/unwrap_cast_in_comparison.rs     |   7 +-
 6 files changed, 393 insertions(+), 356 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 083ee230c7..9f2d90accc 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -20,29 +20,32 @@
 use crate::dml::CopyOptions;
 use crate::expr::{Alias, Exists, InSubquery, Placeholder};
 use crate::expr_rewriter::create_col_from_scalar_expr;
-use crate::expr_vec_fmt;
 use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
 use crate::logical_plan::extension::UserDefinedLogicalNode;
 use crate::logical_plan::{DmlStatement, Statement};
 use crate::utils::{
-    enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs, 
from_plan,
+    enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
     grouping_set_expr_count, grouping_set_to_exprlist, inspect_expr_pre,
 };
 use crate::{
     build_join_schema, Expr, ExprSchemable, TableProviderFilterPushDown, 
TableSource,
 };
+use crate::{
+    expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, 
LogicalPlanBuilder, Operator,
+};
 
 use super::dml::CopyTo;
 use super::DdlStatement;
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::tree_node::{
-    Transformed, TreeNode, TreeNodeVisitor, VisitRecursion,
+    RewriteRecursion, Transformed, TreeNode, TreeNodeRewriter, TreeNodeVisitor,
+    VisitRecursion,
 };
 use datafusion_common::{
-    aggregate_functional_dependencies, internal_err, plan_err, Column, 
DFField, DFSchema,
-    DFSchemaRef, DataFusionError, FunctionalDependencies, OwnedTableReference, 
Result,
-    ScalarValue, UnnestOptions,
+    aggregate_functional_dependencies, internal_err, plan_err, Column, 
Constraints,
+    DFField, DFSchema, DFSchemaRef, DataFusionError, FunctionalDependencies,
+    OwnedTableReference, Result, ScalarValue, UnnestOptions,
 };
 // backwards compatibility
 pub use datafusion_common::display::{PlanType, StringifiedPlan, 
ToStringifiedPlan};
@@ -513,6 +516,7 @@ impl LogicalPlan {
         }
     }
 
+    /// Returns a copy of this `LogicalPlan` with the new inputs
     pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> 
Result<LogicalPlan> {
         // with_new_inputs use original expression,
         // so we don't need to recompute Schema.
@@ -544,10 +548,377 @@ impl LogicalPlan {
                 aggr_expr.to_vec(),
                 schema.clone(),
             )?)),
-            _ => from_plan(self, &self.expressions(), inputs),
+            _ => self.with_new_exprs(self.expressions(), inputs),
         }
     }
 
+    /// Returns a new `LogicalPlan` based on `self` with inputs and
+    /// expressions replaced.
+    ///
+    /// The exprs correspond to the same order of expressions returned
+    /// by [`Self::expressions`]. This function is used by optimizers
+    /// to rewrite plans using the following pattern:
+    ///
+    /// ```text
+    /// let new_inputs = optimize_children(..., plan, props);
+    ///
+    /// // get the plan's expressions to optimize
+    /// let exprs = plan.expressions();
+    ///
+    /// // potentially rewrite plan expressions
+    /// let rewritten_exprs = rewrite_exprs(exprs);
+    ///
+    /// // create new plan using rewritten_exprs in same position
+    /// let new_plan = plan.with_new_exprs(rewritten_exprs, new_inputs);
+    /// ```
+    ///
+    /// Note: sometimes [`Self::with_new_exprs`] will use the schema of
+    /// the original plan; it will not change the schema, for example
+    /// for `Projection/Aggregate/Window`.
+    pub fn with_new_exprs(
+        &self,
+        mut expr: Vec<Expr>,
+        inputs: &[LogicalPlan],
+    ) -> Result<LogicalPlan> {
+        match self {
+            LogicalPlan::Projection(Projection { schema, .. }) => {
+                Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                    expr,
+                    Arc::new(inputs[0].clone()),
+                    schema.clone(),
+                )?))
+            }
+            LogicalPlan::Dml(DmlStatement {
+                table_name,
+                table_schema,
+                op,
+                ..
+            }) => Ok(LogicalPlan::Dml(DmlStatement {
+                table_name: table_name.clone(),
+                table_schema: table_schema.clone(),
+                op: op.clone(),
+                input: Arc::new(inputs[0].clone()),
+            })),
+            LogicalPlan::Copy(CopyTo {
+                input: _,
+                output_url,
+                file_format,
+                copy_options,
+                single_file_output,
+            }) => Ok(LogicalPlan::Copy(CopyTo {
+                input: Arc::new(inputs[0].clone()),
+                output_url: output_url.clone(),
+                file_format: file_format.clone(),
+                single_file_output: *single_file_output,
+                copy_options: copy_options.clone(),
+            })),
+            LogicalPlan::Values(Values { schema, .. }) => {
+                Ok(LogicalPlan::Values(Values {
+                    schema: schema.clone(),
+                    values: expr
+                        .chunks_exact(schema.fields().len())
+                        .map(|s| s.to_vec())
+                        .collect::<Vec<_>>(),
+                }))
+            }
+            LogicalPlan::Filter { .. } => {
+                assert_eq!(1, expr.len());
+                let predicate = expr.pop().unwrap();
+
+                // filter predicates should not contain aliased expressions so 
we remove any aliases
+                // before this logic was added we would have aliases within 
filters such as for
+                // benchmark q6:
+                //
+                // lineitem.l_shipdate >= Date32(\"8766\")
+                // AND lineitem.l_shipdate < Date32(\"9131\")
+                // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS 
lineitem.l_discount >=
+                // Decimal128(Some(49999999999999),30,15)
+                // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS 
lineitem.l_discount <=
+                // Decimal128(Some(69999999999999),30,15)
+                // AND lineitem.l_quantity < Decimal128(Some(2400),15,2)
+
+                struct RemoveAliases {}
+
+                impl TreeNodeRewriter for RemoveAliases {
+                    type N = Expr;
+
+                    fn pre_visit(&mut self, expr: &Expr) -> 
Result<RewriteRecursion> {
+                        match expr {
+                            Expr::Exists { .. }
+                            | Expr::ScalarSubquery(_)
+                            | Expr::InSubquery(_) => {
+                                // subqueries could contain aliases so we 
don't recurse into those
+                                Ok(RewriteRecursion::Stop)
+                            }
+                            Expr::Alias(_) => Ok(RewriteRecursion::Mutate),
+                            _ => Ok(RewriteRecursion::Continue),
+                        }
+                    }
+
+                    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+                        Ok(expr.unalias())
+                    }
+                }
+
+                let mut remove_aliases = RemoveAliases {};
+                let predicate = predicate.rewrite(&mut remove_aliases)?;
+
+                Ok(LogicalPlan::Filter(Filter::try_new(
+                    predicate,
+                    Arc::new(inputs[0].clone()),
+                )?))
+            }
+            LogicalPlan::Repartition(Repartition {
+                partitioning_scheme,
+                ..
+            }) => match partitioning_scheme {
+                Partitioning::RoundRobinBatch(n) => {
+                    Ok(LogicalPlan::Repartition(Repartition {
+                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
+                        input: Arc::new(inputs[0].clone()),
+                    }))
+                }
+                Partitioning::Hash(_, n) => 
Ok(LogicalPlan::Repartition(Repartition {
+                    partitioning_scheme: Partitioning::Hash(expr, *n),
+                    input: Arc::new(inputs[0].clone()),
+                })),
+                Partitioning::DistributeBy(_) => {
+                    Ok(LogicalPlan::Repartition(Repartition {
+                        partitioning_scheme: Partitioning::DistributeBy(expr),
+                        input: Arc::new(inputs[0].clone()),
+                    }))
+                }
+            },
+            LogicalPlan::Window(Window {
+                window_expr,
+                schema,
+                ..
+            }) => {
+                assert_eq!(window_expr.len(), expr.len());
+                Ok(LogicalPlan::Window(Window {
+                    input: Arc::new(inputs[0].clone()),
+                    window_expr: expr,
+                    schema: schema.clone(),
+                }))
+            }
+            LogicalPlan::Aggregate(Aggregate {
+                group_expr, schema, ..
+            }) => {
+                // group exprs are the first expressions
+                let agg_expr = expr.split_off(group_expr.len());
+
+                Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+                    Arc::new(inputs[0].clone()),
+                    expr,
+                    agg_expr,
+                    schema.clone(),
+                )?))
+            }
+            LogicalPlan::Sort(Sort { fetch, .. }) => Ok(LogicalPlan::Sort(Sort 
{
+                expr,
+                input: Arc::new(inputs[0].clone()),
+                fetch: *fetch,
+            })),
+            LogicalPlan::Join(Join {
+                join_type,
+                join_constraint,
+                on,
+                null_equals_null,
+                ..
+            }) => {
+                let schema =
+                    build_join_schema(inputs[0].schema(), inputs[1].schema(), 
join_type)?;
+
+                let equi_expr_count = on.len();
+                assert!(expr.len() >= equi_expr_count);
+
+                // Assume that the last expr, if any,
+                // is the filter_expr (non equality predicate from ON clause)
+                let filter_expr = if expr.len() > equi_expr_count {
+                    expr.pop()
+                } else {
+                    None
+                };
+
+                // The first part of expr is equi-exprs,
+                // and the struct of each equi-expr is like `left-expr = 
right-expr`.
+                assert_eq!(expr.len(), equi_expr_count);
+                let new_on:Vec<(Expr,Expr)> = expr.into_iter().map(|equi_expr| 
{
+                    // SimplifyExpression rule may add alias to the equi_expr.
+                    let unalias_expr = equi_expr.clone().unalias();
+                    if let Expr::BinaryExpr(BinaryExpr { left, op: 
Operator::Eq, right }) = unalias_expr {
+                        Ok((*left, *right))
+                    } else {
+                        internal_err!(
+                            "The front part expressions should be an binary 
equality expression, actual:{equi_expr}"
+                        )
+                    }
+                }).collect::<Result<Vec<(Expr, Expr)>>>()?;
+
+                Ok(LogicalPlan::Join(Join {
+                    left: Arc::new(inputs[0].clone()),
+                    right: Arc::new(inputs[1].clone()),
+                    join_type: *join_type,
+                    join_constraint: *join_constraint,
+                    on: new_on,
+                    filter: filter_expr,
+                    schema: DFSchemaRef::new(schema),
+                    null_equals_null: *null_equals_null,
+                }))
+            }
+            LogicalPlan::CrossJoin(_) => {
+                let left = inputs[0].clone();
+                let right = inputs[1].clone();
+                LogicalPlanBuilder::from(left).cross_join(right)?.build()
+            }
+            LogicalPlan::Subquery(Subquery {
+                outer_ref_columns, ..
+            }) => {
+                let subquery = 
LogicalPlanBuilder::from(inputs[0].clone()).build()?;
+                Ok(LogicalPlan::Subquery(Subquery {
+                    subquery: Arc::new(subquery),
+                    outer_ref_columns: outer_ref_columns.clone(),
+                }))
+            }
+            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
+                Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
+                    inputs[0].clone(),
+                    alias.clone(),
+                )?))
+            }
+            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
+                Ok(LogicalPlan::Limit(Limit {
+                    skip: *skip,
+                    fetch: *fetch,
+                    input: Arc::new(inputs[0].clone()),
+                }))
+            }
+            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable 
{
+                name,
+                if_not_exists,
+                or_replace,
+                ..
+            })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
+                CreateMemoryTable {
+                    input: Arc::new(inputs[0].clone()),
+                    constraints: Constraints::empty(),
+                    name: name.clone(),
+                    if_not_exists: *if_not_exists,
+                    or_replace: *or_replace,
+                },
+            ))),
+            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
+                name,
+                or_replace,
+                definition,
+                ..
+            })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
+                input: Arc::new(inputs[0].clone()),
+                name: name.clone(),
+                or_replace: *or_replace,
+                definition: definition.clone(),
+            }))),
+            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
+                node: e.node.from_template(&expr, inputs),
+            })),
+            LogicalPlan::Union(Union { schema, .. }) => 
Ok(LogicalPlan::Union(Union {
+                inputs: inputs.iter().cloned().map(Arc::new).collect(),
+                schema: schema.clone(),
+            })),
+            LogicalPlan::Distinct(Distinct { .. }) => {
+                Ok(LogicalPlan::Distinct(Distinct {
+                    input: Arc::new(inputs[0].clone()),
+                }))
+            }
+            LogicalPlan::Analyze(a) => {
+                assert!(expr.is_empty());
+                assert_eq!(inputs.len(), 1);
+                Ok(LogicalPlan::Analyze(Analyze {
+                    verbose: a.verbose,
+                    schema: a.schema.clone(),
+                    input: Arc::new(inputs[0].clone()),
+                }))
+            }
+            LogicalPlan::Explain(_) => {
+                // Explain should be handled specially in the optimizers;
+                // If this check cannot pass it means some optimizer pass is
+                // trying to optimize Explain directly
+                if expr.is_empty() {
+                    return plan_err!("Invalid EXPLAIN command. Expression is 
empty");
+                }
+
+                if inputs.is_empty() {
+                    return plan_err!("Invalid EXPLAIN command. Inputs are 
empty");
+                }
+
+                Ok(self.clone())
+            }
+            LogicalPlan::Prepare(Prepare {
+                name, data_types, ..
+            }) => Ok(LogicalPlan::Prepare(Prepare {
+                name: name.clone(),
+                data_types: data_types.clone(),
+                input: Arc::new(inputs[0].clone()),
+            })),
+            LogicalPlan::TableScan(ts) => {
+                assert!(inputs.is_empty(), "{self:?}  should have no inputs");
+                Ok(LogicalPlan::TableScan(TableScan {
+                    filters: expr,
+                    ..ts.clone()
+                }))
+            }
+            LogicalPlan::EmptyRelation(_)
+            | LogicalPlan::Ddl(_)
+            | LogicalPlan::Statement(_) => {
+                // All of these plan types have no inputs / exprs so should 
not be called
+                assert!(expr.is_empty(), "{self:?} should have no exprs");
+                assert!(inputs.is_empty(), "{self:?}  should have no inputs");
+                Ok(self.clone())
+            }
+            LogicalPlan::DescribeTable(_) => Ok(self.clone()),
+            LogicalPlan::Unnest(Unnest {
+                column,
+                schema,
+                options,
+                ..
+            }) => {
+                // Update schema with unnested column type.
+                let input = Arc::new(inputs[0].clone());
+                let nested_field = input.schema().field_from_column(column)?;
+                let unnested_field = schema.field_from_column(column)?;
+                let fields = input
+                    .schema()
+                    .fields()
+                    .iter()
+                    .map(|f| {
+                        if f == nested_field {
+                            unnested_field.clone()
+                        } else {
+                            f.clone()
+                        }
+                    })
+                    .collect::<Vec<_>>();
+
+                let schema = Arc::new(
+                    DFSchema::new_with_metadata(
+                        fields,
+                        input.schema().metadata().clone(),
+                    )?
+                    // We can use the existing functional dependencies as is:
+                    .with_functional_dependencies(
+                        input.schema().functional_dependencies().clone(),
+                    ),
+                );
+
+                Ok(LogicalPlan::Unnest(Unnest {
+                    input,
+                    column: column.clone(),
+                    schema,
+                    options: options.clone(),
+                }))
+            }
+        }
+    }
     /// Convert a prepared [`LogicalPlan`] into its inner logical plan
     /// with all params replaced with their corresponding values
     pub fn with_param_values(
@@ -751,7 +1122,7 @@ impl LogicalPlan {
             .map(|inp| inp.replace_params_with_values(param_values))
             .collect::<Result<Vec<_>>>()?;
 
-        from_plan(self, &new_exprs, &new_inputs_with_values)
+        self.with_new_exprs(new_exprs, &new_inputs_with_values)
     }
 
     /// Walk the logical plan, find any `PlaceHolder` tokens, and return a map 
of their IDs and DataTypes
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index be48418cb8..cfe751d096 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -17,32 +17,19 @@
 
 //! Expression utilities
 
-use crate::dml::CopyTo;
 use crate::expr::{Alias, Sort, WindowFunction};
-use crate::logical_plan::builder::build_join_schema;
-use crate::logical_plan::{
-    Aggregate, Analyze, Distinct, Extension, Filter, Join, Limit, 
Partitioning, Prepare,
-    Projection, Repartition, Sort as SortPlan, Subquery, SubqueryAlias, Union, 
Unnest,
-    Values, Window,
-};
+use crate::logical_plan::Aggregate;
 use crate::signature::{Signature, TypeSignature};
-use crate::{
-    BinaryExpr, Cast, CreateMemoryTable, CreateView, DdlStatement, 
DmlStatement, Expr,
-    ExprSchemable, GroupingSet, LogicalPlan, LogicalPlanBuilder, Operator, 
TableScan,
-    TryCast,
-};
+use crate::{Cast, Expr, ExprSchemable, GroupingSet, LogicalPlan, TryCast};
 use arrow::datatypes::{DataType, TimeUnit};
-use datafusion_common::tree_node::{
-    RewriteRecursion, TreeNode, TreeNodeRewriter, VisitRecursion,
-};
+use datafusion_common::tree_node::{TreeNode, VisitRecursion};
 use datafusion_common::{
-    internal_err, plan_err, Column, Constraints, DFField, DFSchema, 
DFSchemaRef,
-    DataFusionError, Result, ScalarValue, TableReference,
+    internal_err, plan_err, Column, DFField, DFSchema, DFSchemaRef, 
DataFusionError,
+    Result, ScalarValue, TableReference,
 };
 use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem, 
WildcardAdditionalOptions};
 use std::cmp::Ordering;
 use std::collections::HashSet;
-use std::sync::Arc;
 
 ///  The value to which `COUNT(*)` is expanded to in
 ///  `COUNT(<constant>)` expressions
@@ -727,326 +714,13 @@ where
 ///
 /// Notice: sometimes [from_plan] will use schema of original plan, it don't 
change schema!
 /// Such as `Projection/Aggregate/Window`
+#[deprecated(since = "31.0.0", note = "use LogicalPlan::with_new_exprs 
instead")]
 pub fn from_plan(
     plan: &LogicalPlan,
     expr: &[Expr],
     inputs: &[LogicalPlan],
 ) -> Result<LogicalPlan> {
-    match plan {
-        LogicalPlan::Projection(Projection { schema, .. }) => {
-            Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
-                expr.to_vec(),
-                Arc::new(inputs[0].clone()),
-                schema.clone(),
-            )?))
-        }
-        LogicalPlan::Dml(DmlStatement {
-            table_name,
-            table_schema,
-            op,
-            ..
-        }) => Ok(LogicalPlan::Dml(DmlStatement {
-            table_name: table_name.clone(),
-            table_schema: table_schema.clone(),
-            op: op.clone(),
-            input: Arc::new(inputs[0].clone()),
-        })),
-        LogicalPlan::Copy(CopyTo {
-            input: _,
-            output_url,
-            file_format,
-            single_file_output,
-            copy_options,
-        }) => Ok(LogicalPlan::Copy(CopyTo {
-            input: Arc::new(inputs[0].clone()),
-            output_url: output_url.clone(),
-            file_format: file_format.clone(),
-            single_file_output: *single_file_output,
-            copy_options: copy_options.clone(),
-        })),
-        LogicalPlan::Values(Values { schema, .. }) => 
Ok(LogicalPlan::Values(Values {
-            schema: schema.clone(),
-            values: expr
-                .chunks_exact(schema.fields().len())
-                .map(|s| s.to_vec())
-                .collect::<Vec<_>>(),
-        })),
-        LogicalPlan::Filter { .. } => {
-            assert_eq!(1, expr.len());
-            let predicate = expr[0].clone();
-
-            // filter predicates should not contain aliased expressions so we 
remove any aliases
-            // before this logic was added we would have aliases within 
filters such as for
-            // benchmark q6:
-            //
-            // lineitem.l_shipdate >= Date32(\"8766\")
-            // AND lineitem.l_shipdate < Date32(\"9131\")
-            // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS 
lineitem.l_discount >=
-            // Decimal128(Some(49999999999999),30,15)
-            // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS 
lineitem.l_discount <=
-            // Decimal128(Some(69999999999999),30,15)
-            // AND lineitem.l_quantity < Decimal128(Some(2400),15,2)
-
-            struct RemoveAliases {}
-
-            impl TreeNodeRewriter for RemoveAliases {
-                type N = Expr;
-
-                fn pre_visit(&mut self, expr: &Expr) -> 
Result<RewriteRecursion> {
-                    match expr {
-                        Expr::Exists { .. }
-                        | Expr::ScalarSubquery(_)
-                        | Expr::InSubquery(_) => {
-                            // subqueries could contain aliases so we don't 
recurse into those
-                            Ok(RewriteRecursion::Stop)
-                        }
-                        Expr::Alias(_) => Ok(RewriteRecursion::Mutate),
-                        _ => Ok(RewriteRecursion::Continue),
-                    }
-                }
-
-                fn mutate(&mut self, expr: Expr) -> Result<Expr> {
-                    Ok(expr.unalias())
-                }
-            }
-
-            let mut remove_aliases = RemoveAliases {};
-            let predicate = predicate.rewrite(&mut remove_aliases)?;
-
-            Ok(LogicalPlan::Filter(Filter::try_new(
-                predicate,
-                Arc::new(inputs[0].clone()),
-            )?))
-        }
-        LogicalPlan::Repartition(Repartition {
-            partitioning_scheme,
-            ..
-        }) => match partitioning_scheme {
-            Partitioning::RoundRobinBatch(n) => {
-                Ok(LogicalPlan::Repartition(Repartition {
-                    partitioning_scheme: Partitioning::RoundRobinBatch(*n),
-                    input: Arc::new(inputs[0].clone()),
-                }))
-            }
-            Partitioning::Hash(_, n) => 
Ok(LogicalPlan::Repartition(Repartition {
-                partitioning_scheme: Partitioning::Hash(expr.to_owned(), *n),
-                input: Arc::new(inputs[0].clone()),
-            })),
-            Partitioning::DistributeBy(_) => 
Ok(LogicalPlan::Repartition(Repartition {
-                partitioning_scheme: 
Partitioning::DistributeBy(expr.to_owned()),
-                input: Arc::new(inputs[0].clone()),
-            })),
-        },
-        LogicalPlan::Window(Window {
-            window_expr,
-            schema,
-            ..
-        }) => Ok(LogicalPlan::Window(Window {
-            input: Arc::new(inputs[0].clone()),
-            window_expr: expr[0..window_expr.len()].to_vec(),
-            schema: schema.clone(),
-        })),
-        LogicalPlan::Aggregate(Aggregate {
-            group_expr, schema, ..
-        }) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
-            Arc::new(inputs[0].clone()),
-            expr[0..group_expr.len()].to_vec(),
-            expr[group_expr.len()..].to_vec(),
-            schema.clone(),
-        )?)),
-        LogicalPlan::Sort(SortPlan { fetch, .. }) => 
Ok(LogicalPlan::Sort(SortPlan {
-            expr: expr.to_vec(),
-            input: Arc::new(inputs[0].clone()),
-            fetch: *fetch,
-        })),
-        LogicalPlan::Join(Join {
-            join_type,
-            join_constraint,
-            on,
-            null_equals_null,
-            ..
-        }) => {
-            let schema =
-                build_join_schema(inputs[0].schema(), inputs[1].schema(), 
join_type)?;
-
-            let equi_expr_count = on.len();
-            assert!(expr.len() >= equi_expr_count);
-
-            // The preceding part of expr is equi-exprs,
-            // and the struct of each equi-expr is like `left-expr = 
right-expr`.
-            let new_on:Vec<(Expr,Expr)> = 
expr.iter().take(equi_expr_count).map(|equi_expr| {
-                    // SimplifyExpression rule may add alias to the equi_expr.
-                    let unalias_expr = equi_expr.clone().unalias();
-                    if let Expr::BinaryExpr(BinaryExpr { left, 
op:Operator::Eq, right }) = unalias_expr {
-                        Ok((*left, *right))
-                    } else {
-                        internal_err!(
-                            "The front part expressions should be an binary 
equiality expression, actual:{equi_expr}"
-                        )
-                    }
-                }).collect::<Result<Vec<(Expr, Expr)>>>()?;
-
-            // Assume that the last expr, if any,
-            // is the filter_expr (non equality predicate from ON clause)
-            let filter_expr =
-                (expr.len() > equi_expr_count).then(|| expr[expr.len() - 
1].clone());
-
-            Ok(LogicalPlan::Join(Join {
-                left: Arc::new(inputs[0].clone()),
-                right: Arc::new(inputs[1].clone()),
-                join_type: *join_type,
-                join_constraint: *join_constraint,
-                on: new_on,
-                filter: filter_expr,
-                schema: DFSchemaRef::new(schema),
-                null_equals_null: *null_equals_null,
-            }))
-        }
-        LogicalPlan::CrossJoin(_) => {
-            let left = inputs[0].clone();
-            let right = inputs[1].clone();
-            LogicalPlanBuilder::from(left).cross_join(right)?.build()
-        }
-        LogicalPlan::Subquery(Subquery {
-            outer_ref_columns, ..
-        }) => {
-            let subquery = 
LogicalPlanBuilder::from(inputs[0].clone()).build()?;
-            Ok(LogicalPlan::Subquery(Subquery {
-                subquery: Arc::new(subquery),
-                outer_ref_columns: outer_ref_columns.clone(),
-            }))
-        }
-        LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
-            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
-                inputs[0].clone(),
-                alias.clone(),
-            )?))
-        }
-        LogicalPlan::Limit(Limit { skip, fetch, .. }) => 
Ok(LogicalPlan::Limit(Limit {
-            skip: *skip,
-            fetch: *fetch,
-            input: Arc::new(inputs[0].clone()),
-        })),
-        LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
-            name,
-            if_not_exists,
-            or_replace,
-            ..
-        })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
-            CreateMemoryTable {
-                input: Arc::new(inputs[0].clone()),
-                constraints: Constraints::empty(),
-                name: name.clone(),
-                if_not_exists: *if_not_exists,
-                or_replace: *or_replace,
-            },
-        ))),
-        LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
-            name,
-            or_replace,
-            definition,
-            ..
-        })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
-            input: Arc::new(inputs[0].clone()),
-            name: name.clone(),
-            or_replace: *or_replace,
-            definition: definition.clone(),
-        }))),
-        LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
-            node: e.node.from_template(expr, inputs),
-        })),
-        LogicalPlan::Union(Union { schema, .. }) => 
Ok(LogicalPlan::Union(Union {
-            inputs: inputs.iter().cloned().map(Arc::new).collect(),
-            schema: schema.clone(),
-        })),
-        LogicalPlan::Distinct(Distinct { .. }) => 
Ok(LogicalPlan::Distinct(Distinct {
-            input: Arc::new(inputs[0].clone()),
-        })),
-        LogicalPlan::Analyze(a) => {
-            assert!(expr.is_empty());
-            assert_eq!(inputs.len(), 1);
-            Ok(LogicalPlan::Analyze(Analyze {
-                verbose: a.verbose,
-                schema: a.schema.clone(),
-                input: Arc::new(inputs[0].clone()),
-            }))
-        }
-        LogicalPlan::Explain(_) => {
-            // Explain should be handled specially in the optimizers;
-            // If this check cannot pass it means some optimizer pass is
-            // trying to optimize Explain directly
-            if expr.is_empty() {
-                return plan_err!("Invalid EXPLAIN command. Expression is 
empty");
-            }
-
-            if inputs.is_empty() {
-                return plan_err!("Invalid EXPLAIN command. Inputs are empty");
-            }
-
-            Ok(plan.clone())
-        }
-        LogicalPlan::Prepare(Prepare {
-            name, data_types, ..
-        }) => Ok(LogicalPlan::Prepare(Prepare {
-            name: name.clone(),
-            data_types: data_types.clone(),
-            input: Arc::new(inputs[0].clone()),
-        })),
-        LogicalPlan::TableScan(ts) => {
-            assert!(inputs.is_empty(), "{plan:?}  should have no inputs");
-            Ok(LogicalPlan::TableScan(TableScan {
-                filters: expr.to_vec(),
-                ..ts.clone()
-            }))
-        }
-        LogicalPlan::EmptyRelation(_)
-        | LogicalPlan::Ddl(_)
-        | LogicalPlan::Statement(_) => {
-            // All of these plan types have no inputs / exprs so should not be 
called
-            assert!(expr.is_empty(), "{plan:?} should have no exprs");
-            assert!(inputs.is_empty(), "{plan:?}  should have no inputs");
-            Ok(plan.clone())
-        }
-        LogicalPlan::DescribeTable(_) => Ok(plan.clone()),
-        LogicalPlan::Unnest(Unnest {
-            column,
-            schema,
-            options,
-            ..
-        }) => {
-            // Update schema with unnested column type.
-            let input = Arc::new(inputs[0].clone());
-            let nested_field = input.schema().field_from_column(column)?;
-            let unnested_field = schema.field_from_column(column)?;
-            let fields = input
-                .schema()
-                .fields()
-                .iter()
-                .map(|f| {
-                    if f == nested_field {
-                        unnested_field.clone()
-                    } else {
-                        f.clone()
-                    }
-                })
-                .collect::<Vec<_>>();
-
-            let schema = Arc::new(
-                DFSchema::new_with_metadata(fields, 
input.schema().metadata().clone())?
-                    // We can use the existing functional dependencies as is:
-                    .with_functional_dependencies(
-                        input.schema().functional_dependencies().clone(),
-                    ),
-            );
-
-            Ok(LogicalPlan::Unnest(Unnest {
-                input,
-                column: column.clone(),
-                schema,
-                options: options.clone(),
-            }))
-        }
-    }
+    plan.with_new_exprs(expr.to_vec(), inputs)
 }
 
 /// Find all columns referenced from an aggregate query
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs 
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index ba4b5d7b17..c61a8d3350 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -42,7 +42,6 @@ use datafusion_expr::type_coercion::other::{
     get_coerce_type_for_case_expression, get_coerce_type_for_list,
 };
 use datafusion_expr::type_coercion::{is_datetime, is_numeric, is_utf8_or_large_utf8};
-use datafusion_expr::utils::from_plan;
 use datafusion_expr::{
     is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown,
     type_coercion, window_function, AggregateFunction, BuiltinScalarFunction, Expr,
@@ -112,13 +111,13 @@ fn analyze_internal(
         })
         .collect::<Result<Vec<_>>>()?;
 
-    // TODO: from_plan can't change the schema, so we need to do this here
+    // TODO: with_new_exprs can't change the schema, so we need to do this here
     match &plan {
         LogicalPlan::Projection(_) => Ok(LogicalPlan::Projection(Projection::try_new(
             new_expr,
             Arc::new(new_inputs[0].clone()),
         )?)),
-        _ => from_plan(plan, &new_expr, &new_inputs),
+        _ => plan.with_new_exprs(new_expr, &new_inputs),
     }
 }
 
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index d5c7641a13..571e2146c4 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -24,9 +24,7 @@ use datafusion_expr::{
     and,
     expr_rewriter::replace_col,
     logical_plan::{CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union},
-    or,
-    utils::from_plan,
-    BinaryExpr, Expr, Filter, Operator, TableProviderFilterPushDown,
+    or, BinaryExpr, Expr, Filter, Operator, TableProviderFilterPushDown,
 };
 use itertools::Itertools;
 use std::collections::{HashMap, HashSet};
@@ -457,7 +455,7 @@ fn push_down_all_join(
     if !join_conditions.is_empty() {
         new_exprs.push(join_conditions.into_iter().reduce(Expr::and).unwrap());
     }
-    let plan = from_plan(join_plan, &new_exprs, &[left, right])?;
+    let plan = join_plan.with_new_exprs(new_exprs, &[left, right])?;
 
     if keep_predicates.is_empty() {
         Ok(plan)
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index c65768bb8b..e6d66720ee 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -23,7 +23,7 @@ use super::{ExprSimplifier, SimplifyContext};
 use crate::utils::merge_schema;
 use crate::{OptimizerConfig, OptimizerRule};
 use datafusion_common::{DFSchema, DFSchemaRef, Result};
-use datafusion_expr::{logical_plan::LogicalPlan, utils::from_plan};
+use datafusion_expr::logical_plan::LogicalPlan;
 use datafusion_physical_expr::execution_props::ExecutionProps;
 
 /// Optimizer Pass that simplifies [`LogicalPlan`]s by rewriting
@@ -93,7 +93,7 @@ impl SimplifyExpressions {
             })
             .collect::<Result<Vec<_>>>()?;
 
-        from_plan(plan, &expr, &new_inputs)
+        plan.with_new_exprs(expr, &new_inputs)
     }
 }
 
diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
index 0be856ca9b..963a3dc06f 100644
--- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
+++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -31,7 +31,6 @@ use datafusion_common::{
 };
 use datafusion_expr::expr::{BinaryExpr, Cast, InList, TryCast};
 use datafusion_expr::expr_rewriter::rewrite_preserving_name;
-use datafusion_expr::utils::from_plan;
 use datafusion_expr::{
     binary_expr, in_list, lit, Expr, ExprSchemable, LogicalPlan, Operator,
 };
@@ -105,11 +104,7 @@ impl OptimizerRule for UnwrapCastInComparison {
             .collect::<Result<Vec<_>>>()?;
 
         let inputs: Vec<LogicalPlan> = plan.inputs().into_iter().cloned().collect();
-        Ok(Some(from_plan(
-            plan,
-            new_exprs.as_slice(),
-            inputs.as_slice(),
-        )?))
+        Ok(Some(plan.with_new_exprs(new_exprs, inputs.as_slice())?))
     }
 
     fn name(&self) -> &str {


Reply via email to