This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new f9efba0e2c Add ExecutionPlan::reset_state (#17028) f9efba0e2c is described below commit f9efba0e2c4c9c272062b5d48f6a44830b8f08ab Author: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com> AuthorDate: Thu Aug 7 16:26:34 2025 -0500 Add ExecutionPlan::reset_state (#17028) * Add ExecutionPlan::reset_state Co-authored-by: Robert Ream <rob...@stably.io> * Update datafusion/sqllogictest/test_files/cte.slt * Add reference * fmt * add to upgrade guide * add explain plan, implement in more plans * fmt * only explain --------- Co-authored-by: Robert Ream <rob...@stably.io> --- .../src/expressions/dynamic_filters.rs | 8 +++ datafusion/physical-plan/src/execution_plan.rs | 25 +++++++ datafusion/physical-plan/src/joins/cross_join.rs | 12 ++++ datafusion/physical-plan/src/joins/hash_join.rs | 20 ++++++ datafusion/physical-plan/src/recursive_query.rs | 5 +- datafusion/physical-plan/src/sorts/sort.rs | 80 ++++++++++++++++------ datafusion/sqllogictest/test_files/cte.slt | 55 +++++++++++++++ docs/source/library-user-guide/upgrading.md | 10 +++ 8 files changed, 190 insertions(+), 25 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index a9a4e23233..d4b3180a6f 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -32,6 +32,10 @@ use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash}; /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it. +/// +/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also +/// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where +/// the same `ExecutionPlan` is reused with different data. 
#[derive(Debug)] pub struct DynamicFilterPhysicalExpr { /// The original children of this PhysicalExpr, if any. @@ -121,6 +125,10 @@ impl DynamicFilterPhysicalExpr { /// do not change* since those will be used to determine what columns need to read or projected /// when evaluating the expression. /// + /// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also + /// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where + /// the same `ExecutionPlan` is reused with different data. + /// /// [`collect_columns`]: crate::utils::collect_columns pub fn new( children: Vec<Arc<dyn PhysicalExpr>>, diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 730d496201..d4e0fe82bd 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -195,6 +195,31 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>>; + /// Reset any internal state within this [`ExecutionPlan`]. + /// + /// This method is called when an [`ExecutionPlan`] needs to be re-executed, + /// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method + /// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`]) + /// are reset to their initial state. + /// + /// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children, + /// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without + /// necessarily resetting any internal state. Implementations that require resetting of some + /// internal state should override this method to provide the necessary logic. 
+ /// + /// This method should *not* reset state recursively for children, as it is expected that + /// it will be called from within a walk of the execution plan tree so that it will be called on each child later + /// or was already called on each child. + /// + /// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument, + /// thus it is expected that any cached plan properties will remain valid after the reset. + /// + /// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr + fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> { + let children = self.children().into_iter().cloned().collect(); + self.with_new_children(children) + } + /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to /// produce `target_partitions` partitions. /// diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index a41e668ab4..b8ea6330a1 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -270,6 +270,18 @@ impl ExecutionPlan for CrossJoinExec { ))) } + fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> { + let new_exec = CrossJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + schema: Arc::clone(&self.schema), + left_fut: Default::default(), // reset the build side! 
+ metrics: ExecutionPlanMetricsSet::default(), + cache: self.cache.clone(), + }; + Ok(Arc::new(new_exec)) + } + fn required_input_distribution(&self) -> Vec<Distribution> { vec![ Distribution::SinglePartition, diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 0a26039462..6058b7974e 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -774,6 +774,26 @@ impl ExecutionPlan for HashJoinExec { )?)) } + fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> { + // Reset the left_fut to allow re-execution + Ok(Arc::new(HashJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + on: self.on.clone(), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + left_fut: OnceAsync::default(), + random_state: self.random_state.clone(), + mode: self.mode, + metrics: ExecutionPlanMetricsSet::new(), + projection: self.projection.clone(), + column_indices: self.column_indices.clone(), + null_equality: self.null_equality, + cache: self.cache.clone(), + })) + } + fn execute( &self, partition: usize, diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 99b460dfcf..700a9076fe 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -372,7 +372,7 @@ fn assign_work_table( } /// Some plans will change their internal states after execution, making them unable to be executed again. -/// This function uses `ExecutionPlan::with_new_children` to fork a new plan with initial states. +/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan. /// /// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan. 
/// However, if the data of the left table is derived from the work table, it will become outdated @@ -383,8 +383,7 @@ fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPl if plan.as_any().is::<WorkTableExec>() { Ok(Transformed::no(plan)) } else { - let new_plan = Arc::clone(&plan) - .with_new_children(plan.children().into_iter().cloned().collect())?; + let new_plan = Arc::clone(&plan).reset_state()?; Ok(Transformed::yes(new_plan)) } }) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 0b7d3977d2..dc2a5640f4 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -905,6 +905,29 @@ impl SortExec { self } + /// Add or reset `self.filter` to a new `DynamicFilterPhysicalExpr`. + fn create_filter(&self) -> Arc<DynamicFilterPhysicalExpr> { + let children = self + .expr + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::<Vec<_>>(); + Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true))) + } + + fn cloned(&self) -> Self { + SortExec { + input: Arc::clone(&self.input), + expr: self.expr.clone(), + metrics_set: self.metrics_set.clone(), + preserve_partitioning: self.preserve_partitioning, + common_sort_prefix: self.common_sort_prefix.clone(), + fetch: self.fetch, + cache: self.cache.clone(), + filter: self.filter.clone(), + } + } + /// Modify how many rows to include in the result /// /// If None, then all rows will be returned, in sorted order. @@ -926,25 +949,13 @@ impl SortExec { } let filter = fetch.is_some().then(|| { // If we already have a filter, keep it. Otherwise, create a new one. 
- self.filter.clone().unwrap_or_else(|| { - let children = self - .expr - .iter() - .map(|sort_expr| Arc::clone(&sort_expr.expr)) - .collect::<Vec<_>>(); - Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true))) - }) + self.filter.clone().unwrap_or_else(|| self.create_filter()) }); - SortExec { - input: Arc::clone(&self.input), - expr: self.expr.clone(), - metrics_set: self.metrics_set.clone(), - preserve_partitioning: self.preserve_partitioning, - common_sort_prefix: self.common_sort_prefix.clone(), - fetch, - cache, - filter, - } + let mut new_sort = self.cloned(); + new_sort.fetch = fetch; + new_sort.cache = cache; + new_sort.filter = filter; + new_sort } /// Input schema @@ -1116,10 +1127,35 @@ impl ExecutionPlan for SortExec { self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>> { - let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0])) - .with_fetch(self.fetch) - .with_preserve_partitioning(self.preserve_partitioning); - new_sort.filter = self.filter.clone(); + let mut new_sort = self.cloned(); + assert!( + children.len() == 1, + "SortExec should have exactly one child" + ); + new_sort.input = Arc::clone(&children[0]); + // Recompute the properties based on the new input since they may have changed + let (cache, sort_prefix) = Self::compute_properties( + &new_sort.input, + new_sort.expr.clone(), + new_sort.preserve_partitioning, + )?; + new_sort.cache = cache; + new_sort.common_sort_prefix = sort_prefix; + + Ok(Arc::new(new_sort)) + } + + fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> { + let children = self.children().into_iter().cloned().collect(); + let new_sort = self.with_new_children(children)?; + let mut new_sort = new_sort + .as_any() + .downcast_ref::<SortExec>() + .expect("cloned 1 lines above this line, we know the type") + .clone(); + // Our dynamic filter and execution metrics are the state we need to reset. 
+ new_sort.filter = Some(new_sort.create_filter()); + new_sort.metrics_set = ExecutionPlanMetricsSet::new(); Ok(Arc::new(new_sort)) } diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index 32320a06f4..5f8fd1a0b5 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -996,6 +996,61 @@ physical_plan 08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 09)------------WorkTableExec: name=numbers +# Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions +query II +with recursive r as ( + select 0 as k, 0 as v + union all + ( + select * + from r + order by v + limit 1 + ) +) +select * +from r +limit 5; +---- +0 0 +0 0 +0 0 +0 0 +0 0 + +query TT +explain +with recursive r as ( + select 0 as k, 0 as v + union all + ( + select * + from r + order by v + limit 1 + ) +) +select * +from r +limit 5; +---- +logical_plan +01)SubqueryAlias: r +02)--Limit: skip=0, fetch=5 +03)----RecursiveQuery: is_distinct=false +04)------Projection: Int64(0) AS k, Int64(0) AS v +05)--------EmptyRelation +06)------Sort: r.v ASC NULLS LAST, fetch=1 +07)--------Projection: r.k, r.v +08)----------TableScan: r +physical_plan +01)GlobalLimitExec: skip=0, fetch=5 +02)--RecursiveQueryExec: name=r, is_distinct=false +03)----ProjectionExec: expr=[0 as k, 0 as v] +04)------PlaceholderRowExec +05)----SortExec: TopK(fetch=1), expr=[v@1 ASC NULLS LAST], preserve_partitioning=[false] +06)------WorkTableExec: name=r + statement count 0 set datafusion.execution.enable_recursive_ctes = false; diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index e18b6682b8..58167238fe 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -142,6 +142,16 @@ This version of DataFusion upgrades the underlying Apache Arrow implementation to version `56.0.0`. 
See the [release notes](https://github.com/apache/arrow-rs/releases/tag/56.0.0) for more details. +### Added `ExecutionPlan::reset_state` + +In order to fix a bug in DataFusion `49.0.0` where dynamic filters (currently only generated in the presence of a query such as `ORDER BY ... LIMIT ...`) +produced incorrect results in recursive queries, a new method `reset_state` has been added to the `ExecutionPlan` trait. + +Any `ExecutionPlan` that needs to maintain internal state or references to other nodes in the execution plan tree should implement this method to reset that state. +See [#17028] for more details and an example implementation for `SortExec`. + +[#17028]: https://github.com/apache/datafusion/pull/17028 + ## DataFusion `49.0.0` ### `MSRV` updated to 1.85.1 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org