This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ee9736fc81 Remove some recursive clones (#9050)
ee9736fc81 is described below

commit ee9736fc81b4461d417b409baeaeb9f5e01ad962
Author: Mehmet Ozan Kabak <[email protected]>
AuthorDate: Tue Jan 30 14:49:17 2024 +0300

    Remove some recursive clones (#9050)
---
 datafusion/expr/src/logical_plan/builder.rs        |   2 +-
 datafusion/expr/src/logical_plan/plan.rs           |  74 ++++++-------
 datafusion/expr/src/tree_node/plan.rs              |   6 +-
 datafusion/expr/src/utils.rs                       |   2 +-
 datafusion/optimizer/src/analyzer/rewrite_expr.rs  |   2 +-
 datafusion/optimizer/src/analyzer/type_coercion.rs |   6 +-
 datafusion/optimizer/src/eliminate_outer_join.rs   |   5 +-
 datafusion/optimizer/src/optimize_projections.rs   |   6 +-
 datafusion/optimizer/src/optimizer.rs              |   5 +-
 datafusion/optimizer/src/push_down_filter.rs       | 117 ++++++++-------------
 datafusion/optimizer/src/push_down_limit.rs        | 106 +++++++++----------
 .../src/simplify_expressions/simplify_exprs.rs     |   2 +-
 .../optimizer/src/unwrap_cast_in_comparison.rs     |   4 +-
 datafusion/optimizer/src/utils.rs                  |   3 +-
 14 files changed, 154 insertions(+), 186 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index eb5e5bd426..6ae5d37f9e 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -469,7 +469,7 @@ impl LogicalPlanBuilder {
                         )
                     })
                     .collect::<Result<Vec<_>>>()?;
-                curr_plan.with_new_exprs(curr_plan.expressions(), &new_inputs)
+                curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
             }
         }
     }
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index aee3a59dd2..b72dd7f5ec 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -565,7 +565,7 @@ impl LogicalPlan {
     /// Returns a copy of this `LogicalPlan` with the new inputs
     #[deprecated(since = "35.0.0", note = "please use `with_new_exprs` 
instead")]
     pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> 
Result<LogicalPlan> {
-        self.with_new_exprs(self.expressions(), inputs)
+        self.with_new_exprs(self.expressions(), inputs.to_vec())
     }
 
     /// Returns a new `LogicalPlan` based on `self` with inputs and
@@ -590,13 +590,13 @@ impl LogicalPlan {
     pub fn with_new_exprs(
         &self,
         mut expr: Vec<Expr>,
-        inputs: &[LogicalPlan],
+        mut inputs: Vec<LogicalPlan>,
     ) -> Result<LogicalPlan> {
         match self {
             // Since expr may be different than the previous expr, schema of 
the projection
             // may change. We need to use try_new method instead of 
try_new_with_schema method.
             LogicalPlan::Projection(Projection { .. }) => {
-                Projection::try_new(expr, Arc::new(inputs[0].clone()))
+                Projection::try_new(expr, Arc::new(inputs.swap_remove(0)))
                     .map(LogicalPlan::Projection)
             }
             LogicalPlan::Dml(DmlStatement {
@@ -608,7 +608,7 @@ impl LogicalPlan {
                 table_name: table_name.clone(),
                 table_schema: table_schema.clone(),
                 op: op.clone(),
-                input: Arc::new(inputs[0].clone()),
+                input: Arc::new(inputs.swap_remove(0)),
             })),
             LogicalPlan::Copy(CopyTo {
                 input: _,
@@ -617,7 +617,7 @@ impl LogicalPlan {
                 copy_options,
                 single_file_output,
             }) => Ok(LogicalPlan::Copy(CopyTo {
-                input: Arc::new(inputs[0].clone()),
+                input: Arc::new(inputs.swap_remove(0)),
                 output_url: output_url.clone(),
                 file_format: file_format.clone(),
                 single_file_output: *single_file_output,
@@ -629,7 +629,7 @@ impl LogicalPlan {
                     values: expr
                         .chunks_exact(schema.fields().len())
                         .map(|s| s.to_vec())
-                        .collect::<Vec<_>>(),
+                        .collect(),
                 }))
             }
             LogicalPlan::Filter { .. } => {
@@ -674,7 +674,7 @@ impl LogicalPlan {
                 let mut remove_aliases = RemoveAliases {};
                 let predicate = predicate.rewrite(&mut remove_aliases)?;
 
-                Filter::try_new(predicate, Arc::new(inputs[0].clone()))
+                Filter::try_new(predicate, Arc::new(inputs.swap_remove(0)))
                     .map(LogicalPlan::Filter)
             }
             LogicalPlan::Repartition(Repartition {
@@ -684,35 +684,35 @@ impl LogicalPlan {
                 Partitioning::RoundRobinBatch(n) => {
                     Ok(LogicalPlan::Repartition(Repartition {
                         partitioning_scheme: Partitioning::RoundRobinBatch(*n),
-                        input: Arc::new(inputs[0].clone()),
+                        input: Arc::new(inputs.swap_remove(0)),
                     }))
                 }
                 Partitioning::Hash(_, n) => 
Ok(LogicalPlan::Repartition(Repartition {
                     partitioning_scheme: Partitioning::Hash(expr, *n),
-                    input: Arc::new(inputs[0].clone()),
+                    input: Arc::new(inputs.swap_remove(0)),
                 })),
                 Partitioning::DistributeBy(_) => {
                     Ok(LogicalPlan::Repartition(Repartition {
                         partitioning_scheme: Partitioning::DistributeBy(expr),
-                        input: Arc::new(inputs[0].clone()),
+                        input: Arc::new(inputs.swap_remove(0)),
                     }))
                 }
             },
             LogicalPlan::Window(Window { window_expr, .. }) => {
                 assert_eq!(window_expr.len(), expr.len());
-                Window::try_new(expr, Arc::new(inputs[0].clone()))
+                Window::try_new(expr, Arc::new(inputs.swap_remove(0)))
                     .map(LogicalPlan::Window)
             }
             LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
                 // group exprs are the first expressions
                 let agg_expr = expr.split_off(group_expr.len());
 
-                Aggregate::try_new(Arc::new(inputs[0].clone()), expr, agg_expr)
+                Aggregate::try_new(Arc::new(inputs.swap_remove(0)), expr, 
agg_expr)
                     .map(LogicalPlan::Aggregate)
             }
             LogicalPlan::Sort(Sort { fetch, .. }) => Ok(LogicalPlan::Sort(Sort 
{
                 expr,
-                input: Arc::new(inputs[0].clone()),
+                input: Arc::new(inputs.swap_remove(0)),
                 fetch: *fetch,
             })),
             LogicalPlan::Join(Join {
@@ -739,7 +739,7 @@ impl LogicalPlan {
                 // The first part of expr is equi-exprs,
                 // and the struct of each equi-expr is like `left-expr = 
right-expr`.
                 assert_eq!(expr.len(), equi_expr_count);
-                let new_on:Vec<(Expr,Expr)> = expr.into_iter().map(|equi_expr| 
{
+                let new_on = expr.into_iter().map(|equi_expr| {
                     // SimplifyExpression rule may add alias to the equi_expr.
                     let unalias_expr = equi_expr.clone().unalias();
                     if let Expr::BinaryExpr(BinaryExpr { left, op: 
Operator::Eq, right }) = unalias_expr {
@@ -752,8 +752,8 @@ impl LogicalPlan {
                 }).collect::<Result<Vec<(Expr, Expr)>>>()?;
 
                 Ok(LogicalPlan::Join(Join {
-                    left: Arc::new(inputs[0].clone()),
-                    right: Arc::new(inputs[1].clone()),
+                    left: Arc::new(inputs.swap_remove(0)),
+                    right: Arc::new(inputs.swap_remove(0)),
                     join_type: *join_type,
                     join_constraint: *join_constraint,
                     on: new_on,
@@ -763,28 +763,28 @@ impl LogicalPlan {
                 }))
             }
             LogicalPlan::CrossJoin(_) => {
-                let left = inputs[0].clone();
-                let right = inputs[1].clone();
+                let left = inputs.swap_remove(0);
+                let right = inputs.swap_remove(0);
                 LogicalPlanBuilder::from(left).cross_join(right)?.build()
             }
             LogicalPlan::Subquery(Subquery {
                 outer_ref_columns, ..
             }) => {
-                let subquery = 
LogicalPlanBuilder::from(inputs[0].clone()).build()?;
+                let subquery = 
LogicalPlanBuilder::from(inputs.swap_remove(0)).build()?;
                 Ok(LogicalPlan::Subquery(Subquery {
                     subquery: Arc::new(subquery),
                     outer_ref_columns: outer_ref_columns.clone(),
                 }))
             }
             LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
-                SubqueryAlias::try_new(Arc::new(inputs[0].clone()), 
alias.clone())
+                SubqueryAlias::try_new(Arc::new(inputs.swap_remove(0)), 
alias.clone())
                     .map(LogicalPlan::SubqueryAlias)
             }
             LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
                 Ok(LogicalPlan::Limit(Limit {
                     skip: *skip,
                     fetch: *fetch,
-                    input: Arc::new(inputs[0].clone()),
+                    input: Arc::new(inputs.swap_remove(0)),
                 }))
             }
             LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable 
{
@@ -795,7 +795,7 @@ impl LogicalPlan {
                 ..
             })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                 CreateMemoryTable {
-                    input: Arc::new(inputs[0].clone()),
+                    input: Arc::new(inputs.swap_remove(0)),
                     constraints: Constraints::empty(),
                     name: name.clone(),
                     if_not_exists: *if_not_exists,
@@ -809,30 +809,30 @@ impl LogicalPlan {
                 definition,
                 ..
             })) => Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
-                input: Arc::new(inputs[0].clone()),
+                input: Arc::new(inputs.swap_remove(0)),
                 name: name.clone(),
                 or_replace: *or_replace,
                 definition: definition.clone(),
             }))),
             LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
-                node: e.node.from_template(&expr, inputs),
+                node: e.node.from_template(&expr, &inputs),
             })),
             LogicalPlan::Union(Union { schema, .. }) => {
                 let input_schema = inputs[0].schema();
                 // If inputs are not pruned do not change schema.
                 let schema = if schema.fields().len() == 
input_schema.fields().len() {
-                    schema
+                    schema.clone()
                 } else {
-                    input_schema
+                    input_schema.clone()
                 };
                 Ok(LogicalPlan::Union(Union {
-                    inputs: inputs.iter().cloned().map(Arc::new).collect(),
-                    schema: schema.clone(),
+                    inputs: inputs.into_iter().map(Arc::new).collect(),
+                    schema,
                 }))
             }
             LogicalPlan::Distinct(distinct) => {
                 let distinct = match distinct {
-                    Distinct::All(_) => 
Distinct::All(Arc::new(inputs[0].clone())),
+                    Distinct::All(_) => 
Distinct::All(Arc::new(inputs.swap_remove(0))),
                     Distinct::On(DistinctOn {
                         on_expr,
                         select_expr,
@@ -848,7 +848,7 @@ impl LogicalPlan {
                             } else {
                                 None
                             },
-                            Arc::new(inputs[0].clone()),
+                            Arc::new(inputs.swap_remove(0)),
                         )?)
                     }
                 };
@@ -858,8 +858,8 @@ impl LogicalPlan {
                 name, is_distinct, ..
             }) => Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
                 name: name.clone(),
-                static_term: Arc::new(inputs[0].clone()),
-                recursive_term: Arc::new(inputs[1].clone()),
+                static_term: Arc::new(inputs.swap_remove(0)),
+                recursive_term: Arc::new(inputs.swap_remove(0)),
                 is_distinct: *is_distinct,
             })),
             LogicalPlan::Analyze(a) => {
@@ -868,7 +868,7 @@ impl LogicalPlan {
                 Ok(LogicalPlan::Analyze(Analyze {
                     verbose: a.verbose,
                     schema: a.schema.clone(),
-                    input: Arc::new(inputs[0].clone()),
+                    input: Arc::new(inputs.swap_remove(0)),
                 }))
             }
             LogicalPlan::Explain(e) => {
@@ -879,7 +879,7 @@ impl LogicalPlan {
                 assert_eq!(inputs.len(), 1, "Invalid EXPLAIN command. Inputs 
are empty");
                 Ok(LogicalPlan::Explain(Explain {
                     verbose: e.verbose,
-                    plan: Arc::new(inputs[0].clone()),
+                    plan: Arc::new(inputs.swap_remove(0)),
                     stringified_plans: e.stringified_plans.clone(),
                     schema: e.schema.clone(),
                     logical_optimization_succeeded: 
e.logical_optimization_succeeded,
@@ -890,7 +890,7 @@ impl LogicalPlan {
             }) => Ok(LogicalPlan::Prepare(Prepare {
                 name: name.clone(),
                 data_types: data_types.clone(),
-                input: Arc::new(inputs[0].clone()),
+                input: Arc::new(inputs.swap_remove(0)),
             })),
             LogicalPlan::TableScan(ts) => {
                 assert!(inputs.is_empty(), "{self:?}  should have no inputs");
@@ -915,7 +915,7 @@ impl LogicalPlan {
                 ..
             }) => {
                 // Update schema with unnested column type.
-                let input = Arc::new(inputs[0].clone());
+                let input = Arc::new(inputs.swap_remove(0));
                 let nested_field = input.schema().field_from_column(column)?;
                 let unnested_field = schema.field_from_column(column)?;
                 let fields = input
@@ -1199,7 +1199,7 @@ impl LogicalPlan {
             .map(|inp| inp.replace_params_with_values(param_values))
             .collect::<Result<Vec<_>>>()?;
 
-        self.with_new_exprs(new_exprs, &new_inputs_with_values)
+        self.with_new_exprs(new_exprs, new_inputs_with_values)
     }
 
     /// Walk the logical plan, find any `Placeholder` tokens, and return a map 
of their IDs and DataTypes
diff --git a/datafusion/expr/src/tree_node/plan.rs b/datafusion/expr/src/tree_node/plan.rs
index 589bb917a9..c35a09874a 100644
--- a/datafusion/expr/src/tree_node/plan.rs
+++ b/datafusion/expr/src/tree_node/plan.rs
@@ -89,11 +89,11 @@ impl TreeNode for LogicalPlan {
 
         // if any changes made, make a new child
         if old_children
-            .iter()
+            .into_iter()
             .zip(new_children.iter())
-            .any(|(c1, c2)| c1 != &c2)
+            .any(|(c1, c2)| c1 != c2)
         {
-            self.with_new_exprs(self.expressions(), new_children.as_slice())
+            self.with_new_exprs(self.expressions(), new_children)
         } else {
             Ok(self)
         }
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 02479c0765..5d011e097f 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -722,7 +722,7 @@ pub fn from_plan(
     expr: &[Expr],
     inputs: &[LogicalPlan],
 ) -> Result<LogicalPlan> {
-    plan.with_new_exprs(expr.to_vec(), inputs)
+    plan.with_new_exprs(expr.to_vec(), inputs.to_vec())
 }
 
 /// Find all columns referenced from an aggregate query
diff --git a/datafusion/optimizer/src/analyzer/rewrite_expr.rs b/datafusion/optimizer/src/analyzer/rewrite_expr.rs
index 8f1c844ed0..eedfc40a7f 100644
--- a/datafusion/optimizer/src/analyzer/rewrite_expr.rs
+++ b/datafusion/optimizer/src/analyzer/rewrite_expr.rs
@@ -86,7 +86,7 @@ fn analyze_internal(plan: &LogicalPlan) -> Result<LogicalPlan> {
         })
         .collect::<Result<Vec<_>>>()?;
 
-    plan.with_new_exprs(new_expr, &new_inputs)
+    plan.with_new_exprs(new_expr, new_inputs)
 }
 
 pub(crate) struct OperatorToFunctionRewriter {
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index c9ecb2a770..8710249e12 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -77,7 +77,7 @@ fn analyze_internal(
     plan: &LogicalPlan,
 ) -> Result<LogicalPlan> {
     // optimize child plans first
-    let new_inputs = plan
+    let mut new_inputs = plan
         .inputs()
         .iter()
         .map(|p| analyze_internal(external_schema, p))
@@ -115,9 +115,9 @@ fn analyze_internal(
     match &plan {
         LogicalPlan::Projection(_) => 
Ok(LogicalPlan::Projection(Projection::try_new(
             new_expr,
-            Arc::new(new_inputs[0].clone()),
+            Arc::new(new_inputs.swap_remove(0)),
         )?)),
-        _ => plan.with_new_exprs(new_expr, &new_inputs),
+        _ => plan.with_new_exprs(new_expr, new_inputs),
     }
 }
 
diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs
index 53c4b3702b..56a4a76987 100644
--- a/datafusion/optimizer/src/eliminate_outer_join.rs
+++ b/datafusion/optimizer/src/eliminate_outer_join.rs
@@ -106,9 +106,8 @@ impl OptimizerRule for EliminateOuterJoin {
                         schema: join.schema.clone(),
                         null_equals_null: join.null_equals_null,
                     });
-                    let new_plan =
-                        plan.with_new_exprs(plan.expressions(), &[new_join])?;
-                    Ok(Some(new_plan))
+                    let exprs = plan.expressions();
+                    plan.with_new_exprs(exprs, vec![new_join]).map(Some)
                 }
                 _ => Ok(None),
             },
diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs
index 1035995642..db0459fd9a 100644
--- a/datafusion/optimizer/src/optimize_projections.rs
+++ b/datafusion/optimizer/src/optimize_projections.rs
@@ -375,9 +375,9 @@ fn optimize_projections(
             // If new_input is `None`, this means child is not changed, so use
             // `old_child` during construction:
             .map(|(new_input, old_child)| new_input.unwrap_or_else(|| 
old_child.clone()))
-            .collect::<Vec<_>>();
-        plan.with_new_exprs(plan.expressions(), &new_inputs)
-            .map(Some)
+            .collect();
+        let exprs = plan.expressions();
+        plan.with_new_exprs(exprs, new_inputs).map(Some)
     }
 }
 
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index a192348f69..633a32996d 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -380,9 +380,10 @@ impl Optimizer {
                 Some(plan) => plan,
                 None => old_plan.clone(),
             })
-            .collect::<Vec<_>>();
+            .collect();
 
-        Ok(Some(plan.with_new_exprs(plan.expressions(), &new_inputs)?))
+        let exprs = plan.expressions();
+        plan.with_new_exprs(exprs, new_inputs).map(Some)
     }
 
     /// Use a rule to optimize the whole plan.
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 7086c5cda5..fc56cbb868 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -297,15 +297,10 @@ fn can_evaluate_as_join_condition(predicate: &Expr) -> Result<bool> {
 //
 // do nothing.
 //
-fn extract_or_clauses_for_join(
-    filters: &[&Expr],
-    schema: &DFSchema,
-    preserved: bool,
-) -> Vec<Expr> {
-    if !preserved {
-        return vec![];
-    }
-
+fn extract_or_clauses_for_join<'a>(
+    filters: &'a [Expr],
+    schema: &'a DFSchema,
+) -> impl Iterator<Item = Expr> + 'a {
     let schema_columns = schema
         .fields()
         .iter()
@@ -318,8 +313,8 @@ fn extract_or_clauses_for_join(
         })
         .collect::<HashSet<_>>();
 
-    let mut exprs = vec![];
-    for expr in filters.iter() {
+    // new formed OR clauses and their column references
+    filters.iter().filter_map(move |expr| {
         if let Expr::BinaryExpr(BinaryExpr {
             left,
             op: Operator::Or,
@@ -331,13 +326,11 @@ fn extract_or_clauses_for_join(
 
             // If nothing can be extracted from any sub clauses, do nothing 
for this OR clause.
             if let (Some(left_expr), Some(right_expr)) = (left_expr, 
right_expr) {
-                exprs.push(or(left_expr, right_expr));
+                return Some(or(left_expr, right_expr));
             }
         }
-    }
-
-    // new formed OR clauses and their column references
-    exprs
+        None
+    })
 }
 
 // extract qual from OR sub-clause.
@@ -425,15 +418,17 @@ fn push_down_all_join(
     // 1) can push through join to its children(left or right)
     // 2) can be converted to join conditions if the join type is Inner
     // 3) should be kept as filter conditions
+    let left_schema = left.schema();
+    let right_schema = right.schema();
     let mut left_push = vec![];
     let mut right_push = vec![];
     let mut keep_predicates = vec![];
     let mut join_conditions = vec![];
     for predicate in predicates {
-        if left_preserved && can_pushdown_join_predicate(&predicate, 
left.schema())? {
+        if left_preserved && can_pushdown_join_predicate(&predicate, 
left_schema)? {
             left_push.push(predicate);
         } else if right_preserved
-            && can_pushdown_join_predicate(&predicate, right.schema())?
+            && can_pushdown_join_predicate(&predicate, right_schema)?
         {
             right_push.push(predicate);
         } else if is_inner_join && can_evaluate_as_join_condition(&predicate)? 
{
@@ -447,10 +442,10 @@ fn push_down_all_join(
 
     // For infer predicates, if they can not push through join, just drop them
     for predicate in infer_predicates {
-        if left_preserved && can_pushdown_join_predicate(&predicate, 
left.schema())? {
+        if left_preserved && can_pushdown_join_predicate(&predicate, 
left_schema)? {
             left_push.push(predicate);
         } else if right_preserved
-            && can_pushdown_join_predicate(&predicate, right.schema())?
+            && can_pushdown_join_predicate(&predicate, right_schema)?
         {
             right_push.push(predicate);
         }
@@ -459,10 +454,10 @@ fn push_down_all_join(
     if !on_filter.is_empty() {
         let (on_left_preserved, on_right_preserved) = 
on_lr_is_preserved(join_plan)?;
         for on in on_filter {
-            if on_left_preserved && can_pushdown_join_predicate(&on, 
left.schema())? {
+            if on_left_preserved && can_pushdown_join_predicate(&on, 
left_schema)? {
                 left_push.push(on)
             } else if on_right_preserved
-                && can_pushdown_join_predicate(&on, right.schema())?
+                && can_pushdown_join_predicate(&on, right_schema)?
             {
                 right_push.push(on)
             } else {
@@ -473,31 +468,14 @@ fn push_down_all_join(
 
     // Extract from OR clause, generate new predicates for both side of join 
if possible.
     // We only track the unpushable predicates above.
-    let or_to_left = extract_or_clauses_for_join(
-        &keep_predicates.iter().collect::<Vec<_>>(),
-        left.schema(),
-        left_preserved,
-    );
-    let or_to_right = extract_or_clauses_for_join(
-        &keep_predicates.iter().collect::<Vec<_>>(),
-        right.schema(),
-        right_preserved,
-    );
-    let on_or_to_left = extract_or_clauses_for_join(
-        &join_conditions.iter().collect::<Vec<_>>(),
-        left.schema(),
-        left_preserved,
-    );
-    let on_or_to_right = extract_or_clauses_for_join(
-        &join_conditions.iter().collect::<Vec<_>>(),
-        right.schema(),
-        right_preserved,
-    );
-
-    left_push.extend(or_to_left);
-    left_push.extend(on_or_to_left);
-    right_push.extend(or_to_right);
-    right_push.extend(on_or_to_right);
+    if left_preserved {
+        left_push.extend(extract_or_clauses_for_join(&keep_predicates, 
left_schema));
+        left_push.extend(extract_or_clauses_for_join(&join_conditions, 
left_schema));
+    }
+    if right_preserved {
+        right_push.extend(extract_or_clauses_for_join(&keep_predicates, 
right_schema));
+        right_push.extend(extract_or_clauses_for_join(&join_conditions, 
right_schema));
+    }
 
     let left = match conjunction(left_push) {
         Some(predicate) => {
@@ -519,28 +497,19 @@ fn push_down_all_join(
     //      it always will be the last element, otherwise result
     //      vector will contain only join keys (without additional
     //      element representing filter).
-    let expr = join_plan.expressions();
-    let mut new_exprs = if !on_filter_empty {
-        expr[..expr.len() - 1].to_vec()
-    } else {
-        expr
-    };
-    if !join_conditions.is_empty() {
-        new_exprs.push(join_conditions.into_iter().reduce(Expr::and).unwrap());
+    let mut exprs = join_plan.expressions();
+    if !on_filter_empty {
+        exprs.pop();
     }
-    let plan = join_plan.with_new_exprs(new_exprs, &[left, right])?;
+    exprs.extend(join_conditions.into_iter().reduce(Expr::and));
+    let plan = join_plan.with_new_exprs(exprs, vec![left, right])?;
 
-    if keep_predicates.is_empty() {
-        Ok(plan)
-    } else {
-        // wrap the join on the filter whose predicates must be kept
-        match conjunction(keep_predicates) {
-            Some(predicate) => Ok(LogicalPlan::Filter(Filter::try_new(
-                predicate,
-                Arc::new(plan),
-            )?)),
-            None => Ok(plan),
+    // wrap the join on the filter whose predicates must be kept
+    match conjunction(keep_predicates) {
+        Some(predicate) => {
+            Filter::try_new(predicate, Arc::new(plan)).map(LogicalPlan::Filter)
         }
+        None => Ok(plan),
     }
 }
 
@@ -694,9 +663,9 @@ impl OptimizerRule for PushDownFilter {
                 // commutable
                 let new_filter = plan.with_new_exprs(
                     plan.expressions(),
-                    &[child_plan.inputs()[0].clone()],
+                    vec![child_plan.inputs()[0].clone()],
                 )?;
-                child_plan.with_new_exprs(child_plan.expressions(), 
&[new_filter])?
+                child_plan.with_new_exprs(child_plan.expressions(), 
vec![new_filter])?
             }
             LogicalPlan::SubqueryAlias(subquery_alias) => {
                 let mut replace_map = HashMap::new();
@@ -719,7 +688,7 @@ impl OptimizerRule for PushDownFilter {
                     new_predicate,
                     subquery_alias.input.clone(),
                 )?);
-                child_plan.with_new_exprs(child_plan.expressions(), 
&[new_filter])?
+                child_plan.with_new_exprs(child_plan.expressions(), 
vec![new_filter])?
             }
             LogicalPlan::Projection(projection) => {
                 // A projection is filter-commutable if it do not contain 
volatile predicates or contain volatile
@@ -767,12 +736,12 @@ impl OptimizerRule for PushDownFilter {
                         match conjunction(keep_predicates) {
                             None => child_plan.with_new_exprs(
                                 child_plan.expressions(),
-                                &[new_filter],
+                                vec![new_filter],
                             )?,
                             Some(keep_predicate) => {
                                 let child_plan = child_plan.with_new_exprs(
                                     child_plan.expressions(),
-                                    &[new_filter],
+                                    vec![new_filter],
                                 )?;
                                 LogicalPlan::Filter(Filter::try_new(
                                     keep_predicate,
@@ -843,13 +812,13 @@ impl OptimizerRule for PushDownFilter {
                 let child = match conjunction(replaced_push_predicates) {
                     Some(predicate) => LogicalPlan::Filter(Filter::try_new(
                         predicate,
-                        Arc::new((*agg.input).clone()),
+                        agg.input.clone(),
                     )?),
                     None => (*agg.input).clone(),
                 };
                 let new_agg = filter
                     .input
-                    .with_new_exprs(filter.input.expressions(), &vec![child])?;
+                    .with_new_exprs(filter.input.expressions(), vec![child])?;
                 match conjunction(keep_predicates) {
                     Some(predicate) => LogicalPlan::Filter(Filter::try_new(
                         predicate,
@@ -955,7 +924,7 @@ impl OptimizerRule for PushDownFilter {
                 };
                 // extension with new inputs.
                 let new_extension =
-                    child_plan.with_new_exprs(child_plan.expressions(), 
&new_children)?;
+                    child_plan.with_new_exprs(child_plan.expressions(), 
new_children)?;
 
                 match conjunction(keep_predicates) {
                     Some(predicate) => LogicalPlan::Filter(Filter::try_new(
diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs
index c2f35a7906..33d02d5c56 100644
--- a/datafusion/optimizer/src/push_down_limit.rs
+++ b/datafusion/optimizer/src/push_down_limit.rs
@@ -17,14 +17,17 @@
 
 //! Optimizer rule to push down LIMIT in the query plan
 //! It will push down through projection, limits (taking the smaller limit)
+
+use std::sync::Arc;
+
 use crate::optimizer::ApplyOrder;
 use crate::{OptimizerConfig, OptimizerRule};
+
 use datafusion_common::Result;
-use datafusion_expr::{
-    logical_plan::{Join, JoinType, Limit, LogicalPlan, Sort, TableScan, Union},
-    CrossJoin,
+use datafusion_expr::logical_plan::{
+    Join, JoinType, Limit, LogicalPlan, Sort, TableScan, Union,
 };
-use std::sync::Arc;
+use datafusion_expr::CrossJoin;
 
 /// Optimization rule that tries to push down LIMIT.
 #[derive(Default)]
@@ -46,9 +49,8 @@ impl OptimizerRule for PushDownLimit {
     ) -> Result<Option<LogicalPlan>> {
         use std::cmp::min;
 
-        let limit = match plan {
-            LogicalPlan::Limit(limit) => limit,
-            _ => return Ok(None),
+        let LogicalPlan::Limit(limit) = plan else {
+            return Ok(None);
         };
 
         if let LogicalPlan::Limit(child) = &*limit.input {
@@ -96,27 +98,22 @@ impl OptimizerRule for PushDownLimit {
                 fetch: new_fetch,
                 input: Arc::new((*child.input).clone()),
             });
-            return {
-                match self.try_optimize(&plan, _config)? {
-                    Some(new_plan) => Ok(Some(new_plan)),
-                    None => Ok(Some(plan)),
-                }
-            };
+            return self
+                .try_optimize(&plan, _config)
+                .map(|opt_plan| opt_plan.or_else(|| Some(plan)));
         }
 
-        let fetch = match limit.fetch {
-            Some(fetch) => fetch,
-            None => return Ok(None),
+        let Some(fetch) = limit.fetch else {
+            return Ok(None);
         };
         let skip = limit.skip;
-        let child_plan = &*limit.input;
 
-        let plan = match child_plan {
+        match limit.input.as_ref() {
             LogicalPlan::TableScan(scan) => {
                 let limit = if fetch != 0 { fetch + skip } else { 0 };
                 let new_fetch = scan.fetch.map(|x| min(x, limit)).or(Some(limit));
                 if new_fetch == scan.fetch {
-                    None
+                    Ok(None)
                 } else {
                     let new_input = LogicalPlan::TableScan(TableScan {
                         table_name: scan.table_name.clone(),
@@ -126,7 +123,8 @@ impl OptimizerRule for PushDownLimit {
                         fetch: scan.fetch.map(|x| min(x, limit)).or(Some(limit)),
                         projected_schema: scan.projected_schema.clone(),
                     });
-                    Some(plan.with_new_exprs(plan.expressions(), &[new_input])?)
+                    plan.with_new_exprs(plan.expressions(), vec![new_input])
+                        .map(Some)
                 }
             }
             LogicalPlan::Union(union) => {
@@ -137,7 +135,7 @@ impl OptimizerRule for PushDownLimit {
                         Ok(Arc::new(LogicalPlan::Limit(Limit {
                             skip: 0,
                             fetch: Some(fetch + skip),
-                            input: Arc::new((**x).clone()),
+                            input: x.clone(),
                         })))
                     })
                     .collect::<Result<_>>()?;
@@ -145,38 +143,36 @@ impl OptimizerRule for PushDownLimit {
                     inputs: new_inputs,
                     schema: union.schema.clone(),
                 });
-                Some(plan.with_new_exprs(plan.expressions(), &[union])?)
+                plan.with_new_exprs(plan.expressions(), vec![union])
+                    .map(Some)
             }
 
             LogicalPlan::CrossJoin(cross_join) => {
-                let left = &*cross_join.left;
-                let right = &*cross_join.right;
                 let new_left = LogicalPlan::Limit(Limit {
                     skip: 0,
                     fetch: Some(fetch + skip),
-                    input: Arc::new(left.clone()),
+                    input: cross_join.left.clone(),
                 });
                 let new_right = LogicalPlan::Limit(Limit {
                     skip: 0,
                     fetch: Some(fetch + skip),
-                    input: Arc::new(right.clone()),
+                    input: cross_join.right.clone(),
                 });
                 let new_cross_join = LogicalPlan::CrossJoin(CrossJoin {
                     left: Arc::new(new_left),
                     right: Arc::new(new_right),
                     schema: plan.schema().clone(),
                 });
-                Some(plan.with_new_exprs(plan.expressions(), &[new_cross_join])?)
+                plan.with_new_exprs(plan.expressions(), vec![new_cross_join])
+                    .map(Some)
             }
 
             LogicalPlan::Join(join) => {
-                let new_join = push_down_join(join, fetch + skip);
-                match new_join {
-                    Some(new_join) => Some(plan.with_new_exprs(
-                        plan.expressions(),
-                        &[LogicalPlan::Join(new_join)],
-                    )?),
-                    None => None,
+                if let Some(new_join) = push_down_join(join, fetch + skip) {
+                    let inputs = vec![LogicalPlan::Join(new_join)];
+                    plan.with_new_exprs(plan.expressions(), inputs).map(Some)
+                } else {
+                    Ok(None)
                 }
             }
 
@@ -186,28 +182,29 @@ impl OptimizerRule for PushDownLimit {
                     Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch))
                 };
                 if new_fetch == sort.fetch {
-                    None
+                    Ok(None)
                 } else {
                     let new_sort = LogicalPlan::Sort(Sort {
                         expr: sort.expr.clone(),
-                        input: Arc::new((*sort.input).clone()),
+                        input: sort.input.clone(),
                         fetch: new_fetch,
                     });
-                    Some(plan.with_new_exprs(plan.expressions(), &[new_sort])?)
+                    plan.with_new_exprs(plan.expressions(), vec![new_sort])
+                        .map(Some)
                 }
             }
-            LogicalPlan::Projection(_) | LogicalPlan::SubqueryAlias(_) => {
+            child_plan @ (LogicalPlan::Projection(_) | LogicalPlan::SubqueryAlias(_)) => {
                 // commute
                 let new_limit = plan.with_new_exprs(
                     plan.expressions(),
-                    &[child_plan.inputs()[0].clone()],
+                    vec![child_plan.inputs()[0].clone()],
                 )?;
-                Some(child_plan.with_new_exprs(child_plan.expressions(), &[new_limit])?)
+                child_plan
+                    .with_new_exprs(child_plan.expressions(), vec![new_limit])
+                    .map(Some)
             }
-            _ => None,
-        };
-
-        Ok(plan)
+            _ => Ok(None),
+        }
     }
 
     fn name(&self) -> &str {
@@ -245,24 +242,24 @@ fn push_down_join(join: &Join, limit: usize) -> Option<Join> {
         (None, None) => None,
         _ => {
             let left = match left_limit {
-                Some(limit) => LogicalPlan::Limit(Limit {
+                Some(limit) => Arc::new(LogicalPlan::Limit(Limit {
                     skip: 0,
                     fetch: Some(limit),
-                    input: Arc::new((*join.left).clone()),
-                }),
-                None => (*join.left).clone(),
+                    input: join.left.clone(),
+                })),
+                None => join.left.clone(),
             };
             let right = match right_limit {
-                Some(limit) => LogicalPlan::Limit(Limit {
+                Some(limit) => Arc::new(LogicalPlan::Limit(Limit {
                     skip: 0,
                     fetch: Some(limit),
-                    input: Arc::new((*join.right).clone()),
-                }),
-                None => (*join.right).clone(),
+                    input: join.right.clone(),
+                })),
+                None => join.right.clone(),
             };
             Some(Join {
-                left: Arc::new(left),
-                right: Arc::new(right),
+                left,
+                right,
                 on: join.on.clone(),
                 filter: join.filter.clone(),
                 join_type: join.join_type,
@@ -280,6 +277,7 @@ mod test {
 
     use super::*;
     use crate::test::*;
+
     use datafusion_expr::{
         col, exists,
         logical_plan::{builder::LogicalPlanBuilder, JoinType, LogicalPlan},
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 7265b17dd0..d68474dcde 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -122,7 +122,7 @@ impl SimplifyExpressions {
             }
         };
 
-        plan.with_new_exprs(expr, &new_inputs)
+        plan.with_new_exprs(expr, new_inputs)
     }
 }
 
diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
index 91603e82a5..9d3561d126 100644
--- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
+++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -109,8 +109,8 @@ impl OptimizerRule for UnwrapCastInComparison {
             .map(|expr| rewrite_preserving_name(expr, &mut expr_rewriter))
             .collect::<Result<Vec<_>>>()?;
 
-        let inputs: Vec<LogicalPlan> = plan.inputs().into_iter().cloned().collect();
-        Ok(Some(plan.with_new_exprs(new_exprs, inputs.as_slice())?))
+        let inputs = plan.inputs().into_iter().cloned().collect();
+        plan.with_new_exprs(new_exprs, inputs).map(Some)
     }
 
     fn name(&self) -> &str {
diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs
index 5671dc6ae9..6189f9a579 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -48,7 +48,8 @@ pub fn optimize_children(
         new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
     }
     if plan_is_changed {
-        Ok(Some(plan.with_new_exprs(plan.expressions(), &new_inputs)?))
+        let exprs = plan.expressions();
+        plan.with_new_exprs(exprs, new_inputs).map(Some)
     } else {
         Ok(None)
     }


Reply via email to