This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bf0a39a791 Deprecate duplicate function `LogicalPlan::with_new_inputs`
(#8707)
bf0a39a791 is described below
commit bf0a39a791e7cd0e965abb8c87950cc4101149f7
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Tue Jan 2 00:28:36 2024 -0800
Deprecate duplicate function `LogicalPlan::with_new_inputs` (#8707)
* Remove duplicate function with_new_inputs
* Make it as deprecated function
---
datafusion/expr/src/logical_plan/builder.rs | 2 +-
datafusion/expr/src/logical_plan/plan.rs | 47 +++---------------------
datafusion/expr/src/tree_node/plan.rs | 2 +-
datafusion/optimizer/src/eliminate_outer_join.rs | 3 +-
datafusion/optimizer/src/optimize_projections.rs | 3 +-
datafusion/optimizer/src/optimizer.rs | 2 +-
datafusion/optimizer/src/push_down_filter.rs | 28 +++++++++-----
datafusion/optimizer/src/push_down_limit.rs | 23 +++++++-----
datafusion/optimizer/src/utils.rs | 2 +-
9 files changed, 45 insertions(+), 67 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index 549c25f89b..cfc052cfc1 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -445,7 +445,7 @@ impl LogicalPlanBuilder {
)
})
.collect::<Result<Vec<_>>>()?;
- curr_plan.with_new_inputs(&new_inputs)
+ curr_plan.with_new_exprs(curr_plan.expressions(), &new_inputs)
}
}
}
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 9b0f441ef9..c0c520c4e2 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -541,35 +541,9 @@ impl LogicalPlan {
}
/// Returns a copy of this `LogicalPlan` with the new inputs
+ #[deprecated(since = "35.0.0", note = "please use `with_new_exprs`
instead")]
pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) ->
Result<LogicalPlan> {
- // with_new_inputs use original expression,
- // so we don't need to recompute Schema.
- match &self {
- LogicalPlan::Projection(projection) => {
- // Schema of the projection may change
- // when its input changes. Hence we should use
- // `try_new` method instead of `try_new_with_schema`.
- Projection::try_new(projection.expr.to_vec(),
Arc::new(inputs[0].clone()))
- .map(LogicalPlan::Projection)
- }
- LogicalPlan::Window(Window { window_expr, .. }) =>
Ok(LogicalPlan::Window(
- Window::try_new(window_expr.to_vec(),
Arc::new(inputs[0].clone()))?,
- )),
- LogicalPlan::Aggregate(Aggregate {
- group_expr,
- aggr_expr,
- ..
- }) => Aggregate::try_new(
- // Schema of the aggregate may change
- // when its input changes. Hence we should use
- // `try_new` method instead of `try_new_with_schema`.
- Arc::new(inputs[0].clone()),
- group_expr.to_vec(),
- aggr_expr.to_vec(),
- )
- .map(LogicalPlan::Aggregate),
- _ => self.with_new_exprs(self.expressions(), inputs),
- }
+ self.with_new_exprs(self.expressions(), inputs)
}
/// Returns a new `LogicalPlan` based on `self` with inputs and
@@ -591,10 +565,6 @@ impl LogicalPlan {
/// // create new plan using rewritten_exprs in same position
/// let new_plan = plan.new_with_exprs(rewritten_exprs, new_inputs);
/// ```
- ///
- /// Note: sometimes [`Self::with_new_exprs`] will use schema of
- /// original plan, it will not change the scheam. Such as
- /// `Projection/Aggregate/Window`
pub fn with_new_exprs(
&self,
mut expr: Vec<Expr>,
@@ -706,17 +676,10 @@ impl LogicalPlan {
}))
}
},
- LogicalPlan::Window(Window {
- window_expr,
- schema,
- ..
- }) => {
+ LogicalPlan::Window(Window { window_expr, .. }) => {
assert_eq!(window_expr.len(), expr.len());
- Ok(LogicalPlan::Window(Window {
- input: Arc::new(inputs[0].clone()),
- window_expr: expr,
- schema: schema.clone(),
- }))
+ Window::try_new(expr, Arc::new(inputs[0].clone()))
+ .map(LogicalPlan::Window)
}
LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
// group exprs are the first expressions
diff --git a/datafusion/expr/src/tree_node/plan.rs
b/datafusion/expr/src/tree_node/plan.rs
index 217116530d..208a8b57d7 100644
--- a/datafusion/expr/src/tree_node/plan.rs
+++ b/datafusion/expr/src/tree_node/plan.rs
@@ -113,7 +113,7 @@ impl TreeNode for LogicalPlan {
.zip(new_children.iter())
.any(|(c1, c2)| c1 != &c2)
{
- self.with_new_inputs(new_children.as_slice())
+ self.with_new_exprs(self.expressions(), new_children.as_slice())
} else {
Ok(self)
}
diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs
b/datafusion/optimizer/src/eliminate_outer_join.rs
index e4d57f0209..53c4b3702b 100644
--- a/datafusion/optimizer/src/eliminate_outer_join.rs
+++ b/datafusion/optimizer/src/eliminate_outer_join.rs
@@ -106,7 +106,8 @@ impl OptimizerRule for EliminateOuterJoin {
schema: join.schema.clone(),
null_equals_null: join.null_equals_null,
});
- let new_plan = plan.with_new_inputs(&[new_join])?;
+ let new_plan =
+ plan.with_new_exprs(plan.expressions(), &[new_join])?;
Ok(Some(new_plan))
}
_ => Ok(None),
diff --git a/datafusion/optimizer/src/optimize_projections.rs
b/datafusion/optimizer/src/optimize_projections.rs
index 7ae9f7edf5..891a909a33 100644
--- a/datafusion/optimizer/src/optimize_projections.rs
+++ b/datafusion/optimizer/src/optimize_projections.rs
@@ -373,7 +373,8 @@ fn optimize_projections(
// `old_child` during construction:
.map(|(new_input, old_child)| new_input.unwrap_or_else(||
old_child.clone()))
.collect::<Vec<_>>();
- plan.with_new_inputs(&new_inputs).map(Some)
+ plan.with_new_exprs(plan.expressions(), &new_inputs)
+ .map(Some)
}
}
diff --git a/datafusion/optimizer/src/optimizer.rs
b/datafusion/optimizer/src/optimizer.rs
index 0dc34cb809..2cb59d511c 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -382,7 +382,7 @@ impl Optimizer {
})
.collect::<Vec<_>>();
- Ok(Some(plan.with_new_inputs(&new_inputs)?))
+ Ok(Some(plan.with_new_exprs(plan.expressions(), &new_inputs)?))
}
/// Use a rule to optimize the whole plan.
diff --git a/datafusion/optimizer/src/push_down_filter.rs
b/datafusion/optimizer/src/push_down_filter.rs
index 9d277d18d2..4eb925ac06 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -691,9 +691,11 @@ impl OptimizerRule for PushDownFilter {
| LogicalPlan::Distinct(_)
| LogicalPlan::Sort(_) => {
// commutable
- let new_filter =
- plan.with_new_inputs(&[child_plan.inputs()[0].clone()])?;
- child_plan.with_new_inputs(&[new_filter])?
+ let new_filter = plan.with_new_exprs(
+ plan.expressions(),
+ &[child_plan.inputs()[0].clone()],
+ )?;
+ child_plan.with_new_exprs(child_plan.expressions(),
&[new_filter])?
}
LogicalPlan::SubqueryAlias(subquery_alias) => {
let mut replace_map = HashMap::new();
@@ -716,7 +718,7 @@ impl OptimizerRule for PushDownFilter {
new_predicate,
subquery_alias.input.clone(),
)?);
- child_plan.with_new_inputs(&[new_filter])?
+ child_plan.with_new_exprs(child_plan.expressions(),
&[new_filter])?
}
LogicalPlan::Projection(projection) => {
// A projection is filter-commutable if it do not contain
volatile predicates or contain volatile
@@ -760,10 +762,15 @@ impl OptimizerRule for PushDownFilter {
)?);
match conjunction(keep_predicates) {
- None => child_plan.with_new_inputs(&[new_filter])?,
+ None => child_plan.with_new_exprs(
+ child_plan.expressions(),
+ &[new_filter],
+ )?,
Some(keep_predicate) => {
- let child_plan =
- child_plan.with_new_inputs(&[new_filter])?;
+ let child_plan = child_plan.with_new_exprs(
+ child_plan.expressions(),
+ &[new_filter],
+ )?;
LogicalPlan::Filter(Filter::try_new(
keep_predicate,
Arc::new(child_plan),
@@ -837,7 +844,9 @@ impl OptimizerRule for PushDownFilter {
)?),
None => (*agg.input).clone(),
};
- let new_agg = filter.input.with_new_inputs(&vec![child])?;
+ let new_agg = filter
+ .input
+ .with_new_exprs(filter.input.expressions(), &vec![child])?;
match conjunction(keep_predicates) {
Some(predicate) => LogicalPlan::Filter(Filter::try_new(
predicate,
@@ -942,7 +951,8 @@ impl OptimizerRule for PushDownFilter {
None =>
extension_plan.node.inputs().into_iter().cloned().collect(),
};
// extension with new inputs.
- let new_extension = child_plan.with_new_inputs(&new_children)?;
+ let new_extension =
+ child_plan.with_new_exprs(child_plan.expressions(),
&new_children)?;
match conjunction(keep_predicates) {
Some(predicate) => LogicalPlan::Filter(Filter::try_new(
diff --git a/datafusion/optimizer/src/push_down_limit.rs
b/datafusion/optimizer/src/push_down_limit.rs
index 6703a1d787..c2f35a7906 100644
--- a/datafusion/optimizer/src/push_down_limit.rs
+++ b/datafusion/optimizer/src/push_down_limit.rs
@@ -126,7 +126,7 @@ impl OptimizerRule for PushDownLimit {
fetch: scan.fetch.map(|x| min(x,
limit)).or(Some(limit)),
projected_schema: scan.projected_schema.clone(),
});
- Some(plan.with_new_inputs(&[new_input])?)
+ Some(plan.with_new_exprs(plan.expressions(),
&[new_input])?)
}
}
LogicalPlan::Union(union) => {
@@ -145,7 +145,7 @@ impl OptimizerRule for PushDownLimit {
inputs: new_inputs,
schema: union.schema.clone(),
});
- Some(plan.with_new_inputs(&[union])?)
+ Some(plan.with_new_exprs(plan.expressions(), &[union])?)
}
LogicalPlan::CrossJoin(cross_join) => {
@@ -166,15 +166,16 @@ impl OptimizerRule for PushDownLimit {
right: Arc::new(new_right),
schema: plan.schema().clone(),
});
- Some(plan.with_new_inputs(&[new_cross_join])?)
+ Some(plan.with_new_exprs(plan.expressions(),
&[new_cross_join])?)
}
LogicalPlan::Join(join) => {
let new_join = push_down_join(join, fetch + skip);
match new_join {
- Some(new_join) => {
-
Some(plan.with_new_inputs(&[LogicalPlan::Join(new_join)])?)
- }
+ Some(new_join) => Some(plan.with_new_exprs(
+ plan.expressions(),
+ &[LogicalPlan::Join(new_join)],
+ )?),
None => None,
}
}
@@ -192,14 +193,16 @@ impl OptimizerRule for PushDownLimit {
input: Arc::new((*sort.input).clone()),
fetch: new_fetch,
});
- Some(plan.with_new_inputs(&[new_sort])?)
+ Some(plan.with_new_exprs(plan.expressions(), &[new_sort])?)
}
}
LogicalPlan::Projection(_) | LogicalPlan::SubqueryAlias(_) => {
// commute
- let new_limit =
- plan.with_new_inputs(&[child_plan.inputs()[0].clone()])?;
- Some(child_plan.with_new_inputs(&[new_limit])?)
+ let new_limit = plan.with_new_exprs(
+ plan.expressions(),
+ &[child_plan.inputs()[0].clone()],
+ )?;
+ Some(child_plan.with_new_exprs(child_plan.expressions(),
&[new_limit])?)
}
_ => None,
};
diff --git a/datafusion/optimizer/src/utils.rs
b/datafusion/optimizer/src/utils.rs
index 48f72ee7a0..44f2404afa 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -46,7 +46,7 @@ pub fn optimize_children(
new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
}
if plan_is_changed {
- Ok(Some(plan.with_new_inputs(&new_inputs)?))
+ Ok(Some(plan.with_new_exprs(plan.expressions(), &new_inputs)?))
} else {
Ok(None)
}