This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ee9736fc81 Remove some recursive clones (#9050)
ee9736fc81 is described below
commit ee9736fc81b4461d417b409baeaeb9f5e01ad962
Author: Mehmet Ozan Kabak <[email protected]>
AuthorDate: Tue Jan 30 14:49:17 2024 +0300
Remove some recursive clones (#9050)
---
datafusion/expr/src/logical_plan/builder.rs | 2 +-
datafusion/expr/src/logical_plan/plan.rs | 74 ++++++-------
datafusion/expr/src/tree_node/plan.rs | 6 +-
datafusion/expr/src/utils.rs | 2 +-
datafusion/optimizer/src/analyzer/rewrite_expr.rs | 2 +-
datafusion/optimizer/src/analyzer/type_coercion.rs | 6 +-
datafusion/optimizer/src/eliminate_outer_join.rs | 5 +-
datafusion/optimizer/src/optimize_projections.rs | 6 +-
datafusion/optimizer/src/optimizer.rs | 5 +-
datafusion/optimizer/src/push_down_filter.rs | 117 ++++++++-------------
datafusion/optimizer/src/push_down_limit.rs | 106 +++++++++----------
.../src/simplify_expressions/simplify_exprs.rs | 2 +-
.../optimizer/src/unwrap_cast_in_comparison.rs | 4 +-
datafusion/optimizer/src/utils.rs | 3 +-
14 files changed, 154 insertions(+), 186 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index eb5e5bd426..6ae5d37f9e 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -469,7 +469,7 @@ impl LogicalPlanBuilder {
)
})
.collect::<Result<Vec<_>>>()?;
- curr_plan.with_new_exprs(curr_plan.expressions(), &new_inputs)
+ curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
}
}
}
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index aee3a59dd2..b72dd7f5ec 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -565,7 +565,7 @@ impl LogicalPlan {
/// Returns a copy of this `LogicalPlan` with the new inputs
#[deprecated(since = "35.0.0", note = "please use `with_new_exprs`
instead")]
pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) ->
Result<LogicalPlan> {
- self.with_new_exprs(self.expressions(), inputs)
+ self.with_new_exprs(self.expressions(), inputs.to_vec())
}
/// Returns a new `LogicalPlan` based on `self` with inputs and
@@ -590,13 +590,13 @@ impl LogicalPlan {
pub fn with_new_exprs(
&self,
mut expr: Vec<Expr>,
- inputs: &[LogicalPlan],
+ mut inputs: Vec<LogicalPlan>,
) -> Result<LogicalPlan> {
match self {
// Since expr may be different than the previous expr, schema of
the projection
// may change. We need to use try_new method instead of
try_new_with_schema method.
LogicalPlan::Projection(Projection { .. }) => {
- Projection::try_new(expr, Arc::new(inputs[0].clone()))
+ Projection::try_new(expr, Arc::new(inputs.swap_remove(0)))
.map(LogicalPlan::Projection)
}
LogicalPlan::Dml(DmlStatement {
@@ -608,7 +608,7 @@ impl LogicalPlan {
table_name: table_name.clone(),
table_schema: table_schema.clone(),
op: op.clone(),
- input: Arc::new(inputs[0].clone()),
+ input: Arc::new(inputs.swap_remove(0)),
})),
LogicalPlan::Copy(CopyTo {
input: _,
@@ -617,7 +617,7 @@ impl LogicalPlan {
copy_options,
single_file_output,
}) => Ok(LogicalPlan::Copy(CopyTo {
- input: Arc::new(inputs[0].clone()),
+ input: Arc::new(inputs.swap_remove(0)),
output_url: output_url.clone(),
file_format: file_format.clone(),
single_file_output: *single_file_output,
@@ -629,7 +629,7 @@ impl LogicalPlan {
values: expr
.chunks_exact(schema.fields().len())
.map(|s| s.to_vec())
- .collect::<Vec<_>>(),
+ .collect(),
}))
}
LogicalPlan::Filter { .. } => {
@@ -674,7 +674,7 @@ impl LogicalPlan {
let mut remove_aliases = RemoveAliases {};
let predicate = predicate.rewrite(&mut remove_aliases)?;
- Filter::try_new(predicate, Arc::new(inputs[0].clone()))
+ Filter::try_new(predicate, Arc::new(inputs.swap_remove(0)))
.map(LogicalPlan::Filter)
}
LogicalPlan::Repartition(Repartition {
@@ -684,35 +684,35 @@ impl LogicalPlan {
Partitioning::RoundRobinBatch(n) => {
Ok(LogicalPlan::Repartition(Repartition {
partitioning_scheme: Partitioning::RoundRobinBatch(*n),
- input: Arc::new(inputs[0].clone()),
+ input: Arc::new(inputs.swap_remove(0)),
}))
}
Partitioning::Hash(_, n) =>
Ok(LogicalPlan::Repartition(Repartition {
partitioning_scheme: Partitioning::Hash(expr, *n),
- input: Arc::new(inputs[0].clone()),
+ input: Arc::new(inputs.swap_remove(0)),
})),
Partitioning::DistributeBy(_) => {
Ok(LogicalPlan::Repartition(Repartition {
partitioning_scheme: Partitioning::DistributeBy(expr),
- input: Arc::new(inputs[0].clone()),
+ input: Arc::new(inputs.swap_remove(0)),
}))
}
},
LogicalPlan::Window(Window { window_expr, .. }) => {
assert_eq!(window_expr.len(), expr.len());
- Window::try_new(expr, Arc::new(inputs[0].clone()))
+ Window::try_new(expr, Arc::new(inputs.swap_remove(0)))
.map(LogicalPlan::Window)
}
LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
// group exprs are the first expressions
let agg_expr = expr.split_off(group_expr.len());
- Aggregate::try_new(Arc::new(inputs[0].clone()), expr, agg_expr)
+ Aggregate::try_new(Arc::new(inputs.swap_remove(0)), expr,
agg_expr)
.map(LogicalPlan::Aggregate)
}
LogicalPlan::Sort(Sort { fetch, .. }) => Ok(LogicalPlan::Sort(Sort
{
expr,
- input: Arc::new(inputs[0].clone()),
+ input: Arc::new(inputs.swap_remove(0)),
fetch: *fetch,
})),
LogicalPlan::Join(Join {
@@ -739,7 +739,7 @@ impl LogicalPlan {
// The first part of expr is equi-exprs,
// and the struct of each equi-expr is like `left-expr =
right-expr`.
assert_eq!(expr.len(), equi_expr_count);
- let new_on:Vec<(Expr,Expr)> = expr.into_iter().map(|equi_expr|
{
+ let new_on = expr.into_iter().map(|equi_expr| {
// SimplifyExpression rule may add alias to the equi_expr.
let unalias_expr = equi_expr.clone().unalias();
if let Expr::BinaryExpr(BinaryExpr { left, op:
Operator::Eq, right }) = unalias_expr {
@@ -752,8 +752,8 @@ impl LogicalPlan {
}).collect::<Result<Vec<(Expr, Expr)>>>()?;
Ok(LogicalPlan::Join(Join {
- left: Arc::new(inputs[0].clone()),
- right: Arc::new(inputs[1].clone()),
+ left: Arc::new(inputs.swap_remove(0)),
+ right: Arc::new(inputs.swap_remove(0)),
join_type: *join_type,
join_constraint: *join_constraint,
on: new_on,
@@ -763,28 +763,28 @@ impl LogicalPlan {
}))
}
LogicalPlan::CrossJoin(_) => {
- let left = inputs[0].clone();
- let right = inputs[1].clone();
+ let left = inputs.swap_remove(0);
+ let right = inputs.swap_remove(0);
LogicalPlanBuilder::from(left).cross_join(right)?.build()
}
LogicalPlan::Subquery(Subquery {
outer_ref_columns, ..
}) => {
- let subquery =
LogicalPlanBuilder::from(inputs[0].clone()).build()?;
+ let subquery =
LogicalPlanBuilder::from(inputs.swap_remove(0)).build()?;
Ok(LogicalPlan::Subquery(Subquery {
subquery: Arc::new(subquery),
outer_ref_columns: outer_ref_columns.clone(),
}))
}
LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
- SubqueryAlias::try_new(Arc::new(inputs[0].clone()),
alias.clone())
+ SubqueryAlias::try_new(Arc::new(inputs.swap_remove(0)),
alias.clone())
.map(LogicalPlan::SubqueryAlias)
}
LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
Ok(LogicalPlan::Limit(Limit {
skip: *skip,
fetch: *fetch,
- input: Arc::new(inputs[0].clone()),
+ input: Arc::new(inputs.swap_remove(0)),
}))
}
LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable
{
@@ -795,7 +795,7 @@ impl LogicalPlan {
..
})) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
- input: Arc::new(inputs[0].clone()),
+ input: Arc::new(inputs.swap_remove(0)),
constraints: Constraints::empty(),
name: name.clone(),
if_not_exists: *if_not_exists,
@@ -809,30 +809,30 @@ impl LogicalPlan {
definition,
..
})) => Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
- input: Arc::new(inputs[0].clone()),
+ input: Arc::new(inputs.swap_remove(0)),
name: name.clone(),
or_replace: *or_replace,
definition: definition.clone(),
}))),
LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
- node: e.node.from_template(&expr, inputs),
+ node: e.node.from_template(&expr, &inputs),
})),
LogicalPlan::Union(Union { schema, .. }) => {
let input_schema = inputs[0].schema();
// If inputs are not pruned do not change schema.
let schema = if schema.fields().len() ==
input_schema.fields().len() {
- schema
+ schema.clone()
} else {
- input_schema
+ input_schema.clone()
};
Ok(LogicalPlan::Union(Union {
- inputs: inputs.iter().cloned().map(Arc::new).collect(),
- schema: schema.clone(),
+ inputs: inputs.into_iter().map(Arc::new).collect(),
+ schema,
}))
}
LogicalPlan::Distinct(distinct) => {
let distinct = match distinct {
- Distinct::All(_) =>
Distinct::All(Arc::new(inputs[0].clone())),
+ Distinct::All(_) =>
Distinct::All(Arc::new(inputs.swap_remove(0))),
Distinct::On(DistinctOn {
on_expr,
select_expr,
@@ -848,7 +848,7 @@ impl LogicalPlan {
} else {
None
},
- Arc::new(inputs[0].clone()),
+ Arc::new(inputs.swap_remove(0)),
)?)
}
};
@@ -858,8 +858,8 @@ impl LogicalPlan {
name, is_distinct, ..
}) => Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
name: name.clone(),
- static_term: Arc::new(inputs[0].clone()),
- recursive_term: Arc::new(inputs[1].clone()),
+ static_term: Arc::new(inputs.swap_remove(0)),
+ recursive_term: Arc::new(inputs.swap_remove(0)),
is_distinct: *is_distinct,
})),
LogicalPlan::Analyze(a) => {
@@ -868,7 +868,7 @@ impl LogicalPlan {
Ok(LogicalPlan::Analyze(Analyze {
verbose: a.verbose,
schema: a.schema.clone(),
- input: Arc::new(inputs[0].clone()),
+ input: Arc::new(inputs.swap_remove(0)),
}))
}
LogicalPlan::Explain(e) => {
@@ -879,7 +879,7 @@ impl LogicalPlan {
assert_eq!(inputs.len(), 1, "Invalid EXPLAIN command. Inputs
are empty");
Ok(LogicalPlan::Explain(Explain {
verbose: e.verbose,
- plan: Arc::new(inputs[0].clone()),
+ plan: Arc::new(inputs.swap_remove(0)),
stringified_plans: e.stringified_plans.clone(),
schema: e.schema.clone(),
logical_optimization_succeeded:
e.logical_optimization_succeeded,
@@ -890,7 +890,7 @@ impl LogicalPlan {
}) => Ok(LogicalPlan::Prepare(Prepare {
name: name.clone(),
data_types: data_types.clone(),
- input: Arc::new(inputs[0].clone()),
+ input: Arc::new(inputs.swap_remove(0)),
})),
LogicalPlan::TableScan(ts) => {
assert!(inputs.is_empty(), "{self:?} should have no inputs");
@@ -915,7 +915,7 @@ impl LogicalPlan {
..
}) => {
// Update schema with unnested column type.
- let input = Arc::new(inputs[0].clone());
+ let input = Arc::new(inputs.swap_remove(0));
let nested_field = input.schema().field_from_column(column)?;
let unnested_field = schema.field_from_column(column)?;
let fields = input
@@ -1199,7 +1199,7 @@ impl LogicalPlan {
.map(|inp| inp.replace_params_with_values(param_values))
.collect::<Result<Vec<_>>>()?;
- self.with_new_exprs(new_exprs, &new_inputs_with_values)
+ self.with_new_exprs(new_exprs, new_inputs_with_values)
}
/// Walk the logical plan, find any `Placeholder` tokens, and return a map
of their IDs and DataTypes
diff --git a/datafusion/expr/src/tree_node/plan.rs b/datafusion/expr/src/tree_node/plan.rs
index 589bb917a9..c35a09874a 100644
--- a/datafusion/expr/src/tree_node/plan.rs
+++ b/datafusion/expr/src/tree_node/plan.rs
@@ -89,11 +89,11 @@ impl TreeNode for LogicalPlan {
// if any changes made, make a new child
if old_children
- .iter()
+ .into_iter()
.zip(new_children.iter())
- .any(|(c1, c2)| c1 != &c2)
+ .any(|(c1, c2)| c1 != c2)
{
- self.with_new_exprs(self.expressions(), new_children.as_slice())
+ self.with_new_exprs(self.expressions(), new_children)
} else {
Ok(self)
}
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 02479c0765..5d011e097f 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -722,7 +722,7 @@ pub fn from_plan(
expr: &[Expr],
inputs: &[LogicalPlan],
) -> Result<LogicalPlan> {
- plan.with_new_exprs(expr.to_vec(), inputs)
+ plan.with_new_exprs(expr.to_vec(), inputs.to_vec())
}
/// Find all columns referenced from an aggregate query
diff --git a/datafusion/optimizer/src/analyzer/rewrite_expr.rs b/datafusion/optimizer/src/analyzer/rewrite_expr.rs
index 8f1c844ed0..eedfc40a7f 100644
--- a/datafusion/optimizer/src/analyzer/rewrite_expr.rs
+++ b/datafusion/optimizer/src/analyzer/rewrite_expr.rs
@@ -86,7 +86,7 @@ fn analyze_internal(plan: &LogicalPlan) -> Result<LogicalPlan> {
})
.collect::<Result<Vec<_>>>()?;
- plan.with_new_exprs(new_expr, &new_inputs)
+ plan.with_new_exprs(new_expr, new_inputs)
}
pub(crate) struct OperatorToFunctionRewriter {
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index c9ecb2a770..8710249e12 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -77,7 +77,7 @@ fn analyze_internal(
plan: &LogicalPlan,
) -> Result<LogicalPlan> {
// optimize child plans first
- let new_inputs = plan
+ let mut new_inputs = plan
.inputs()
.iter()
.map(|p| analyze_internal(external_schema, p))
@@ -115,9 +115,9 @@ fn analyze_internal(
match &plan {
LogicalPlan::Projection(_) =>
Ok(LogicalPlan::Projection(Projection::try_new(
new_expr,
- Arc::new(new_inputs[0].clone()),
+ Arc::new(new_inputs.swap_remove(0)),
)?)),
- _ => plan.with_new_exprs(new_expr, &new_inputs),
+ _ => plan.with_new_exprs(new_expr, new_inputs),
}
}
diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs
index 53c4b3702b..56a4a76987 100644
--- a/datafusion/optimizer/src/eliminate_outer_join.rs
+++ b/datafusion/optimizer/src/eliminate_outer_join.rs
@@ -106,9 +106,8 @@ impl OptimizerRule for EliminateOuterJoin {
schema: join.schema.clone(),
null_equals_null: join.null_equals_null,
});
- let new_plan =
- plan.with_new_exprs(plan.expressions(), &[new_join])?;
- Ok(Some(new_plan))
+ let exprs = plan.expressions();
+ plan.with_new_exprs(exprs, vec![new_join]).map(Some)
}
_ => Ok(None),
},
diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs
index 1035995642..db0459fd9a 100644
--- a/datafusion/optimizer/src/optimize_projections.rs
+++ b/datafusion/optimizer/src/optimize_projections.rs
@@ -375,9 +375,9 @@ fn optimize_projections(
// If new_input is `None`, this means child is not changed, so use
// `old_child` during construction:
.map(|(new_input, old_child)| new_input.unwrap_or_else(||
old_child.clone()))
- .collect::<Vec<_>>();
- plan.with_new_exprs(plan.expressions(), &new_inputs)
- .map(Some)
+ .collect();
+ let exprs = plan.expressions();
+ plan.with_new_exprs(exprs, new_inputs).map(Some)
}
}
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index a192348f69..633a32996d 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -380,9 +380,10 @@ impl Optimizer {
Some(plan) => plan,
None => old_plan.clone(),
})
- .collect::<Vec<_>>();
+ .collect();
- Ok(Some(plan.with_new_exprs(plan.expressions(), &new_inputs)?))
+ let exprs = plan.expressions();
+ plan.with_new_exprs(exprs, new_inputs).map(Some)
}
/// Use a rule to optimize the whole plan.
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 7086c5cda5..fc56cbb868 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -297,15 +297,10 @@ fn can_evaluate_as_join_condition(predicate: &Expr) -> Result<bool> {
//
// do nothing.
//
-fn extract_or_clauses_for_join(
- filters: &[&Expr],
- schema: &DFSchema,
- preserved: bool,
-) -> Vec<Expr> {
- if !preserved {
- return vec![];
- }
-
+fn extract_or_clauses_for_join<'a>(
+ filters: &'a [Expr],
+ schema: &'a DFSchema,
+) -> impl Iterator<Item = Expr> + 'a {
let schema_columns = schema
.fields()
.iter()
@@ -318,8 +313,8 @@ fn extract_or_clauses_for_join(
})
.collect::<HashSet<_>>();
- let mut exprs = vec![];
- for expr in filters.iter() {
+ // new formed OR clauses and their column references
+ filters.iter().filter_map(move |expr| {
if let Expr::BinaryExpr(BinaryExpr {
left,
op: Operator::Or,
@@ -331,13 +326,11 @@ fn extract_or_clauses_for_join(
// If nothing can be extracted from any sub clauses, do nothing
for this OR clause.
if let (Some(left_expr), Some(right_expr)) = (left_expr,
right_expr) {
- exprs.push(or(left_expr, right_expr));
+ return Some(or(left_expr, right_expr));
}
}
- }
-
- // new formed OR clauses and their column references
- exprs
+ None
+ })
}
// extract qual from OR sub-clause.
@@ -425,15 +418,17 @@ fn push_down_all_join(
// 1) can push through join to its children(left or right)
// 2) can be converted to join conditions if the join type is Inner
// 3) should be kept as filter conditions
+ let left_schema = left.schema();
+ let right_schema = right.schema();
let mut left_push = vec![];
let mut right_push = vec![];
let mut keep_predicates = vec![];
let mut join_conditions = vec![];
for predicate in predicates {
- if left_preserved && can_pushdown_join_predicate(&predicate,
left.schema())? {
+ if left_preserved && can_pushdown_join_predicate(&predicate,
left_schema)? {
left_push.push(predicate);
} else if right_preserved
- && can_pushdown_join_predicate(&predicate, right.schema())?
+ && can_pushdown_join_predicate(&predicate, right_schema)?
{
right_push.push(predicate);
} else if is_inner_join && can_evaluate_as_join_condition(&predicate)?
{
@@ -447,10 +442,10 @@ fn push_down_all_join(
// For infer predicates, if they can not push through join, just drop them
for predicate in infer_predicates {
- if left_preserved && can_pushdown_join_predicate(&predicate,
left.schema())? {
+ if left_preserved && can_pushdown_join_predicate(&predicate,
left_schema)? {
left_push.push(predicate);
} else if right_preserved
- && can_pushdown_join_predicate(&predicate, right.schema())?
+ && can_pushdown_join_predicate(&predicate, right_schema)?
{
right_push.push(predicate);
}
@@ -459,10 +454,10 @@ fn push_down_all_join(
if !on_filter.is_empty() {
let (on_left_preserved, on_right_preserved) =
on_lr_is_preserved(join_plan)?;
for on in on_filter {
- if on_left_preserved && can_pushdown_join_predicate(&on,
left.schema())? {
+ if on_left_preserved && can_pushdown_join_predicate(&on,
left_schema)? {
left_push.push(on)
} else if on_right_preserved
- && can_pushdown_join_predicate(&on, right.schema())?
+ && can_pushdown_join_predicate(&on, right_schema)?
{
right_push.push(on)
} else {
@@ -473,31 +468,14 @@ fn push_down_all_join(
// Extract from OR clause, generate new predicates for both side of join
if possible.
// We only track the unpushable predicates above.
- let or_to_left = extract_or_clauses_for_join(
- &keep_predicates.iter().collect::<Vec<_>>(),
- left.schema(),
- left_preserved,
- );
- let or_to_right = extract_or_clauses_for_join(
- &keep_predicates.iter().collect::<Vec<_>>(),
- right.schema(),
- right_preserved,
- );
- let on_or_to_left = extract_or_clauses_for_join(
- &join_conditions.iter().collect::<Vec<_>>(),
- left.schema(),
- left_preserved,
- );
- let on_or_to_right = extract_or_clauses_for_join(
- &join_conditions.iter().collect::<Vec<_>>(),
- right.schema(),
- right_preserved,
- );
-
- left_push.extend(or_to_left);
- left_push.extend(on_or_to_left);
- right_push.extend(or_to_right);
- right_push.extend(on_or_to_right);
+ if left_preserved {
+ left_push.extend(extract_or_clauses_for_join(&keep_predicates,
left_schema));
+ left_push.extend(extract_or_clauses_for_join(&join_conditions,
left_schema));
+ }
+ if right_preserved {
+ right_push.extend(extract_or_clauses_for_join(&keep_predicates,
right_schema));
+ right_push.extend(extract_or_clauses_for_join(&join_conditions,
right_schema));
+ }
let left = match conjunction(left_push) {
Some(predicate) => {
@@ -519,28 +497,19 @@ fn push_down_all_join(
// it always will be the last element, otherwise result
// vector will contain only join keys (without additional
// element representing filter).
- let expr = join_plan.expressions();
- let mut new_exprs = if !on_filter_empty {
- expr[..expr.len() - 1].to_vec()
- } else {
- expr
- };
- if !join_conditions.is_empty() {
- new_exprs.push(join_conditions.into_iter().reduce(Expr::and).unwrap());
+ let mut exprs = join_plan.expressions();
+ if !on_filter_empty {
+ exprs.pop();
}
- let plan = join_plan.with_new_exprs(new_exprs, &[left, right])?;
+ exprs.extend(join_conditions.into_iter().reduce(Expr::and));
+ let plan = join_plan.with_new_exprs(exprs, vec![left, right])?;
- if keep_predicates.is_empty() {
- Ok(plan)
- } else {
- // wrap the join on the filter whose predicates must be kept
- match conjunction(keep_predicates) {
- Some(predicate) => Ok(LogicalPlan::Filter(Filter::try_new(
- predicate,
- Arc::new(plan),
- )?)),
- None => Ok(plan),
+ // wrap the join on the filter whose predicates must be kept
+ match conjunction(keep_predicates) {
+ Some(predicate) => {
+ Filter::try_new(predicate, Arc::new(plan)).map(LogicalPlan::Filter)
}
+ None => Ok(plan),
}
}
@@ -694,9 +663,9 @@ impl OptimizerRule for PushDownFilter {
// commutable
let new_filter = plan.with_new_exprs(
plan.expressions(),
- &[child_plan.inputs()[0].clone()],
+ vec![child_plan.inputs()[0].clone()],
)?;
- child_plan.with_new_exprs(child_plan.expressions(),
&[new_filter])?
+ child_plan.with_new_exprs(child_plan.expressions(),
vec![new_filter])?
}
LogicalPlan::SubqueryAlias(subquery_alias) => {
let mut replace_map = HashMap::new();
@@ -719,7 +688,7 @@ impl OptimizerRule for PushDownFilter {
new_predicate,
subquery_alias.input.clone(),
)?);
- child_plan.with_new_exprs(child_plan.expressions(),
&[new_filter])?
+ child_plan.with_new_exprs(child_plan.expressions(),
vec![new_filter])?
}
LogicalPlan::Projection(projection) => {
// A projection is filter-commutable if it do not contain
volatile predicates or contain volatile
@@ -767,12 +736,12 @@ impl OptimizerRule for PushDownFilter {
match conjunction(keep_predicates) {
None => child_plan.with_new_exprs(
child_plan.expressions(),
- &[new_filter],
+ vec![new_filter],
)?,
Some(keep_predicate) => {
let child_plan = child_plan.with_new_exprs(
child_plan.expressions(),
- &[new_filter],
+ vec![new_filter],
)?;
LogicalPlan::Filter(Filter::try_new(
keep_predicate,
@@ -843,13 +812,13 @@ impl OptimizerRule for PushDownFilter {
let child = match conjunction(replaced_push_predicates) {
Some(predicate) => LogicalPlan::Filter(Filter::try_new(
predicate,
- Arc::new((*agg.input).clone()),
+ agg.input.clone(),
)?),
None => (*agg.input).clone(),
};
let new_agg = filter
.input
- .with_new_exprs(filter.input.expressions(), &vec![child])?;
+ .with_new_exprs(filter.input.expressions(), vec![child])?;
match conjunction(keep_predicates) {
Some(predicate) => LogicalPlan::Filter(Filter::try_new(
predicate,
@@ -955,7 +924,7 @@ impl OptimizerRule for PushDownFilter {
};
// extension with new inputs.
let new_extension =
- child_plan.with_new_exprs(child_plan.expressions(),
&new_children)?;
+ child_plan.with_new_exprs(child_plan.expressions(),
new_children)?;
match conjunction(keep_predicates) {
Some(predicate) => LogicalPlan::Filter(Filter::try_new(
diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs
index c2f35a7906..33d02d5c56 100644
--- a/datafusion/optimizer/src/push_down_limit.rs
+++ b/datafusion/optimizer/src/push_down_limit.rs
@@ -17,14 +17,17 @@
//! Optimizer rule to push down LIMIT in the query plan
//! It will push down through projection, limits (taking the smaller limit)
+
+use std::sync::Arc;
+
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
+
use datafusion_common::Result;
-use datafusion_expr::{
- logical_plan::{Join, JoinType, Limit, LogicalPlan, Sort, TableScan, Union},
- CrossJoin,
+use datafusion_expr::logical_plan::{
+ Join, JoinType, Limit, LogicalPlan, Sort, TableScan, Union,
};
-use std::sync::Arc;
+use datafusion_expr::CrossJoin;
/// Optimization rule that tries to push down LIMIT.
#[derive(Default)]
@@ -46,9 +49,8 @@ impl OptimizerRule for PushDownLimit {
) -> Result<Option<LogicalPlan>> {
use std::cmp::min;
- let limit = match plan {
- LogicalPlan::Limit(limit) => limit,
- _ => return Ok(None),
+ let LogicalPlan::Limit(limit) = plan else {
+ return Ok(None);
};
if let LogicalPlan::Limit(child) = &*limit.input {
@@ -96,27 +98,22 @@ impl OptimizerRule for PushDownLimit {
fetch: new_fetch,
input: Arc::new((*child.input).clone()),
});
- return {
- match self.try_optimize(&plan, _config)? {
- Some(new_plan) => Ok(Some(new_plan)),
- None => Ok(Some(plan)),
- }
- };
+ return self
+ .try_optimize(&plan, _config)
+ .map(|opt_plan| opt_plan.or_else(|| Some(plan)));
}
- let fetch = match limit.fetch {
- Some(fetch) => fetch,
- None => return Ok(None),
+ let Some(fetch) = limit.fetch else {
+ return Ok(None);
};
let skip = limit.skip;
- let child_plan = &*limit.input;
- let plan = match child_plan {
+ match limit.input.as_ref() {
LogicalPlan::TableScan(scan) => {
let limit = if fetch != 0 { fetch + skip } else { 0 };
let new_fetch = scan.fetch.map(|x| min(x,
limit)).or(Some(limit));
if new_fetch == scan.fetch {
- None
+ Ok(None)
} else {
let new_input = LogicalPlan::TableScan(TableScan {
table_name: scan.table_name.clone(),
@@ -126,7 +123,8 @@ impl OptimizerRule for PushDownLimit {
fetch: scan.fetch.map(|x| min(x,
limit)).or(Some(limit)),
projected_schema: scan.projected_schema.clone(),
});
- Some(plan.with_new_exprs(plan.expressions(),
&[new_input])?)
+ plan.with_new_exprs(plan.expressions(), vec![new_input])
+ .map(Some)
}
}
LogicalPlan::Union(union) => {
@@ -137,7 +135,7 @@ impl OptimizerRule for PushDownLimit {
Ok(Arc::new(LogicalPlan::Limit(Limit {
skip: 0,
fetch: Some(fetch + skip),
- input: Arc::new((**x).clone()),
+ input: x.clone(),
})))
})
.collect::<Result<_>>()?;
@@ -145,38 +143,36 @@ impl OptimizerRule for PushDownLimit {
inputs: new_inputs,
schema: union.schema.clone(),
});
- Some(plan.with_new_exprs(plan.expressions(), &[union])?)
+ plan.with_new_exprs(plan.expressions(), vec![union])
+ .map(Some)
}
LogicalPlan::CrossJoin(cross_join) => {
- let left = &*cross_join.left;
- let right = &*cross_join.right;
let new_left = LogicalPlan::Limit(Limit {
skip: 0,
fetch: Some(fetch + skip),
- input: Arc::new(left.clone()),
+ input: cross_join.left.clone(),
});
let new_right = LogicalPlan::Limit(Limit {
skip: 0,
fetch: Some(fetch + skip),
- input: Arc::new(right.clone()),
+ input: cross_join.right.clone(),
});
let new_cross_join = LogicalPlan::CrossJoin(CrossJoin {
left: Arc::new(new_left),
right: Arc::new(new_right),
schema: plan.schema().clone(),
});
- Some(plan.with_new_exprs(plan.expressions(),
&[new_cross_join])?)
+ plan.with_new_exprs(plan.expressions(), vec![new_cross_join])
+ .map(Some)
}
LogicalPlan::Join(join) => {
- let new_join = push_down_join(join, fetch + skip);
- match new_join {
- Some(new_join) => Some(plan.with_new_exprs(
- plan.expressions(),
- &[LogicalPlan::Join(new_join)],
- )?),
- None => None,
+ if let Some(new_join) = push_down_join(join, fetch + skip) {
+ let inputs = vec![LogicalPlan::Join(new_join)];
+ plan.with_new_exprs(plan.expressions(), inputs).map(Some)
+ } else {
+ Ok(None)
}
}
@@ -186,28 +182,29 @@ impl OptimizerRule for PushDownLimit {
Some(sort.fetch.map(|f|
f.min(sort_fetch)).unwrap_or(sort_fetch))
};
if new_fetch == sort.fetch {
- None
+ Ok(None)
} else {
let new_sort = LogicalPlan::Sort(Sort {
expr: sort.expr.clone(),
- input: Arc::new((*sort.input).clone()),
+ input: sort.input.clone(),
fetch: new_fetch,
});
- Some(plan.with_new_exprs(plan.expressions(), &[new_sort])?)
+ plan.with_new_exprs(plan.expressions(), vec![new_sort])
+ .map(Some)
}
}
- LogicalPlan::Projection(_) | LogicalPlan::SubqueryAlias(_) => {
+ child_plan @ (LogicalPlan::Projection(_) |
LogicalPlan::SubqueryAlias(_)) => {
// commute
let new_limit = plan.with_new_exprs(
plan.expressions(),
- &[child_plan.inputs()[0].clone()],
+ vec![child_plan.inputs()[0].clone()],
)?;
- Some(child_plan.with_new_exprs(child_plan.expressions(),
&[new_limit])?)
+ child_plan
+ .with_new_exprs(child_plan.expressions(), vec![new_limit])
+ .map(Some)
}
- _ => None,
- };
-
- Ok(plan)
+ _ => Ok(None),
+ }
}
fn name(&self) -> &str {
@@ -245,24 +242,24 @@ fn push_down_join(join: &Join, limit: usize) -> Option<Join> {
(None, None) => None,
_ => {
let left = match left_limit {
- Some(limit) => LogicalPlan::Limit(Limit {
+ Some(limit) => Arc::new(LogicalPlan::Limit(Limit {
skip: 0,
fetch: Some(limit),
- input: Arc::new((*join.left).clone()),
- }),
- None => (*join.left).clone(),
+ input: join.left.clone(),
+ })),
+ None => join.left.clone(),
};
let right = match right_limit {
- Some(limit) => LogicalPlan::Limit(Limit {
+ Some(limit) => Arc::new(LogicalPlan::Limit(Limit {
skip: 0,
fetch: Some(limit),
- input: Arc::new((*join.right).clone()),
- }),
- None => (*join.right).clone(),
+ input: join.right.clone(),
+ })),
+ None => join.right.clone(),
};
Some(Join {
- left: Arc::new(left),
- right: Arc::new(right),
+ left,
+ right,
on: join.on.clone(),
filter: join.filter.clone(),
join_type: join.join_type,
@@ -280,6 +277,7 @@ mod test {
use super::*;
use crate::test::*;
+
use datafusion_expr::{
col, exists,
logical_plan::{builder::LogicalPlanBuilder, JoinType, LogicalPlan},
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 7265b17dd0..d68474dcde 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -122,7 +122,7 @@ impl SimplifyExpressions {
}
};
- plan.with_new_exprs(expr, &new_inputs)
+ plan.with_new_exprs(expr, new_inputs)
}
}
diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
index 91603e82a5..9d3561d126 100644
--- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
+++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -109,8 +109,8 @@ impl OptimizerRule for UnwrapCastInComparison {
.map(|expr| rewrite_preserving_name(expr, &mut expr_rewriter))
.collect::<Result<Vec<_>>>()?;
- let inputs: Vec<LogicalPlan> =
plan.inputs().into_iter().cloned().collect();
- Ok(Some(plan.with_new_exprs(new_exprs, inputs.as_slice())?))
+ let inputs = plan.inputs().into_iter().cloned().collect();
+ plan.with_new_exprs(new_exprs, inputs).map(Some)
}
fn name(&self) -> &str {
diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs
index 5671dc6ae9..6189f9a579 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -48,7 +48,8 @@ pub fn optimize_children(
new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
}
if plan_is_changed {
- Ok(Some(plan.with_new_exprs(plan.expressions(), &new_inputs)?))
+ let exprs = plan.expressions();
+ plan.with_new_exprs(exprs, new_inputs).map(Some)
} else {
Ok(None)
}