This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b72d25cc3a fix: recursive cte hangs on joins (#9687)
b72d25cc3a is described below
commit b72d25cc3a3a4257de1fc88e8df56b4c874d60ce
Author: Jonah Gao <[email protected]>
AuthorDate: Thu Mar 21 07:56:54 2024 +0800
fix: recursive cte hangs on joins (#9687)
* fix: recursive cte hangs on joins
* Use ExecutionPlan::with_new_children
* Naming
---
datafusion/physical-plan/src/recursive_query.rs | 26 +++++++--
datafusion/sqllogictest/test_files/cte.slt | 73 +++++++++++++++++++++++--
2 files changed, 90 insertions(+), 9 deletions(-)
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index 68abc9653a..140820ff78 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -309,10 +309,9 @@ impl RecursiveQueryStream {
// Downstream plans should not expect any partitioning.
let partition = 0;
- self.recursive_stream = Some(
- self.recursive_term
- .execute(partition, self.task_context.clone())?,
- );
+ let recursive_plan = reset_plan_states(self.recursive_term.clone())?;
+ self.recursive_stream =
+ Some(recursive_plan.execute(partition,
self.task_context.clone())?);
self.poll_next(cx)
}
}
@@ -343,6 +342,25 @@ fn assign_work_table(
.data()
}
+/// Some plans will change their internal states after execution, making them unable to be executed again.
+/// This function uses `ExecutionPlan::with_new_children` to fork a new plan with initial states.
+///
+/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
+/// However, if the data of the left table is derived from the work table, it will become outdated
+/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
+fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
+    plan.transform_up(&|plan| {
+        // WorkTableExec's states have already been updated correctly.
+        if plan.as_any().is::<WorkTableExec>() {
+            Ok(Transformed::no(plan))
+        } else {
+            let new_plan = plan.clone().with_new_children(plan.children())?;
+            Ok(Transformed::yes(new_plan))
+        }
+    })
+    .data()
+}
+
impl Stream for RecursiveQueryStream {
type Item = Result<RecordBatch>;
diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt
index 6b9db55893..50c88e4195 100644
--- a/datafusion/sqllogictest/test_files/cte.slt
+++ b/datafusion/sqllogictest/test_files/cte.slt
@@ -40,11 +40,6 @@ ProjectionExec: expr=[1 as a, 2 as b, 3 as c]
--PlaceholderRowExec
-
-# enable recursive CTEs
-statement ok
-set datafusion.execution.enable_recursive_ctes = true;
-
# trivial recursive CTE works
query I rowsort
WITH RECURSIVE nodes AS (
@@ -651,3 +646,71 @@ WITH RECURSIVE my_cte AS (
WHERE my_cte.a<5
)
SELECT a FROM my_cte;
+
+
+# Test issue: https://github.com/apache/arrow-datafusion/issues/9680
+query I
+WITH RECURSIVE recursive_cte AS (
+ SELECT 1 as val
+ UNION ALL
+ (
+ WITH sub_cte AS (
+ SELECT 2 as val
+ )
+ SELECT
+ 2 as val
+ FROM recursive_cte
+ CROSS JOIN sub_cte
+ WHERE recursive_cte.val < 2
+ )
+)
+SELECT * FROM recursive_cte;
+----
+1
+2
+
+# Test issue: https://github.com/apache/arrow-datafusion/issues/9680
+# 'recursive_cte' should be on the left of the cross join, as this is the test purpose of the above query.
+query TT
+explain WITH RECURSIVE recursive_cte AS (
+ SELECT 1 as val
+ UNION ALL
+ (
+ WITH sub_cte AS (
+ SELECT 2 as val
+ )
+ SELECT
+ 2 as val
+ FROM recursive_cte
+ CROSS JOIN sub_cte
+ WHERE recursive_cte.val < 2
+ )
+)
+SELECT * FROM recursive_cte;
+----
+logical_plan
+Projection: recursive_cte.val
+--SubqueryAlias: recursive_cte
+----RecursiveQuery: is_distinct=false
+------Projection: Int64(1) AS val
+--------EmptyRelation
+------Projection: Int64(2) AS val
+--------CrossJoin:
+----------Filter: recursive_cte.val < Int64(2)
+------------TableScan: recursive_cte
+----------SubqueryAlias: sub_cte
+------------Projection: Int64(2) AS val
+--------------EmptyRelation
+physical_plan
+RecursiveQueryExec: name=recursive_cte, is_distinct=false
+--ProjectionExec: expr=[1 as val]
+----PlaceholderRowExec
+--ProjectionExec: expr=[2 as val]
+----CrossJoinExec
+------CoalescePartitionsExec
+--------CoalesceBatchesExec: target_batch_size=8182
+----------FilterExec: val@0 < 2
+------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------------WorkTableExec: name=recursive_cte
+------ProjectionExec: expr=[2 as val]
+--------PlaceholderRowExec