This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f9efba0e2c Add ExecutionPlan::reset_state (#17028)
f9efba0e2c is described below

commit f9efba0e2c4c9c272062b5d48f6a44830b8f08ab
Author: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com>
AuthorDate: Thu Aug 7 16:26:34 2025 -0500

    Add ExecutionPlan::reset_state (#17028)
    
    * Add ExecutionPlan::reset_state
    
    Co-authored-by: Robert Ream <rob...@stably.io>
    
    * Update datafusion/sqllogictest/test_files/cte.slt
    
    * Add reference
    
    * fmt
    
    * add to upgrade guide
    
    * add explain plan, implement in more plans
    
    * fmt
    
    * only explain
    
    ---------
    
    Co-authored-by: Robert Ream <rob...@stably.io>
---
 .../src/expressions/dynamic_filters.rs             |  8 +++
 datafusion/physical-plan/src/execution_plan.rs     | 25 +++++++
 datafusion/physical-plan/src/joins/cross_join.rs   | 12 ++++
 datafusion/physical-plan/src/joins/hash_join.rs    | 20 ++++++
 datafusion/physical-plan/src/recursive_query.rs    |  5 +-
 datafusion/physical-plan/src/sorts/sort.rs         | 80 ++++++++++++++++------
 datafusion/sqllogictest/test_files/cte.slt         | 55 +++++++++++++++
 docs/source/library-user-guide/upgrading.md        | 10 +++
 8 files changed, 190 insertions(+), 25 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs 
b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
index a9a4e23233..d4b3180a6f 100644
--- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs
+++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -32,6 +32,10 @@ use datafusion_expr::ColumnarValue;
 use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
 
 /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference 
to it.
+///
+/// Any `ExecutionPlan` that uses this expression and holds a reference to it 
internally should probably also
+/// implement `ExecutionPlan::reset_state` to remain compatible with recursive 
queries and other situations where
+/// the same `ExecutionPlan` is reused with different data.
 #[derive(Debug)]
 pub struct DynamicFilterPhysicalExpr {
     /// The original children of this PhysicalExpr, if any.
@@ -121,6 +125,10 @@ impl DynamicFilterPhysicalExpr {
     /// do not change* since those will be used to determine what columns need 
to read or projected
     /// when evaluating the expression.
     ///
+    /// Any `ExecutionPlan` that uses this expression and holds a reference to 
it internally should probably also
+    /// implement `ExecutionPlan::reset_state` to remain compatible with 
recursive queries and other situations where
+    /// the same `ExecutionPlan` is reused with different data.
+    ///
     /// [`collect_columns`]: crate::utils::collect_columns
     pub fn new(
         children: Vec<Arc<dyn PhysicalExpr>>,
diff --git a/datafusion/physical-plan/src/execution_plan.rs 
b/datafusion/physical-plan/src/execution_plan.rs
index 730d496201..d4e0fe82bd 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -195,6 +195,31 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>>;
 
+    /// Reset any internal state within this [`ExecutionPlan`].
+    ///
+    /// This method is called when an [`ExecutionPlan`] needs to be 
re-executed,
+    /// such as in recursive queries. Unlike 
[`ExecutionPlan::with_new_children`], this method
+    /// ensures that any stateful components (e.g., 
[`DynamicFilterPhysicalExpr`])
+    /// are reset to their initial state.
+    ///
+    /// The default implementation simply calls 
[`ExecutionPlan::with_new_children`] with the existing children,
+    /// effectively creating a new instance of the [`ExecutionPlan`] with the 
same children but without
+    /// necessarily resetting any internal state. Implementations that require 
resetting of some
+    /// internal state should override this method to provide the necessary 
logic.
+    ///
+    /// This method should *not* reset state recursively for children, as it 
is expected that
+    /// it will be called from within a walk of the execution plan tree so 
that it will be called on each child later
+    /// or was already called on each child.
+    ///
+    /// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this 
method does not accept new children as an argument,
+    /// thus it is expected that any cached plan properties will remain valid 
after the reset.
+    ///
+    /// [`DynamicFilterPhysicalExpr`]: 
datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr
+    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
+        let children = self.children().into_iter().cloned().collect();
+        self.with_new_children(children)
+    }
+
     /// If supported, attempt to increase the partitioning of this 
`ExecutionPlan` to
     /// produce `target_partitions` partitions.
     ///
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs 
b/datafusion/physical-plan/src/joins/cross_join.rs
index a41e668ab4..b8ea6330a1 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -270,6 +270,18 @@ impl ExecutionPlan for CrossJoinExec {
         )))
     }
 
+    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
+        let new_exec = CrossJoinExec {
+            left: Arc::clone(&self.left),
+            right: Arc::clone(&self.right),
+            schema: Arc::clone(&self.schema),
+            left_fut: Default::default(), // reset the build side!
+            metrics: ExecutionPlanMetricsSet::default(),
+            cache: self.cache.clone(),
+        };
+        Ok(Arc::new(new_exec))
+    }
+
     fn required_input_distribution(&self) -> Vec<Distribution> {
         vec![
             Distribution::SinglePartition,
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs 
b/datafusion/physical-plan/src/joins/hash_join.rs
index 0a26039462..6058b7974e 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -774,6 +774,26 @@ impl ExecutionPlan for HashJoinExec {
         )?))
     }
 
+    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
+        // Reset the left_fut to allow re-execution
+        Ok(Arc::new(HashJoinExec {
+            left: Arc::clone(&self.left),
+            right: Arc::clone(&self.right),
+            on: self.on.clone(),
+            filter: self.filter.clone(),
+            join_type: self.join_type,
+            join_schema: Arc::clone(&self.join_schema),
+            left_fut: OnceAsync::default(),
+            random_state: self.random_state.clone(),
+            mode: self.mode,
+            metrics: ExecutionPlanMetricsSet::new(),
+            projection: self.projection.clone(),
+            column_indices: self.column_indices.clone(),
+            null_equality: self.null_equality,
+            cache: self.cache.clone(),
+        }))
+    }
+
     fn execute(
         &self,
         partition: usize,
diff --git a/datafusion/physical-plan/src/recursive_query.rs 
b/datafusion/physical-plan/src/recursive_query.rs
index 99b460dfcf..700a9076fe 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -372,7 +372,7 @@ fn assign_work_table(
 }
 
 /// Some plans will change their internal states after execution, making them 
unable to be executed again.
-/// This function uses `ExecutionPlan::with_new_children` to fork a new plan 
with initial states.
+/// This function uses [`ExecutionPlan::reset_state`] to reset any internal 
state within the plan.
 ///
 /// An example is `CrossJoinExec`, which loads the left table into memory and 
stores it in the plan.
 /// However, if the data of the left table is derived from the work table, it 
will become outdated
@@ -383,8 +383,7 @@ fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> 
Result<Arc<dyn ExecutionPl
         if plan.as_any().is::<WorkTableExec>() {
             Ok(Transformed::no(plan))
         } else {
-            let new_plan = Arc::clone(&plan)
-                
.with_new_children(plan.children().into_iter().cloned().collect())?;
+            let new_plan = Arc::clone(&plan).reset_state()?;
             Ok(Transformed::yes(new_plan))
         }
     })
diff --git a/datafusion/physical-plan/src/sorts/sort.rs 
b/datafusion/physical-plan/src/sorts/sort.rs
index 0b7d3977d2..dc2a5640f4 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -905,6 +905,29 @@ impl SortExec {
         self
     }
 
+    /// Add or reset `self.filter` to a new `DynamicFilterPhysicalExpr`.
+    fn create_filter(&self) -> Arc<DynamicFilterPhysicalExpr> {
+        let children = self
+            .expr
+            .iter()
+            .map(|sort_expr| Arc::clone(&sort_expr.expr))
+            .collect::<Vec<_>>();
+        Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true)))
+    }
+
+    fn cloned(&self) -> Self {
+        SortExec {
+            input: Arc::clone(&self.input),
+            expr: self.expr.clone(),
+            metrics_set: self.metrics_set.clone(),
+            preserve_partitioning: self.preserve_partitioning,
+            common_sort_prefix: self.common_sort_prefix.clone(),
+            fetch: self.fetch,
+            cache: self.cache.clone(),
+            filter: self.filter.clone(),
+        }
+    }
+
     /// Modify how many rows to include in the result
     ///
     /// If None, then all rows will be returned, in sorted order.
@@ -926,25 +949,13 @@ impl SortExec {
         }
         let filter = fetch.is_some().then(|| {
             // If we already have a filter, keep it. Otherwise, create a new 
one.
-            self.filter.clone().unwrap_or_else(|| {
-                let children = self
-                    .expr
-                    .iter()
-                    .map(|sort_expr| Arc::clone(&sort_expr.expr))
-                    .collect::<Vec<_>>();
-                Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true)))
-            })
+            self.filter.clone().unwrap_or_else(|| self.create_filter())
         });
-        SortExec {
-            input: Arc::clone(&self.input),
-            expr: self.expr.clone(),
-            metrics_set: self.metrics_set.clone(),
-            preserve_partitioning: self.preserve_partitioning,
-            common_sort_prefix: self.common_sort_prefix.clone(),
-            fetch,
-            cache,
-            filter,
-        }
+        let mut new_sort = self.cloned();
+        new_sort.fetch = fetch;
+        new_sort.cache = cache;
+        new_sort.filter = filter;
+        new_sort
     }
 
     /// Input schema
@@ -1116,10 +1127,35 @@ impl ExecutionPlan for SortExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let mut new_sort = SortExec::new(self.expr.clone(), 
Arc::clone(&children[0]))
-            .with_fetch(self.fetch)
-            .with_preserve_partitioning(self.preserve_partitioning);
-        new_sort.filter = self.filter.clone();
+        let mut new_sort = self.cloned();
+        assert!(
+            children.len() == 1,
+            "SortExec should have exactly one child"
+        );
+        new_sort.input = Arc::clone(&children[0]);
+        // Recompute the properties based on the new input since they may have 
changed
+        let (cache, sort_prefix) = Self::compute_properties(
+            &new_sort.input,
+            new_sort.expr.clone(),
+            new_sort.preserve_partitioning,
+        )?;
+        new_sort.cache = cache;
+        new_sort.common_sort_prefix = sort_prefix;
+
+        Ok(Arc::new(new_sort))
+    }
+
+    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
+        let children = self.children().into_iter().cloned().collect();
+        let new_sort = self.with_new_children(children)?;
+        let mut new_sort = new_sort
+            .as_any()
+            .downcast_ref::<SortExec>()
+            .expect("cloned 1 lines above this line, we know the type")
+            .clone();
+        // Our dynamic filter and execution metrics are the state we need to 
reset.
+        new_sort.filter = Some(new_sort.create_filter());
+        new_sort.metrics_set = ExecutionPlanMetricsSet::new();
 
         Ok(Arc::new(new_sort))
     }
diff --git a/datafusion/sqllogictest/test_files/cte.slt 
b/datafusion/sqllogictest/test_files/cte.slt
index 32320a06f4..5f8fd1a0b5 100644
--- a/datafusion/sqllogictest/test_files/cte.slt
+++ b/datafusion/sqllogictest/test_files/cte.slt
@@ -996,6 +996,61 @@ physical_plan
 08)----------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
 09)------------WorkTableExec: name=numbers
 
+# Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across 
multiple executions
+query II
+with recursive r as (
+  select 0 as k, 0 as v
+  union all
+  (
+    select *
+    from r
+    order by v
+    limit 1
+  )
+)
+select *
+from r
+limit 5;
+----
+0 0
+0 0
+0 0
+0 0
+0 0
+
+query TT
+explain
+with recursive r as (
+  select 0 as k, 0 as v
+  union all
+  (
+    select *
+    from r
+    order by v
+    limit 1
+  )
+)
+select *
+from r
+limit 5;
+----
+logical_plan
+01)SubqueryAlias: r
+02)--Limit: skip=0, fetch=5
+03)----RecursiveQuery: is_distinct=false
+04)------Projection: Int64(0) AS k, Int64(0) AS v
+05)--------EmptyRelation
+06)------Sort: r.v ASC NULLS LAST, fetch=1
+07)--------Projection: r.k, r.v
+08)----------TableScan: r
+physical_plan
+01)GlobalLimitExec: skip=0, fetch=5
+02)--RecursiveQueryExec: name=r, is_distinct=false
+03)----ProjectionExec: expr=[0 as k, 0 as v]
+04)------PlaceholderRowExec
+05)----SortExec: TopK(fetch=1), expr=[v@1 ASC NULLS LAST], 
preserve_partitioning=[false]
+06)------WorkTableExec: name=r
+
 statement count 0
 set datafusion.execution.enable_recursive_ctes = false;
 
diff --git a/docs/source/library-user-guide/upgrading.md 
b/docs/source/library-user-guide/upgrading.md
index e18b6682b8..58167238fe 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -142,6 +142,16 @@ This version of DataFusion upgrades the underlying Apache 
Arrow implementation
 to version `56.0.0`. See the [release 
notes](https://github.com/apache/arrow-rs/releases/tag/56.0.0)
 for more details.
 
+### Added `ExecutionPlan::reset_state`
+
+In order to fix a bug in DataFusion `49.0.0` where dynamic filters (currently 
only generated in the presence of a query such as `ORDER BY ... LIMIT ...`)
+produced incorrect results in recursive queries, a new method `reset_state` 
has been added to the `ExecutionPlan` trait.
+
+Any `ExecutionPlan` that needs to maintain internal state or references to 
other nodes in the execution plan tree should implement this method to reset 
that state.
+See [#17028] for more details and an example implementation for `SortExec`.
+
+[#17028]: https://github.com/apache/datafusion/pull/17028
+
 ## DataFusion `49.0.0`
 
 ### `MSRV` updated to 1.85.1


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to