This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2def10ff1a Stop copying plans in `LogicalPlan::with_param_values` (#10016)
2def10ff1a is described below
commit 2def10ff1a1ad6b99a1ae40692115b28c3efdfc2
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Apr 12 07:21:02 2024 -0400
Stop copying plans in `LogicalPlan::with_param_values` (#10016)
---
datafusion/expr/src/logical_plan/plan.rs | 79 +++++++++++---------------------
1 file changed, 28 insertions(+), 51 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index cb8c97c71e..91c8670f38 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -913,14 +913,19 @@ impl LogicalPlan {
param_values: impl Into<ParamValues>,
) -> Result<LogicalPlan> {
let param_values = param_values.into();
- match self {
- LogicalPlan::Prepare(prepare_lp) => {
- param_values.verify(&prepare_lp.data_types)?;
- let input_plan = prepare_lp.input;
- input_plan.replace_params_with_values(&param_values)
+ let plan_with_values = self.replace_params_with_values(&param_values)?;
+
+ // unwrap Prepare
+ Ok(if let LogicalPlan::Prepare(prepare_lp) = plan_with_values {
+ param_values.verify(&prepare_lp.data_types)?;
+ // try and take ownership of the input if is not shared, clone otherwise
+ match Arc::try_unwrap(prepare_lp.input) {
+ Ok(input) => input,
+ Err(arc_input) => arc_input.as_ref().clone(),
}
- _ => self.replace_params_with_values(&param_values),
- }
+ } else {
+ plan_with_values
+ })
}
/// Returns the maximum number of rows that this plan can output, if known.
@@ -1046,27 +1051,26 @@ impl LogicalPlan {
/// ...) replaced with corresponding values provided in
/// `params_values`
///
- /// See [`Self::with_param_values`] for examples and usage
+ /// See [`Self::with_param_values`] for examples and usage with an owned
+ /// `ParamValues`
pub fn replace_params_with_values(
- &self,
+ self,
param_values: &ParamValues,
) -> Result<LogicalPlan> {
- let new_exprs = self
- .expressions()
- .into_iter()
- .map(|e| {
- let e = e.infer_placeholder_types(self.schema())?;
- Self::replace_placeholders_with_values(e, param_values)
+ self.transform_up_with_subqueries(&|plan| {
+ let schema = plan.schema().clone();
+ plan.map_expressions(|e| {
+ e.infer_placeholder_types(&schema)?.transform_up(&|e| {
+ if let Expr::Placeholder(Placeholder { id, .. }) = e {
+ let value = param_values.get_placeholders_with_values(&id)?;
+ Ok(Transformed::yes(Expr::Literal(value)))
+ } else {
+ Ok(Transformed::no(e))
+ }
+ })
})
- .collect::<Result<Vec<_>>>()?;
-
- let new_inputs_with_values = self
- .inputs()
- .into_iter()
- .map(|inp| inp.replace_params_with_values(param_values))
- .collect::<Result<Vec<_>>>()?;
-
- self.with_new_exprs(new_exprs, new_inputs_with_values)
+ })
+ .map(|res| res.data)
}
/// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
@@ -1099,33 +1103,6 @@ impl LogicalPlan {
.map(|_| param_types)
}
- /// Return an Expr with all placeholders replaced with their
- /// corresponding values provided in the params_values
- fn replace_placeholders_with_values(
- expr: Expr,
- param_values: &ParamValues,
- ) -> Result<Expr> {
- expr.transform(&|expr| {
- match &expr {
- Expr::Placeholder(Placeholder { id, .. }) => {
- let value = param_values.get_placeholders_with_values(id)?;
- // Replace the placeholder with the value
- Ok(Transformed::yes(Expr::Literal(value)))
- }
- Expr::ScalarSubquery(qry) => {
- let subquery =
- Arc::new(qry.subquery.replace_params_with_values(param_values)?);
- Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
- subquery,
- outer_ref_columns: qry.outer_ref_columns.clone(),
- })))
- }
- _ => Ok(Transformed::no(expr)),
- }
- })
- .data()
- }
-
// ------------
// Various implementations for printing out LogicalPlans
// ------------