askalt commented on code in PR #20009:
URL: https://github.com/apache/datafusion/pull/20009#discussion_r2730729411
##########
datafusion/physical-plan/src/resolve_placeholders.rs:
##########
@@ -0,0 +1,327 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the placeholder resolver plan. This plan stores linear indices of the execution plans
+//! that contain expressions with placeholders and indices of expressions within plans.
+//! Placeholders are resolved during execution. ParamValues are taken from TaskContext.
+
+use std::{any::Any, sync::Arc};

Review Comment:
   Let's move `ResolvePlaceholdersExec` to a separate PR. I'm asking because:

   plan contains placeholders => it will probably be re-used => before re-use we should reset the nodes' state

   To re-use a plan we should probably not only resolve placeholders but also reset node state. And imagine that not only placeholders need to be resolved but some user-defined expressions too: it would be good to traverse the plan (and its expressions) once to apply all this "service" work. So it looks like, to be more useful upstream, we should make `ResolvePlaceholdersExec` more customizable.

   I see the pattern it follows:

   ========== planning stage ==========
   1. Traverse the plan.
   2. Check some predicate `p` about each physical expression.
   3. Remember the locations where the predicate is true.

   ========== execution stage ==========
   4. Map the expressions at these locations with mapping `m` when `execute(...)` is called.

   Can we add the ability to customize `p` and `m`?

##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -485,6 +485,45 @@ impl ExecutionPlan for ProjectionExec {
                 .ok()
         })
     }
+
+    fn physical_expressions(
+        &self,
+    ) -> Option<Box<dyn Iterator<Item = Arc<dyn PhysicalExpr>> + '_>> {
+        Some(Box::new(self.projector.projection().expr_iter()))
+    }
+
+    fn with_physical_expressions(
+        &self,
+        exprs: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let expected_count = self.expr().len();
+        let exprs_count = exprs.len();
+        if exprs_count != expected_count {
+            return internal_err!(
+                "Expect {} expressions, but got {}",
+                expected_count,
+                exprs_count
+            );
+        }
+
+        let projection_exprs = self
+            .expr()
+            .iter()
+            .zip(exprs)
+            .map(|(p, expr)| ProjectionExpr::new(expr, p.alias.clone()))
+            .collect::<Vec<_>>();
+
+        let projection = ProjectionExprs::from(projection_exprs);
+        let input_schema = self.input.schema();
+        let projector = projection.make_projector(&input_schema)?;

Review Comment:
   `make_projector` clones the projection internally (allocating all aliases again), so the aliases we explicitly clone in the loop are just allocated and then dropped here. Can we avoid it? E.g., create a similar fn that consumes `self`.
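
A minimal sketch of the "consume `self`" idea from the `projection.rs` comment, using hypothetical stand-in types: `NamedExpr`, `Exprs`, `Projector`, and `into_projector` are illustrative names and are not DataFusion APIs. It only shows why a borrowing constructor has to clone owned data such as aliases, while a consuming one can move the existing allocations:

```rust
// Hypothetical stand-ins for illustration only; these are NOT DataFusion types.
#[derive(Clone, Debug)]
struct NamedExpr {
    alias: String,
}

#[derive(Debug)]
struct Exprs(Vec<NamedExpr>);

#[derive(Debug)]
struct Projector {
    exprs: Vec<NamedExpr>,
}

impl Exprs {
    /// Borrowing constructor: it only has `&self`, so it must clone
    /// every alias to give the projector owned data.
    fn make_projector(&self) -> Projector {
        Projector { exprs: self.0.clone() }
    }

    /// Consuming constructor: takes `self` by value and moves the
    /// already-allocated aliases into the projector without cloning.
    fn into_projector(self) -> Projector {
        Projector { exprs: self.0 }
    }
}

fn main() {
    let exprs = Exprs(vec![NamedExpr { alias: "col_a".to_string() }]);
    let original_ptr = exprs.0[0].alias.as_ptr();

    // The borrowing path produces a second heap buffer for the alias.
    let cloned = exprs.make_projector();
    assert_ne!(cloned.exprs[0].alias.as_ptr(), original_ptr);

    // The consuming path reuses the original heap buffer.
    let moved = exprs.into_projector();
    assert_eq!(moved.exprs[0].alias.as_ptr(), original_ptr);
}
```

Whether a consuming variant fits the real projector API depends on details not shown in the diff (for example, whether the projection is needed again after the projector is built), so this is an assumption to check against the actual code.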
##########
datafusion/execution/src/task.rs:
##########
@@ -48,6 +50,8 @@ pub struct TaskContext {
     window_functions: HashMap<String, Arc<WindowUDF>>,
     /// Runtime environment associated with this task context
     runtime: Arc<RuntimeEnv>,
+    /// Param values for physical placeholders

Review Comment:
   As `TaskContext` does not depend on the `physical-plan` crate, it would be good to avoid such dependencies in the comment too:

   ```suggestion
       /// External query parameters
   ```

##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -722,6 +722,48 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ) -> Option<Arc<dyn ExecutionPlan>> {
         None
     }
+
+    /// Returns an iterator over a subset of [`PhysicalExpr`]s that are used by this
+    /// [`ExecutionPlan`].
+    ///
+    /// This method provides a way to inspect the physical expressions within a node without
+    /// knowing its specific type. These expressions could be predicates in a filter, projection
+    /// expressions, or join conditions.
+    ///
+    /// The default implementation returns `None`, indicating that the node either has no physical
+    /// expressions or does not support exposing them.
+    fn physical_expressions<'a>(
+        &'a self,
+    ) -> Option<Box<dyn Iterator<Item = Arc<dyn PhysicalExpr>> + 'a>> {
+        None
+    }
+
+    /// Returns a new `ExecutionPlan` with its physical expressions replaced by the provided
+    /// `exprs`.
+    ///
+    /// This method is the counterpart to [`ExecutionPlan::physical_expressions`]. It allows
+    /// transforming the expressions within a node while preserving the node itself and its
+    /// children.
+    ///
+    /// # Constraints
+    ///
+    /// * The number of expressions in `exprs` must match the number of expressions returned by
+    ///   [`ExecutionPlan::physical_expressions`].
+    /// * The order of expressions in `exprs` must match the order they were yielded by the
+    ///   iterator from [`ExecutionPlan::physical_expressions`].
+    ///
+    /// # Returns
+    ///
+    /// * `Ok(Some(new_plan))` if the expressions were successfully replaced.
+    /// * `Ok(None)` if the node does not support replacing its expressions (the default).
+    /// * `Err` if the number of expressions is incorrect or if another error occurs during
+    ///   replacement.
+    fn with_physical_expressions(
+        &self,
+        _exprs: Vec<Arc<dyn PhysicalExpr>>,

Review Comment:
   Let's wrap the arguments in a struct so that we can extend it in the future without breaking backward compatibility.

   Currently, the method does not assume that expression semantics can change, so plans do not recompute their properties (plan properties typically depend on expressions, e.g., in `ProjectionExec`). But imagine that some future analysis changes expression semantics. So we can do something like:

   ```rust
   struct ReplacePhysicalExpr {
       exprs: Vec<Arc<dyn PhysicalExpr>>,
       // Possible future extension, excluded for now:
       // recompute_properties: bool,
   }
   ```

   For now we can exclude `recompute_properties` (and highlight in the comment that properties should not be recomputed by default), but leave room for the extension here.

##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -722,6 +722,48 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ) -> Option<Arc<dyn ExecutionPlan>> {
         None
     }
+
+    /// Returns an iterator over a subset of [`PhysicalExpr`]s that are used by this
+    /// [`ExecutionPlan`].
+    ///
+    /// This method provides a way to inspect the physical expressions within a node without
+    /// knowing its specific type. These expressions could be predicates in a filter, projection
+    /// expressions, or join conditions.

Review Comment:
   ```suggestion
       /// expressions, or join conditions, etc.
   ```

##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -1152,6 +1194,57 @@ pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
     Ok(())
 }
+/// Verifies that the [`ExecutionPlan::physical_expressions`] and
+/// [`ExecutionPlan::with_physical_expressions`] implementations are consistent.
+///
+/// It verifies:
+/// 1. `with_physical_expressions` with the same expressions returns a plan with the same expressions.
+/// 2. `with_physical_expressions` with a different number of expressions returns an error.
+///
+/// Returns rewritten plan if it supported expression replacement.
+pub fn check_physical_expressions(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+    let Some(exprs) = plan.physical_expressions() else {
+        if let Some(new_plan) = plan.with_physical_expressions(vec![])? {
+            assert_or_internal_err!(plan.physical_expressions().is_none());
+
+            let new_expr = Arc::new(expressions::Column::new("", 0));
+            let result = plan.with_physical_expressions(vec![new_expr]);
+            assert_or_internal_err!(result.is_err());
+
+            return Ok(new_plan);
+        };
+
+        return Ok(plan);
+    };
+
+    let mut exprs = exprs.collect::<Vec<_>>();
+
+    // Should return plan with same expressions
+    let new_plan = plan.with_physical_expressions(exprs.clone())?.unwrap();
+    let new_plan_exprs = new_plan.physical_expressions().unwrap();
+
+    assert_or_internal_err!(exprs.iter().cloned().eq(new_plan_exprs));
+
+    // Should return an error if number of expressions differs from physical_expressions()
+    let new_expr = Arc::new(expressions::Column::new("", 0));
+    exprs.push(new_expr);
+
+    let result = new_plan.with_physical_expressions(exprs.clone());
+    assert_or_internal_err!(result.is_err());
+
+    if exprs.len() > 2 {
+        exprs.pop().unwrap();
+        exprs.pop().unwrap();
+
+        let result = new_plan.with_physical_expressions(exprs);
+        assert_or_internal_err!(result.is_err());

Review Comment:
   Let's add some details to the internal error descriptions to make them more debug-friendly.

##########
datafusion/physical-plan/src/sorts/sort_preserving_merge.rs:
##########
@@ -409,6 +410,46 @@ impl ExecutionPlan for SortPreservingMergeExec {
                 .with_fetch(self.fetch()),
         )))
     }
+
+    fn physical_expressions<'a>(
+        &'a self,
+    ) -> Option<Box<dyn Iterator<Item = Arc<dyn PhysicalExpr>> + 'a>> {
+        Some(Box::new(
+            self.expr.iter().map(|sort| Arc::clone(&sort.expr)),
+        ))
+    }
+
+    fn with_physical_expressions(
+        &self,
+        exprs: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let expected_count = self.expr.len();
+        let exprs_count = exprs.len();
+        if exprs_count != expected_count {

Review Comment:
   Let's use `assert_or_internal_err!` for such checks.
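
As a rough illustration of the last two comments (more descriptive internal errors, and one uniform way to check the expression count), here is a self-contained sketch. `check_expr_count` is a hypothetical helper, and a plain `String` stands in for DataFusion's internal error type; in the PR itself the check would presumably go through `assert_or_internal_err!`, whose message-argument form is not shown in the quoted diff:

```rust
/// Hypothetical helper (not a DataFusion API): validates the number of
/// replacement expressions and names the node in the error message so
/// that a failure is easier to trace back to its source.
fn check_expr_count(node: &str, expected: usize, actual: usize) -> Result<(), String> {
    if expected != actual {
        return Err(format!(
            "Internal error: {node} expects {expected} physical expressions, but got {actual}"
        ));
    }
    Ok(())
}

fn main() {
    // A matching count passes.
    assert!(check_expr_count("SortPreservingMergeExec", 2, 2).is_ok());

    // A mismatch reports the node, the expected count, and the actual count.
    match check_expr_count("SortPreservingMergeExec", 2, 3) {
        Ok(()) => println!("counts match"),
        Err(message) => println!("{message}"),
    }
}
```

Keeping the check in one place means every `with_physical_expressions` implementation would report mismatches in the same format.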
