alamb commented on code in PR #4561:
URL: https://github.com/apache/arrow-datafusion/pull/4561#discussion_r1044406639


##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -364,6 +368,42 @@ impl LogicalPlan {
     ) -> Result<LogicalPlan, DataFusionError> {
         from_plan(self, &self.expressions(), inputs)
     }
+
+    /// Convert a prepare logical plan into its inner logical plan with all 
params replaced with their corresponding values
+    pub fn execute(

Review Comment:
   While the `execute` name matches how this function will likely be used, I 
think naming it something more specific to what it is doing might be clearer.
   
   What about something like 
   
   ```suggestion
       pub fn with_param_values(self, param_values: Vec<ScalarValue>)
   ```
   
   That way, one could use it like this:
   
   ```
   let plan = logical_plan
     .with_param_values(param_value)
   ```
   
   



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -534,6 +574,655 @@ impl LogicalPlan {
             _ => {}
         }
     }
+
+    /// recursively to replace the params (e.g $1 $2, ...) wit corresponding 
values provided in the prams_values
+    pub fn replace_params_with_values(
+        &self,
+        param_values: &Vec<ScalarValue>,
+    ) -> Result<LogicalPlan, DataFusionError> {

Review Comment:
   I think you can use `datafusion_expr::utils::from_plan` to avoid most of 
this code and make this PR quite a bit smaller.
   
   
https://docs.rs/datafusion-expr/15.0.0/datafusion_expr/utils/fn.from_plan.html
   
   
   
   
   



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -534,6 +574,655 @@ impl LogicalPlan {
             _ => {}
         }
     }
+
+    /// recursively to replace the params (e.g $1 $2, ...) with corresponding 
values provided in the prams_values
+    pub fn replace_params_with_values(
+        &self,
+        param_values: &Vec<ScalarValue>,
+    ) -> Result<LogicalPlan, DataFusionError> {
+        match self {
+            LogicalPlan::Projection(Projection {
+                expr,
+                input,
+                schema,
+            }) => {
+                let expr = &mut expr
+                    .iter()
+                    .map(|e| {
+                        Self::replace_placeholders_with_values(e.clone(), 
param_values)
+                    })
+                    .collect::<Result<Vec<_>, _>>()?;
+
+                let input = input.replace_params_with_values(param_values)?;
+                Ok(LogicalPlan::Projection(Projection {
+                    expr: expr.clone(),
+                    input: Arc::new(input),
+                    schema: Arc::clone(schema),
+                }))
+            }
+            LogicalPlan::Filter(Filter { predicate, input }) => {
+                let predicate = Self::replace_placeholders_with_values(
+                    predicate.clone(),
+                    param_values,
+                )?;
+                let input = input.replace_params_with_values(param_values)?;
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate,
+                    input: Arc::new(input),
+                }))
+            }
+            LogicalPlan::Repartition(Repartition {
+                input,
+                partitioning_scheme,
+            }) => {
+                let input = input.replace_params_with_values(param_values)?;
+                // Even though the `partitioning` member of Repartition 
include expresions , they are internal ones and should not include params
+                // Hence no need to look for placeholders and replace them
+                Ok(LogicalPlan::Repartition(Repartition {
+                    input: Arc::new(input),
+                    partitioning_scheme: partitioning_scheme.clone(),
+                }))
+            }
+            LogicalPlan::Window(Window {
+                input,
+                window_expr,
+                schema,
+            }) => {
+                let input = input.replace_params_with_values(param_values)?;
+                let window_expr = &mut window_expr
+                    .iter()
+                    .map(|e| {
+                        Self::replace_placeholders_with_values(e.clone(), 
param_values)
+                    })
+                    .collect::<Result<Vec<_>, _>>()?;
+                Ok(LogicalPlan::Window(Window {
+                    input: Arc::new(input),
+                    window_expr: window_expr.clone(),
+                    schema: Arc::clone(schema),
+                }))
+            }
+            LogicalPlan::Aggregate(Aggregate {
+                input,
+                group_expr,
+                aggr_expr,
+                schema,
+            }) => {
+                let input = input.replace_params_with_values(param_values)?;
+                let group_expr = &mut group_expr
+                    .iter()
+                    .map(|e| {
+                        Self::replace_placeholders_with_values(e.clone(), 
param_values)
+                    })
+                    .collect::<Result<Vec<_>, _>>()?;
+                let aggr_expr = &mut aggr_expr
+                    .iter()
+                    .map(|e| {
+                        Self::replace_placeholders_with_values(e.clone(), 
param_values)
+                    })
+                    .collect::<Result<Vec<_>, _>>()?;
+                Ok(LogicalPlan::Aggregate(Aggregate {
+                    input: Arc::new(input),
+                    group_expr: group_expr.clone(),
+                    aggr_expr: aggr_expr.clone(),
+                    schema: Arc::clone(schema),
+                }))
+            }
+            LogicalPlan::Sort(Sort { input, expr, fetch }) => {
+                let input = input.replace_params_with_values(param_values)?;
+                let expr = &mut expr
+                    .iter()
+                    .map(|e| {
+                        Self::replace_placeholders_with_values(e.clone(), 
param_values)
+                    })
+                    .collect::<Result<Vec<_>, _>>()?;
+                Ok(LogicalPlan::Sort(Sort {
+                    input: Arc::new(input),
+                    expr: expr.clone(),
+                    fetch: *fetch,
+                }))
+            }
+            LogicalPlan::Join(Join {
+                left,
+                right,
+                filter,
+                on,
+                join_type,
+                join_constraint,
+                schema,
+                null_equals_null,
+            }) => {
+                let left = left.replace_params_with_values(param_values)?;
+                let fright = right.replace_params_with_values(param_values)?;
+                let filter = filter.clone().map(|f| {
+                    Self::replace_placeholders_with_values(f, param_values)
+                        .expect("Failed to replace params in join filter")
+                });
+                Ok(LogicalPlan::Join(Join {
+                    left: Arc::new(left),
+                    right: Arc::new(fright),
+                    filter,
+                    on: on.clone(),
+                    join_type: *join_type,
+                    join_constraint: *join_constraint,
+                    schema: Arc::clone(schema),
+                    null_equals_null: *null_equals_null,
+                }))
+            }
+            LogicalPlan::CrossJoin(CrossJoin {
+                left,
+                right,
+                schema,
+            }) => {
+                let left = left.replace_params_with_values(param_values)?;
+                let right = right.replace_params_with_values(param_values)?;
+                Ok(LogicalPlan::CrossJoin(CrossJoin {
+                    left: Arc::new(left),
+                    right: Arc::new(right),
+                    schema: Arc::clone(schema),
+                }))
+            }
+            LogicalPlan::Limit(Limit { input, skip, fetch }) => {
+                let input = input.replace_params_with_values(param_values)?;
+                Ok(LogicalPlan::Limit(Limit {
+                    input: Arc::new(input),
+                    skip: *skip,
+                    fetch: *fetch,
+                }))
+            }
+            LogicalPlan::Subquery(Subquery { subquery }) => {
+                let subquery = 
subquery.replace_params_with_values(param_values)?;
+                Ok(LogicalPlan::Subquery(Subquery {
+                    subquery: Arc::new(subquery),
+                }))
+            }
+            LogicalPlan::SubqueryAlias(SubqueryAlias {
+                input,
+                alias,
+                schema,
+            }) => {
+                let input = input.replace_params_with_values(param_values)?;
+                Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
+                    input: Arc::new(input),
+                    alias: alias.clone(),
+                    schema: Arc::clone(schema),
+                }))
+            }
+            LogicalPlan::Extension(Extension { node }) => {
+                // Currently only support params in standard SQL
+                // and extesion should not have any params
+                Ok(LogicalPlan::Extension(Extension { node: node.clone() }))
+            }
+            LogicalPlan::Union(Union { inputs, schema }) => {
+                let inputs = inputs
+                    .iter()
+                    .map(|input| 
input.replace_params_with_values(param_values))
+                    .collect::<Result<Vec<_>, _>>()?;
+                Ok(LogicalPlan::Union(Union {
+                    inputs: inputs.into_iter().map(Arc::new).collect(),
+                    schema: Arc::clone(schema),
+                }))
+            }
+            LogicalPlan::Distinct(Distinct { input }) => {
+                let input = input.replace_params_with_values(param_values)?;
+                Ok(LogicalPlan::Distinct(Distinct {
+                    input: Arc::new(input),
+                }))
+            }
+            LogicalPlan::Prepare(Prepare {
+                name,
+                data_types,
+                input,
+            }) => {
+                let input = input.replace_params_with_values(param_values)?;
+                Ok(LogicalPlan::Prepare(Prepare {
+                    name: name.clone(),
+                    data_types: data_types.clone(),
+                    input: Arc::new(input),
+                }))
+            }
+            // plans without inputs
+            LogicalPlan::TableScan(TableScan {
+                table_name,
+                source,
+                projection,
+                projected_schema,
+                filters,
+                fetch,
+            }) => {
+                let filters = filters
+                    .iter()
+                    .map(|f| {
+                        Self::replace_placeholders_with_values(f.clone(), 
param_values)
+                            .expect("Failed to replace params in table scan 
filter")
+                    })
+                    .collect();
+                Ok(LogicalPlan::TableScan(TableScan {
+                    table_name: table_name.clone(),
+                    source: Arc::clone(source),
+                    projection: projection.clone(),
+                    projected_schema: Arc::clone(projected_schema),
+                    filters,
+                    fetch: *fetch,
+                }))
+            }
+            LogicalPlan::Values(Values { values, schema }) => {
+                let values = values
+                    .iter()
+                    .map(|row| {
+                        row.iter()
+                            .map(|expr| {
+                                Self::replace_placeholders_with_values(
+                                    expr.clone(),
+                                    param_values,
+                                )
+                                .expect("Failed to replace params in values")
+                            })
+                            .collect()
+                    })
+                    .collect();
+                Ok(LogicalPlan::Values(Values {
+                    values,
+                    schema: Arc::clone(schema),
+                }))
+            }
+            LogicalPlan::EmptyRelation(EmptyRelation {
+                produce_one_row,
+                schema,
+            }) => Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+                produce_one_row: *produce_one_row,
+                schema: Arc::clone(schema),
+            })),
+            LogicalPlan::Explain(_)
+            | LogicalPlan::Analyze(_)
+            | LogicalPlan::CreateMemoryTable(_)
+            | LogicalPlan::CreateView(_)
+            | LogicalPlan::CreateExternalTable(_)
+            | LogicalPlan::CreateCatalogSchema(_)
+            | LogicalPlan::CreateCatalog(_)
+            | LogicalPlan::DropTable(_)
+            | LogicalPlan::SetVariable(_)
+            | LogicalPlan::DropView(_) => Err::<LogicalPlan, DataFusionError>(
+                DataFusionError::NotImplemented(format!(
+                    "This logical plan should not contain 
parameters/placeholder: {}",
+                    self.display()
+                )),
+            ),
+        }
+    }
+
+    /// Recrusively to walk the expression and convert a placeholder into a 
literal value
+    fn replace_placeholders_with_values(
+        expr: Expr,
+        param_values: &Vec<ScalarValue>,
+    ) -> Result<Expr, DataFusionError> {

Review Comment:
   I think you could use an `ExprRewriter` here to avoid having to write your 
own recursion for each type of expr, making this code shorter and easier to 
maintain.
   
   
https://docs.rs/datafusion-expr/15.0.0/datafusion_expr/expr_rewriter/trait.ExprRewriter.html
   
   
   Something like this perhaps
   
   
https://github.com/apache/arrow-datafusion/blob/31bbe6c3eec245127b005d8afc8766e2d06d811b/datafusion/optimizer/src/push_down_filter.rs#L748-L767



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to