This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b72d25cc3a fix: recursive cte hangs on joins (#9687)
b72d25cc3a is described below

commit b72d25cc3a3a4257de1fc88e8df56b4c874d60ce
Author: Jonah Gao <[email protected]>
AuthorDate: Thu Mar 21 07:56:54 2024 +0800

    fix: recursive cte hangs on joins (#9687)
    
    * fix: recursive cte hangs on joins
    
    * Use ExecutionPlan::with_new_children
    
    * Naming
---
 datafusion/physical-plan/src/recursive_query.rs | 26 +++++++--
 datafusion/sqllogictest/test_files/cte.slt      | 73 +++++++++++++++++++++++--
 2 files changed, 90 insertions(+), 9 deletions(-)

diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index 68abc9653a..140820ff78 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -309,10 +309,9 @@ impl RecursiveQueryStream {
         // Downstream plans should not expect any partitioning.
         let partition = 0;
 
-        self.recursive_stream = Some(
-            self.recursive_term
-                .execute(partition, self.task_context.clone())?,
-        );
+        let recursive_plan = reset_plan_states(self.recursive_term.clone())?;
+        self.recursive_stream =
+            Some(recursive_plan.execute(partition, self.task_context.clone())?);
         self.poll_next(cx)
     }
 }
@@ -343,6 +342,25 @@ fn assign_work_table(
     .data()
 }
 
+/// Some plans will change their internal states after execution, making them unable to be executed again.
+/// This function uses `ExecutionPlan::with_new_children` to fork a new plan with initial states.
+///
+/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
+/// However, if the data of the left table is derived from the work table, it will become outdated
+/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
+fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
+    plan.transform_up(&|plan| {
+        // WorkTableExec's states have already been updated correctly.
+        if plan.as_any().is::<WorkTableExec>() {
+            Ok(Transformed::no(plan))
+        } else {
+            let new_plan = plan.clone().with_new_children(plan.children())?;
+            Ok(Transformed::yes(new_plan))
+        }
+    })
+    .data()
+}
+
 impl Stream for RecursiveQueryStream {
     type Item = Result<RecordBatch>;
 
diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt
index 6b9db55893..50c88e4195 100644
--- a/datafusion/sqllogictest/test_files/cte.slt
+++ b/datafusion/sqllogictest/test_files/cte.slt
@@ -40,11 +40,6 @@ ProjectionExec: expr=[1 as a, 2 as b, 3 as c]
 --PlaceholderRowExec
 
 
-
-# enable recursive CTEs
-statement ok
-set datafusion.execution.enable_recursive_ctes = true;
-
 # trivial recursive CTE works
 query I rowsort
 WITH RECURSIVE nodes AS ( 
@@ -651,3 +646,71 @@ WITH RECURSIVE my_cte AS (
     WHERE my_cte.a<5
 )
 SELECT a FROM my_cte;
+
+
+# Test issue: https://github.com/apache/arrow-datafusion/issues/9680
+query I
+WITH RECURSIVE recursive_cte AS (
+  SELECT 1 as val
+  UNION ALL
+    (
+      WITH sub_cte AS (
+      SELECT 2 as val
+    )
+    SELECT
+      2 as val
+    FROM recursive_cte
+      CROSS JOIN sub_cte
+    WHERE recursive_cte.val < 2
+  )
+)
+SELECT * FROM recursive_cte;
+----
+1
+2
+
+# Test issue: https://github.com/apache/arrow-datafusion/issues/9680
+# 'recursive_cte' should be on the left of the cross join, as this is the test purpose of the above query.
+query TT
+explain WITH RECURSIVE recursive_cte AS (
+  SELECT 1 as val
+  UNION ALL
+    (
+      WITH sub_cte AS (
+      SELECT 2 as val
+    )
+    SELECT
+      2 as val
+    FROM recursive_cte
+      CROSS JOIN sub_cte
+    WHERE recursive_cte.val < 2
+  )
+)
+SELECT * FROM recursive_cte;
+----
+logical_plan
+Projection: recursive_cte.val
+--SubqueryAlias: recursive_cte
+----RecursiveQuery: is_distinct=false
+------Projection: Int64(1) AS val
+--------EmptyRelation
+------Projection: Int64(2) AS val
+--------CrossJoin:
+----------Filter: recursive_cte.val < Int64(2)
+------------TableScan: recursive_cte
+----------SubqueryAlias: sub_cte
+------------Projection: Int64(2) AS val
+--------------EmptyRelation
+physical_plan
+RecursiveQueryExec: name=recursive_cte, is_distinct=false
+--ProjectionExec: expr=[1 as val]
+----PlaceholderRowExec
+--ProjectionExec: expr=[2 as val]
+----CrossJoinExec
+------CoalescePartitionsExec
+--------CoalesceBatchesExec: target_batch_size=8182
+----------FilterExec: val@0 < 2
+------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------------WorkTableExec: name=recursive_cte
+------ProjectionExec: expr=[2 as val]
+--------PlaceholderRowExec

Reply via email to