This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2def10ff1a Stop copying plans in  `LogicalPlan::with_param_values` (#10016)
2def10ff1a is described below

commit 2def10ff1a1ad6b99a1ae40692115b28c3efdfc2
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Apr 12 07:21:02 2024 -0400

    Stop copying plans in  `LogicalPlan::with_param_values` (#10016)
---
 datafusion/expr/src/logical_plan/plan.rs | 79 +++++++++++---------------------
 1 file changed, 28 insertions(+), 51 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index cb8c97c71e..91c8670f38 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -913,14 +913,19 @@ impl LogicalPlan {
         param_values: impl Into<ParamValues>,
     ) -> Result<LogicalPlan> {
         let param_values = param_values.into();
-        match self {
-            LogicalPlan::Prepare(prepare_lp) => {
-                param_values.verify(&prepare_lp.data_types)?;
-                let input_plan = prepare_lp.input;
-                input_plan.replace_params_with_values(&param_values)
+        let plan_with_values = self.replace_params_with_values(&param_values)?;
+
+        // unwrap Prepare
+        Ok(if let LogicalPlan::Prepare(prepare_lp) = plan_with_values {
+            param_values.verify(&prepare_lp.data_types)?;
+            // try and take ownership of the input if is not shared, clone otherwise
+            match Arc::try_unwrap(prepare_lp.input) {
+                Ok(input) => input,
+                Err(arc_input) => arc_input.as_ref().clone(),
             }
-            _ => self.replace_params_with_values(&param_values),
-        }
+        } else {
+            plan_with_values
+        })
     }
 
     /// Returns the maximum number of rows that this plan can output, if known.
@@ -1046,27 +1051,26 @@ impl LogicalPlan {
     /// ...) replaced with corresponding values provided in
     /// `params_values`
     ///
-    /// See [`Self::with_param_values`] for examples and usage
+    /// See [`Self::with_param_values`] for examples and usage with an owned
+    /// `ParamValues`
     pub fn replace_params_with_values(
-        &self,
+        self,
         param_values: &ParamValues,
     ) -> Result<LogicalPlan> {
-        let new_exprs = self
-            .expressions()
-            .into_iter()
-            .map(|e| {
-                let e = e.infer_placeholder_types(self.schema())?;
-                Self::replace_placeholders_with_values(e, param_values)
+        self.transform_up_with_subqueries(&|plan| {
+            let schema = plan.schema().clone();
+            plan.map_expressions(|e| {
+                e.infer_placeholder_types(&schema)?.transform_up(&|e| {
+                    if let Expr::Placeholder(Placeholder { id, .. }) = e {
+                        let value = param_values.get_placeholders_with_values(&id)?;
+                        Ok(Transformed::yes(Expr::Literal(value)))
+                    } else {
+                        Ok(Transformed::no(e))
+                    }
+                })
             })
-            .collect::<Result<Vec<_>>>()?;
-
-        let new_inputs_with_values = self
-            .inputs()
-            .into_iter()
-            .map(|inp| inp.replace_params_with_values(param_values))
-            .collect::<Result<Vec<_>>>()?;
-
-        self.with_new_exprs(new_exprs, new_inputs_with_values)
+        })
+        .map(|res| res.data)
     }
 
     /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
@@ -1099,33 +1103,6 @@ impl LogicalPlan {
         .map(|_| param_types)
     }
 
-    /// Return an Expr with all placeholders replaced with their
-    /// corresponding values provided in the params_values
-    fn replace_placeholders_with_values(
-        expr: Expr,
-        param_values: &ParamValues,
-    ) -> Result<Expr> {
-        expr.transform(&|expr| {
-            match &expr {
-                Expr::Placeholder(Placeholder { id, .. }) => {
-                    let value = param_values.get_placeholders_with_values(id)?;
-                    // Replace the placeholder with the value
-                    Ok(Transformed::yes(Expr::Literal(value)))
-                }
-                Expr::ScalarSubquery(qry) => {
-                    let subquery =
-                        Arc::new(qry.subquery.replace_params_with_values(param_values)?);
-                    Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
-                        subquery,
-                        outer_ref_columns: qry.outer_ref_columns.clone(),
-                    })))
-                }
-                _ => Ok(Transformed::no(expr)),
-            }
-        })
-        .data()
-    }
-
     // ------------
     // Various implementations for printing out LogicalPlans
     // ------------

Reply via email to