This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4eacb60467 Enable Projection Pushdown Optimization for Recursive CTEs (#16696)
4eacb60467 is described below
commit 4eacb6046773b759dae0b3d801fe8cb1c6b65c0f
Author: kosiew <[email protected]>
AuthorDate: Wed Oct 1 11:03:23 2025 +0800
Enable Projection Pushdown Optimization for Recursive CTEs (#16696)
* Add column pruning support for RecursiveQuery operator in optimizer
- Extend optimize_projections to handle LogicalPlan::RecursiveQuery by applying projection pushdown to its inputs, improving query performance.
- Add integration test `recursive_query_column_pruning` verifying that the plan shows correct projection pruning for recursive CTEs.
- Implement `create_cte_work_table` in the test context provider to support CTE tests.
- Add .github/copilot-instructions.md and AGENTS.md docs with Rust coding, linting, formatting, and contribution guidelines to maintain quality and consistency in generated and user code.
* Amend cte results
* Add recursive_cte.rs integration test and improve recursive CTE projection pushdown in cte.slt
- Add a new async test `recursive_cte_alias_instability` to the DataFusion core SQL tests, covering a complex recursive CTE query and validating recursive CTE alias handling and query stability.
- Enhance the existing sqllogictest file `cte.slt` by adding column projections to recursive CTE TableScans, improving plan efficiency and consistency.
- Fix projection pushdown for recursive CTEs in the logical and physical plans for the nodes, balances, recursive_cte, and numbers test cases.
- This addresses alias instability and projection inefficiencies previously observed in recursive CTE handling and improves test coverage for recursive SQL features.
* Remove redundant handling of RecursiveQuery in optimize_projections function
* main cte.slt
* Add tests for recursive CTE projection pushdown scenarios
* Remove unused create_cte_work_table function from MyContextProvider implementation
* Refactor TableScan projections in recursive query tests for clarity
* consolidate recursive cte tests
* Remove test that is included in slt
* Enhance optimization for recursive queries by adding checks for problematic structures
* Refactor recursive CTE tests to improve projection pushdown validation and add new test for alias instability
* refactor: rename function to clarify purpose of subquery alias detection in recursive queries
* test: add comments to clarify purpose of subquery alias handling in recursive CTE tests
* Refactor `plan_contains_subquery_alias` to count subquery aliases instead of returning a bool
- Changed `plan_contains_subquery_alias` to count occurrences of `SubqueryAlias` nodes in the plan.
- Added helper function `count_subquery_aliases` to recursively update the count.
- Updated logic to return true if there are two or more subquery aliases, preserving original behavior in `recursive_cte_alias_instability` test.
* Optimize subquery alias counting by early termination
- Updated `plan_contains_subquery_alias` to short-circuit counting once the threshold (2) is reached.
- Modified `count_subquery_aliases` to accept a threshold and return early when the count meets or exceeds it.
- Improves performance by avoiding unnecessary traversal of the entire plan.
- Added integration test snapshot for recursive query column pruning reflecting these changes.
* Refactor count_subquery_aliases to return count instead of using mutable reference
- Changed count_subquery_aliases to take the count by value (usize) and return the updated count, removing the mutable reference.
- Added early exit when count reaches threshold to avoid unnecessary traversal.
- Updated is_projection_unnecessary to use new count_subquery_aliases signature.
- Removed obsolete integration test snapshot file.
- Fixed recursive CTE plan and physical plan in optimizer integration and sqllogictest to include projection pruning on TableScans inside recursive queries.
This improves clarity, readability, and efficiency of subquery alias counting in logical plans, and fixes recursive query projection pruning as per the related issue.
* docs(tests): add note referencing similar SQL in cte.slt for recursive CTE alias test
Add a comment in the recursive_cte_alias_instability test highlighting the similarity of the SQL query to one in datafusion/sqllogictest/test_files/cte.slt. This improves test clarity and cross-reference for maintainers.
* fix: clarify comment regarding alias ambiguity in recursive CTEs
feat: add initial example for explain_memory
* refactor: remove recursive_cte test module
* fix: add comment for clarity on recursive query handling
This commit adds a comment to the `optimize_projections` function in the `mod.rs` file to clarify the handling of recursive queries. The comment references a discussion on GitHub related to the implementation.
* remove stray file
* Revert "refactor: remove recursive_cte test module"
This reverts commit d8f6dd153de2ffae705a00a09d11c8c20c9bd27b.
* remove RecursiveQuery bypass
* refactor: improve handling of non-CTE subqueries in recursive queries
* refactor: remove unused recursive_cte module from SQL tests
* refactor: enhance projection optimization by handling non-CTE subqueries in recursive queries
* refactor: simplify TableScan projections in recursive query tests
* refactor: reorder filter and projection in recursive query logical plan
* refactor: restrict projection pushdown in recursive queries to only allow
CTE references
* refactor: optimize TableScan projection in recursive query logical plan
* fix: correct dynamic filter predicate order in hash join test
* Update datafusion/optimizer/src/optimize_projections/mod.rs
Co-authored-by: Jeffrey Vo <[email protected]>
* Update datafusion/optimizer/src/optimize_projections/mod.rs
Co-authored-by: Jeffrey Vo <[email protected]>
* Enhance test description for recursive CTE projection pushdown
* Rename plan_contains_non_cte_subquery to plan_contains_other_subqueries for clarity
* Add test for recursive CTE with nested subquery to validate projection pushdown behavior
* Remove recursive_query_column_pruning
---------
Co-authored-by: Jeffrey Vo <[email protected]>
---
.../optimizer/src/optimize_projections/mod.rs | 65 +++++++++-
.../optimizer/tests/optimizer_integration.rs | 138 +++++++++++++++++++++
datafusion/sqllogictest/test_files/cte.slt | 18 ++-
3 files changed, 210 insertions(+), 11 deletions(-)
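The squashed message above describes an intermediate approach, replaced by `plan_contains_other_subqueries` in the final diff, in which `count_subquery_aliases` took the running count by value, returned the updated count, and stopped once a threshold of 2 was reached. Below is a minimal, hypothetical reconstruction of that counting logic on a toy plan type. `ToyPlan` and its variants are assumptions, not DataFusion's `LogicalPlan`, and this is not the code that was merged.

```rust
/// Hypothetical toy plan, not DataFusion's LogicalPlan: only the node kinds
/// needed to illustrate alias counting are modeled.
enum ToyPlan {
    SubqueryAlias { input: Box<ToyPlan> },
    Projection { input: Box<ToyPlan> },
    Join { left: Box<ToyPlan>, right: Box<ToyPlan> },
    TableScan,
}

impl ToyPlan {
    /// Direct children of this node.
    fn inputs(&self) -> Vec<&ToyPlan> {
        match self {
            ToyPlan::SubqueryAlias { input } | ToyPlan::Projection { input } => {
                vec![input.as_ref()]
            }
            ToyPlan::Join { left, right } => vec![left.as_ref(), right.as_ref()],
            ToyPlan::TableScan => vec![],
        }
    }
}

/// Counts SubqueryAlias nodes, taking the running count by value and returning
/// the updated count. Traversal stops as soon as `threshold` is reached.
fn count_subquery_aliases(plan: &ToyPlan, mut count: usize, threshold: usize) -> usize {
    if count >= threshold {
        return count;
    }
    if matches!(plan, ToyPlan::SubqueryAlias { .. }) {
        count += 1;
    }
    for child in plan.inputs() {
        if count >= threshold {
            break; // early exit: the remaining children cannot change the answer
        }
        count = count_subquery_aliases(child, count, threshold);
    }
    count
}

/// Intermediate rule from the commit message: two or more aliases are "problematic".
fn plan_contains_subquery_alias(plan: &ToyPlan) -> bool {
    count_subquery_aliases(plan, 0, 2) >= 2
}
```

Passing the count by value keeps the helper free of `&mut` state, and the threshold lets the caller answer "at least two?" without walking the whole tree.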
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs
index d6e3f6051f..312e788db7 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -356,12 +356,35 @@ fn optimize_projections(
.collect::<Result<Vec<_>>>()?
}
LogicalPlan::EmptyRelation(_)
- | LogicalPlan::RecursiveQuery(_)
| LogicalPlan::Values(_)
| LogicalPlan::DescribeTable(_) => {
// These operators have no inputs, so stop the optimization process.
return Ok(Transformed::no(plan));
}
+ LogicalPlan::RecursiveQuery(recursive) => {
+ // Only allow subqueries that reference the current CTE; nested subqueries are not yet
+ // supported for projection pushdown for simplicity.
+ // TODO: be able to do projection pushdown on recursive CTEs with subqueries
+ if plan_contains_other_subqueries(
+ recursive.static_term.as_ref(),
+ &recursive.name,
+ ) || plan_contains_other_subqueries(
+ recursive.recursive_term.as_ref(),
+ &recursive.name,
+ ) {
+ return Ok(Transformed::no(plan));
+ }
+
+ plan.inputs()
+ .into_iter()
+ .map(|input| {
+ indices
+ .clone()
+ .with_projection_beneficial()
+ .with_plan_exprs(&plan, input.schema())
+ })
+ .collect::<Result<Vec<_>>>()?
+ }
LogicalPlan::Join(join) => {
let left_len = join.left.schema().fields().len();
let (left_req_indices, right_req_indices) =
@@ -850,6 +873,46 @@ pub fn is_projection_unnecessary(
))
}
+/// Returns true if the plan subtree contains any subqueries that are not the
+/// CTE reference itself. This treats any non-CTE [`LogicalPlan::SubqueryAlias`]
+/// node (including aliased relations) as a blocker, along with expression-level
+/// subqueries like scalar, EXISTS, or IN. These cases prevent projection
+/// pushdown for now because we cannot safely reason about their column usage.
+fn plan_contains_other_subqueries(plan: &LogicalPlan, cte_name: &str) -> bool {
+ if let LogicalPlan::SubqueryAlias(alias) = plan {
+ if alias.alias.table() != cte_name {
+ return true;
+ }
+ }
+
+ let mut found = false;
+ plan.apply_expressions(|expr| {
+ if expr_contains_subquery(expr) {
+ found = true;
+ Ok(TreeNodeRecursion::Stop)
+ } else {
+ Ok(TreeNodeRecursion::Continue)
+ }
+ })
+ .expect("expression traversal never fails");
+ if found {
+ return true;
+ }
+
+ plan.inputs()
+ .into_iter()
+ .any(|child| plan_contains_other_subqueries(child, cte_name))
+}
+
+fn expr_contains_subquery(expr: &Expr) -> bool {
+ expr.exists(|e| match e {
+ Expr::ScalarSubquery(_) | Expr::Exists(_) | Expr::InSubquery(_) => Ok(true),
+ _ => Ok(false),
+ })
+ // Safe unwrap since we are doing a simple boolean check
+ .unwrap()
+}
+
#[cfg(test)]
mod tests {
use std::cmp::Ordering;
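The new `RecursiveQuery` arm first checks both terms with `plan_contains_other_subqueries` and, only if neither contains a blocker, forwards the parent's required column indices to both inputs. The sketch below models that decision on a hypothetical toy plan. `Plan`, `has_subquery_expr`, and `child_requirements` are assumptions for illustration: the real code walks `LogicalPlan` nodes and their `Expr`s, uses `RequiredIndices` rather than a plain `Vec<usize>`, and also calls `with_projection_beneficial()`, which is omitted here.

```rust
/// Hypothetical toy plan with just enough structure to mirror the checks in
/// `plan_contains_other_subqueries`; this is not DataFusion's LogicalPlan.
enum Plan {
    /// An aliased relation, e.g. `FROM (...) sub`, `FROM test t`, or the CTE itself.
    SubqueryAlias { alias: String, input: Box<Plan> },
    /// A node whose expressions may contain a scalar, EXISTS, or IN subquery.
    Filter { has_subquery_expr: bool, input: Box<Plan> },
    Join { left: Box<Plan>, right: Box<Plan> },
    TableScan,
}

/// True if the subtree holds any alias other than `cte_name`, or any
/// expression-level subquery.
fn plan_contains_other_subqueries(plan: &Plan, cte_name: &str) -> bool {
    match plan {
        Plan::SubqueryAlias { alias, input } => {
            alias.as_str() != cte_name || plan_contains_other_subqueries(input, cte_name)
        }
        Plan::Filter { has_subquery_expr, input } => {
            *has_subquery_expr || plan_contains_other_subqueries(input, cte_name)
        }
        Plan::Join { left, right } => {
            plan_contains_other_subqueries(left, cte_name)
                || plan_contains_other_subqueries(right, cte_name)
        }
        Plan::TableScan => false,
    }
}

/// Models the RecursiveQuery arm: `None` means "leave the plan unchanged";
/// otherwise the static and recursive terms both receive the parent's
/// required column indices.
fn child_requirements(
    static_term: &Plan,
    recursive_term: &Plan,
    cte_name: &str,
    required: &[usize],
) -> Option<Vec<Vec<usize>>> {
    if plan_contains_other_subqueries(static_term, cte_name)
        || plan_contains_other_subqueries(recursive_term, cte_name)
    {
        return None;
    }
    Some(vec![required.to_vec(), required.to_vec()])
}
```

Both terms get the same indices because they produce rows with the same column layout as the CTE's output, which is also why a single blocker in either term disables pushdown for the whole recursive query in this sketch.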
diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs
index deda8fadbe..b17375ac01 100644
--- a/datafusion/optimizer/tests/optimizer_integration.rs
+++ b/datafusion/optimizer/tests/optimizer_integration.rs
@@ -46,6 +46,48 @@ fn init() {
let _ = env_logger::try_init();
}
+#[test]
+fn recursive_cte_with_nested_subquery() -> Result<()> {
+ // Covers bailout path in `plan_contains_other_subqueries`, ensuring nested subqueries
+ // within recursive CTE branches prevent projection pushdown.
+ let sql = r#"
+ WITH RECURSIVE numbers(id, level) AS (
+ SELECT sub.id, sub.level FROM (
+ SELECT col_int32 AS id, 1 AS level FROM test
+ ) sub
+ UNION ALL
+ SELECT t.col_int32, numbers.level + 1
+ FROM test t
+ JOIN numbers ON t.col_int32 = numbers.id + 1
+ )
+ SELECT id, level FROM numbers
+ "#;
+
+ let plan = test_sql(sql)?;
+
+ assert_snapshot!(
+ format!("{plan}"),
+ @r#"
+ SubqueryAlias: numbers
+ Projection: sub.id AS id, sub.level AS level
+ RecursiveQuery: is_distinct=false
+ Projection: sub.id, sub.level
+ SubqueryAlias: sub
+ Projection: test.col_int32 AS id, Int64(1) AS level
+ TableScan: test
+ Projection: t.col_int32, numbers.level + Int64(1)
+ Inner Join: CAST(t.col_int32 AS Int64) = CAST(numbers.id AS Int64) + Int64(1)
+ SubqueryAlias: t
+ Filter: CAST(test.col_int32 AS Int64) IS NOT NULL
+ TableScan: test
+ Filter: CAST(numbers.id AS Int64) + Int64(1) IS NOT NULL
+ TableScan: numbers
+ "#
+ );
+
+ Ok(())
+}
+
#[test]
fn case_when() -> Result<()> {
let sql = "SELECT CASE WHEN col_int32 > 0 THEN 1 ELSE 0 END FROM test";
@@ -478,6 +520,94 @@ fn select_correlated_predicate_subquery_with_uppercase_ident() {
);
}
+#[test]
+fn recursive_cte_projection_pushdown() -> Result<()> {
+ // Test that projection pushdown works with recursive CTEs by ensuring
+ // only the required columns are projected from the base table, even when
+ // the CTE definition includes unused columns
+ let sql = "WITH RECURSIVE nodes AS (\
+ SELECT col_int32 AS id, col_utf8 AS name, col_uint32 AS extra FROM test \
+ UNION ALL \
+ SELECT id + 1, name, extra FROM nodes WHERE id < 3\
+ ) SELECT id FROM nodes";
+ let plan = test_sql(sql)?;
+
+ // The optimizer successfully performs projection pushdown by only selecting the needed
+ // columns from the base table and recursive table, eliminating unused columns
+ assert_snapshot!(
+ format!("{plan}"),
+ @r#"SubqueryAlias: nodes
+ RecursiveQuery: is_distinct=false
+ Projection: test.col_int32 AS id
+ TableScan: test projection=[col_int32]
+ Projection: CAST(CAST(nodes.id AS Int64) + Int64(1) AS Int32)
+ Filter: nodes.id < Int32(3)
+ TableScan: nodes projection=[id]
+"#
+ );
+ Ok(())
+}
+
+#[test]
+fn recursive_cte_with_unused_columns() -> Result<()> {
+ // Test projection pushdown with a recursive CTE where the base case
+ // includes columns that are never used in the recursive part or final result
+ let sql = "WITH RECURSIVE series AS (\
+ SELECT 1 AS n, col_utf8, col_uint32, col_date32 FROM test WHERE col_int32 = 1 \
+ UNION ALL \
+ SELECT n + 1, col_utf8, col_uint32, col_date32 FROM series WHERE n < 3\
+ ) SELECT n FROM series";
+ let plan = test_sql(sql)?;
+
+ // The optimizer successfully performs projection pushdown by eliminating unused columns
+ // even when they're defined in the CTE but not actually needed
+ assert_snapshot!(
+ format!("{plan}"),
+ @r#"SubqueryAlias: series
+ RecursiveQuery: is_distinct=false
+ Projection: Int64(1) AS n
+ Filter: test.col_int32 = Int32(1)
+ TableScan: test projection=[col_int32]
+ Projection: series.n + Int64(1)
+ Filter: series.n < Int64(3)
+ TableScan: series projection=[n]
+"#
+ );
+ Ok(())
+}
+
+#[test]
+/// Asserts the minimal plan shape once projection pushdown succeeds for a recursive CTE.
+/// Unlike the previous two tests that retain extra columns in either the base or recursive
+/// branches, this baseline shows the optimizer trimming everything down to the single
+/// column required by the final projection.
+fn recursive_cte_projection_pushdown_baseline() -> Result<()> {
+ // Test case that truly demonstrates projection pushdown working:
+ // The base case only selects needed columns
+ let sql = "WITH RECURSIVE countdown AS (\
+ SELECT col_int32 AS n FROM test WHERE col_int32 = 5 \
+ UNION ALL \
+ SELECT n - 1 FROM countdown WHERE n > 1\
+ ) SELECT n FROM countdown";
+ let plan = test_sql(sql)?;
+
+ // This demonstrates optimal projection pushdown where only col_int32 is projected from the base table,
+ // and only the needed column is selected from the recursive table
+ assert_snapshot!(
+ format!("{plan}"),
+ @r#"SubqueryAlias: countdown
+ RecursiveQuery: is_distinct=false
+ Projection: test.col_int32 AS n
+ Filter: test.col_int32 = Int32(5)
+ TableScan: test projection=[col_int32]
+ Projection: CAST(CAST(countdown.n AS Int64) - Int64(1) AS Int32)
+ Filter: countdown.n > Int32(1)
+ TableScan: countdown projection=[n]
+"#
+ );
+ Ok(())
+}
+
fn test_sql(sql: &str) -> Result<LogicalPlan> {
// parse the SQL
let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ...
@@ -587,6 +717,14 @@ impl ContextProvider for MyContextProvider {
None
}
+ fn create_cte_work_table(
+ &self,
+ _name: &str,
+ schema: SchemaRef,
+ ) -> Result<Arc<dyn TableSource>> {
+ Ok(Arc::new(MyTableSource { schema }))
+ }
+
fn options(&self) -> &ConfigOptions {
&self.options
}
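The updated snapshots expect the scans in both branches of a recursive query, including the scan of the CTE's own work table (for example `TableScan: nodes projection=[id]`), to carry a `projection=[...]` list. A `TableScan` projection is a list of column indices into the source schema. As a minimal sketch, assuming the hypothetical helpers `scan_projection` and `render_scan` below, the list follows schema order regardless of the order in which the columns are requested:

```rust
/// Hypothetical helper: the scan projection as column indices into the source
/// schema, kept in schema order whatever order the columns were requested in.
fn scan_projection(schema: &[&str], required: &[&str]) -> Vec<usize> {
    schema
        .iter()
        .enumerate()
        .filter(|&(_, name)| required.iter().any(|r| r == name))
        .map(|(i, _)| i)
        .collect()
}

/// Hypothetical helper: renders a scan in the same shape as the plan snapshots above.
fn render_scan(table: &str, schema: &[&str], projection: &[usize]) -> String {
    let cols: Vec<&str> = projection.iter().map(|&i| schema[i]).collect();
    format!("TableScan: {table} projection=[{}]", cols.join(", "))
}
```

For the `nodes` CTE in `recursive_cte_projection_pushdown`, whose columns are `id`, `name`, and `extra`, only `id` is required, so the work-table scan is narrowed to index 0.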
diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt
index adbf308a96..a581bcb539 100644
--- a/datafusion/sqllogictest/test_files/cte.slt
+++ b/datafusion/sqllogictest/test_files/cte.slt
@@ -110,7 +110,7 @@ logical_plan
04)------EmptyRelation: rows=1
05)----Projection: nodes.id + Int64(1) AS id
06)------Filter: nodes.id < Int64(10)
-07)--------TableScan: nodes
+07)--------TableScan: nodes projection=[id]
physical_plan
01)RecursiveQueryExec: name=nodes, is_distinct=false
02)--ProjectionExec: expr=[1 as id]
@@ -152,11 +152,10 @@ logical_plan
01)Sort: balances.time ASC NULLS LAST, balances.name ASC NULLS LAST, balances.account_balance ASC NULLS LAST
02)--SubqueryAlias: balances
03)----RecursiveQuery: is_distinct=false
-04)------Projection: balance.time, balance.name, balance.account_balance
-05)--------TableScan: balance
-06)------Projection: balances.time + Int64(1) AS time, balances.name, balances.account_balance + Int64(10) AS account_balance
-07)--------Filter: balances.time < Int64(10)
-08)----------TableScan: balances
+04)------TableScan: balance projection=[time, name, account_balance]
+05)------Projection: balances.time + Int64(1) AS time, balances.name, balances.account_balance + Int64(10) AS account_balance
+06)--------Filter: balances.time < Int64(10)
+07)----------TableScan: balances projection=[time, name, account_balance]
physical_plan
01)SortExec: expr=[time@0 ASC NULLS LAST, name@1 ASC NULLS LAST, account_balance@2 ASC NULLS LAST], preserve_partitioning=[false]
02)--RecursiveQueryExec: name=balances, is_distinct=false
@@ -958,7 +957,7 @@ logical_plan
04)------EmptyRelation: rows=1
05)----Projection: numbers.n + Int64(1)
06)------Filter: numbers.n < Int64(10)
-07)--------TableScan: numbers
+07)--------TableScan: numbers projection=[n]
physical_plan
01)RecursiveQueryExec: name=numbers, is_distinct=false
02)--ProjectionExec: expr=[1 as n]
@@ -984,7 +983,7 @@ logical_plan
04)------EmptyRelation: rows=1
05)----Projection: numbers.n + Int64(1)
06)------Filter: numbers.n < Int64(10)
-07)--------TableScan: numbers
+07)--------TableScan: numbers projection=[n]
physical_plan
01)RecursiveQueryExec: name=numbers, is_distinct=false
02)--ProjectionExec: expr=[1 as n]
@@ -1041,8 +1040,7 @@ logical_plan
04)------Projection: Int64(0) AS k, Int64(0) AS v
05)--------EmptyRelation: rows=1
06)------Sort: r.v ASC NULLS LAST, fetch=1
-07)--------Projection: r.k, r.v
-08)----------TableScan: r
+07)--------TableScan: r projection=[k, v]
physical_plan
01)GlobalLimitExec: skip=0, fetch=5
02)--RecursiveQueryExec: name=r, is_distinct=false
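In the last cte.slt hunk, `Projection: r.k, r.v` over `TableScan: r` collapses into `TableScan: r projection=[k, v]`: once the scan's projection matches the referenced columns, a projection that only re-lists them in order adds nothing. A toy model of that reasoning follows, using plain strings for expressions. The helpers `narrow_scan` and `projection_is_redundant` are hypothetical and far simpler than DataFusion's `is_projection_unnecessary`, which works on `Expr`s and schemas, and the `extra` column in the usage below is an assumption.

```rust
/// Hypothetical: keep only the scan columns referenced as "qualifier.column"
/// by the projection's expressions, preserving scan order.
fn narrow_scan<'a>(scan_columns: &[&'a str], qualifier: &str, exprs: &[&str]) -> Vec<&'a str> {
    scan_columns
        .iter()
        .copied()
        .filter(|c| exprs.iter().any(|e| *e == format!("{qualifier}.{c}")))
        .collect()
}

/// Hypothetical: a projection is redundant when it lists exactly the scan's
/// columns, in the same order, as plain column references.
fn projection_is_redundant(exprs: &[&str], qualifier: &str, scan_columns: &[&str]) -> bool {
    exprs.len() == scan_columns.len()
        && exprs
            .iter()
            .zip(scan_columns)
            .all(|(e, c)| *e == format!("{qualifier}.{c}"))
}
```

With a hypothetical fourth column `extra` on `balance`, narrowing keeps `time, name, account_balance`, after which `Projection: balance.time, balance.name, balance.account_balance` is redundant, matching the shape of the rewritten `balances` plan.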