This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4eacb60467 Enable Projection Pushdown Optimization for Recursive CTEs (#16696)
4eacb60467 is described below

commit 4eacb6046773b759dae0b3d801fe8cb1c6b65c0f
Author: kosiew
AuthorDate: Wed Oct 1 11:03:23 2025 +0800

    Enable Projection Pushdown Optimization for Recursive CTEs (#16696)
    
    * Add column pruning support for RecursiveQuery operator in optimizer
    
    - Extend optimize_projections to handle LogicalPlan::RecursiveQuery by applying
      projection pushdown to its inputs, improving query performance.
    - Add integration test `recursive_query_column_pruning` verifying plan shows
      correct projection pruning for recursive CTEs.
    - Implement `create_cte_work_table` in test context provider to support CTE tests.
    - Add .github/copilot-instructions.md and AGENTS.md docs with Rust coding,
      linting, formatting, and contribution guidelines to maintain quality and
      consistency in generated and user code.
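The first bullet can be illustrated with a minimal Rust sketch on a toy plan type. `Plan`, `scan`, `output_columns`, `prune`, `nodes_example`, and `scan_projections` are hypothetical names for this illustration only; they are not DataFusion's `LogicalPlan` or `optimize_projections`. The sketch assumes both terms of the recursive query produce the CTE's columns positionally and that projections contain only plain column references. Under those assumptions, pushing the same required positions into the static and recursive terms lets each `TableScan` read only what it needs.

```rust
use std::collections::BTreeSet;

/// Hypothetical, heavily simplified plan type used only for this sketch.
enum Plan {
    /// `projection` lists the columns actually read (`None` = all of them).
    TableScan { table: String, columns: Vec<String>, projection: Option<Vec<String>> },
    /// Output column `i` is a plain reference to the input column named `exprs[i]`.
    Projection { exprs: Vec<String>, input: Box<Plan> },
    /// Both terms yield the CTE's columns positionally.
    RecursiveQuery { static_term: Box<Plan>, recursive_term: Box<Plan> },
}

fn scan(table: &str, columns: &[&str]) -> Plan {
    let columns = columns.iter().map(|c| c.to_string()).collect();
    Plan::TableScan { table: table.to_string(), columns, projection: None }
}

/// Output column names of a node.
fn output_columns(plan: &Plan) -> Vec<String> {
    match plan {
        Plan::TableScan { columns, projection, .. } => projection.clone().unwrap_or_else(|| columns.clone()),
        Plan::Projection { exprs, .. } => exprs.clone(),
        Plan::RecursiveQuery { static_term, .. } => output_columns(static_term),
    }
}

/// Keeps only the output positions in `required` and pushes the requirement
/// into every input. Assumes each node is pruned at most once.
fn prune(plan: Plan, required: &BTreeSet<usize>) -> Plan {
    match plan {
        Plan::TableScan { table, columns, .. } => {
            let projection: Vec<String> = columns.iter().enumerate()
                .filter(|(i, _)| required.contains(i)).map(|(_, c)| c.clone()).collect();
            Plan::TableScan { table, columns, projection: Some(projection) }
        }
        Plan::Projection { exprs, input } => {
            let kept: Vec<String> = exprs.into_iter().enumerate()
                .filter(|(i, _)| required.contains(i)).map(|(_, e)| e).collect();
            // The child only has to produce the columns the kept exprs reference.
            let child_required: BTreeSet<usize> = output_columns(&input).iter().enumerate()
                .filter(|(_, c)| kept.contains(c)).map(|(i, _)| i).collect();
            Plan::Projection { exprs: kept, input: Box::new(prune(*input, &child_required)) }
        }
        // Instead of stopping at the RecursiveQuery, apply the same
        // requirement to both the static and the recursive term.
        Plan::RecursiveQuery { static_term, recursive_term } => Plan::RecursiveQuery {
            static_term: Box::new(prune(*static_term, required)),
            recursive_term: Box::new(prune(*recursive_term, required)),
        },
    }
}

/// Roughly the shape of `recursive_cte_projection_pushdown`: three columns
/// flow through the CTE, but only the first one is needed downstream.
fn nodes_example() -> Plan {
    Plan::RecursiveQuery {
        static_term: Box::new(Plan::Projection {
            exprs: vec!["col_int32".into(), "col_utf8".into(), "col_uint32".into()],
            input: Box::new(scan("test", &["col_int32", "col_utf8", "col_uint32"])),
        }),
        recursive_term: Box::new(Plan::Projection {
            exprs: vec!["id".into(), "name".into(), "extra".into()],
            input: Box::new(scan("nodes", &["id", "name", "extra"])),
        }),
    }
}

/// Collects every scan's projection, static term first.
fn scan_projections(plan: &Plan) -> Vec<Option<Vec<String>>> {
    match plan {
        Plan::TableScan { projection, .. } => vec![projection.clone()],
        Plan::Projection { input, .. } => scan_projections(input),
        Plan::RecursiveQuery { static_term, recursive_term } => {
            let mut out = scan_projections(static_term);
            out.extend(scan_projections(recursive_term));
            out
        }
    }
}
```

With only position 0 required, `prune(nodes_example(), &BTreeSet::from([0]))` leaves the scan of `test` reading `col_int32` and the scan of `nodes` reading `id`, which is the same shape as the `projection=[...]` lines in the updated snapshots.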
    
    * Amend cte results
    
    * Add recursive_cte.rs integration test and improve recursive CTE projection pushdown in cte.slt
    
    - Add a new async test `recursive_cte_alias_instability` covering a complex recursive CTE query to the DataFusion core SQL tests, validating recursive CTE alias handling and query stability.
    - Enhance existing sqllogictest file `cte.slt` by adding column projections to recursive CTE TableScans, improving plan efficiency and consistency.
    - Fix projection pushdown for recursive CTEs in logical and physical plans for nodes, balances, recursive_cte, and numbers test cases.
    - This addresses alias instability and projection inefficiencies previously observed in recursive CTE handling and improves test coverage for recursive SQL features.
    
    * Remove redundant handling of RecursiveQuery in optimize_projections function
    
    * main cte.slt
    
    * Add tests for recursive CTE projection pushdown scenarios
    
    * Remove unused create_cte_work_table function from MyContextProvider implementation
    
    * Refactor TableScan projections in recursive query tests for clarity
    
    * consolidate recursive cte tests
    
    * Remove test that is included in slt
    
    * Enhance optimization for recursive queries by adding checks for problematic structures
    
    * Refactor recursive CTE tests to improve projection pushdown validation and add new test for alias instability
    
    * refactor: rename function to clarify purpose of subquery alias detection in recursive queries
    
    * test: add comments to clarify purpose of subquery alias handling in recursive CTE tests
    
    * Refactor `plan_contains_subquery_alias` to count subquery aliases instead of returning a bool
    
    - Changed `plan_contains_subquery_alias` to count occurrences of `SubqueryAlias` nodes in the plan.
    - Added helper function `count_subquery_aliases` to recursively update the count.
    - Updated logic to return true if there are two or more subquery aliases, preserving original behavior in `recursive_cte_alias_instability` test.
    
    * Optimize subquery alias counting by early termination
    
    - Updated `plan_contains_subquery_alias` to short-circuit counting once threshold (2) is reached.
    - Modified `count_subquery_aliases` to accept a threshold and return early when count meets or exceeds it.
    - Improves performance by avoiding unnecessary traversal of the entire plan.
    - Added integration test snapshot for recursive query column pruning reflecting these changes.
    
    * Refactor count_subquery_aliases to return count instead of using mutable reference
    
    - Changed count_subquery_aliases to take the count as a `usize` by value and return the updated count, removing the mutable reference.
    - Added early exit when count reaches threshold to avoid unnecessary traversal.
    - Updated is_projection_unnecessary to use new count_subquery_aliases signature.
    - Removed obsolete integration test snapshot file.
    - Fixed recursive CTE plan and physical plan in optimizer integration and sqllogictest to include projection pruning on TableScans inside recursive queries.
    
    This improves clarity, readability, and efficiency of subquery alias counting in logical plans, and fixes projection pruning for recursive queries per the related issue.
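The counting refinements in the last three entries (count aliases, stop at a threshold, pass the count by value) can be sketched as below. This is a reconstruction from the commit message only: `Node` is a hypothetical stand-in for a plan node, and the signatures shown for `count_subquery_aliases` and `plan_contains_subquery_alias` are assumptions. The final diff contains no counting helper; it uses `plan_contains_other_subqueries` instead.

```rust
/// Hypothetical stand-in for a logical plan node: only what the counter needs.
struct Node {
    is_subquery_alias: bool,
    children: Vec<Node>,
}

/// Counts SubqueryAlias nodes starting from `count` and returns the updated
/// count by value (no `&mut`). Stops descending once `threshold` is reached.
fn count_subquery_aliases(node: &Node, count: usize, threshold: usize) -> usize {
    let mut count = count + usize::from(node.is_subquery_alias);
    for child in &node.children {
        if count >= threshold {
            // Early exit: more aliases cannot change the caller's answer.
            break;
        }
        count = count_subquery_aliases(child, count, threshold);
    }
    count
}

/// True when the plan contains two or more SubqueryAlias nodes.
fn plan_contains_subquery_alias(plan: &Node) -> bool {
    const THRESHOLD: usize = 2;
    count_subquery_aliases(plan, 0, THRESHOLD) >= THRESHOLD
}
```

Threading the count through return values keeps the helper free of shared mutable state, and the threshold check means a plan with many aliases is only walked until the second one is found.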
    
    * docs(tests): add note referencing similar SQL in cte.slt for recursive CTE alias test
    
    Add a comment in the recursive_cte_alias_instability test highlighting the similarity of the SQL query to one in datafusion/sqllogictest/test_files/cte.slt. This improves test clarity and cross-reference for maintainers.
    
    * fix: clarify comment regarding alias ambiguity in recursive CTEs
    feat: add initial example for explain_memory
    
    * refactor: remove recursive_cte test module
    
    * fix: add comment for clarity on recursive query handling
    
    This commit adds a comment to the `optimize_projections` function in the `mod.rs` file to clarify the handling of recursive queries. The comment references a discussion on GitHub related to the implementation.
    
    * remove stray file
    
    * Revert "refactor: remove recursive_cte test module"
    
    This reverts commit d8f6dd153de2ffae705a00a09d11c8c20c9bd27b.
    
    * remove RecursiveQuery bypass
    
    * refactor: improve handling of non-CTE subqueries in recursive queries
    
    * refactor: remove unused recursive_cte module from SQL tests
    
    * refactor: enhance projection optimization by handling non-CTE subqueries in recursive queries
    
    * refactor: simplify TableScan projections in recursive query tests
    
    * refactor: reorder filter and projection in recursive query logical plan
    
    * refactor: restrict projection pushdown in recursive queries to only allow CTE references
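The restriction above is what `plan_contains_other_subqueries` in the diff implements. A minimal model of the same rule follows, on hypothetical toy `Node` and `Expr` types rather than DataFusion's `LogicalPlan` and `Expr` (`Node::new` and `can_push_down` are also illustrative names). Any `SubqueryAlias` whose name differs from the CTE name, or any scalar, EXISTS, or IN subquery expression anywhere in either term, disables pushdown for the whole recursive query.

```rust
/// Hypothetical toy expression type, only rich enough to model the check.
enum Expr {
    Column(String),
    Literal(i64),
    BinaryOp(Box<Expr>, Box<Expr>),
    ScalarSubquery,
    Exists,
    InSubquery(Box<Expr>),
}

/// Hypothetical toy plan node; `alias` is `Some(name)` for a SubqueryAlias.
struct Node {
    alias: Option<String>,
    exprs: Vec<Expr>,
    children: Vec<Node>,
}

impl Node {
    fn new(alias: Option<&str>, exprs: Vec<Expr>, children: Vec<Node>) -> Node {
        Node { alias: alias.map(String::from), exprs, children }
    }
}

/// True if the expression tree contains a scalar, EXISTS, or IN subquery.
fn expr_contains_subquery(expr: &Expr) -> bool {
    match expr {
        Expr::ScalarSubquery | Expr::Exists | Expr::InSubquery(_) => true,
        Expr::BinaryOp(left, right) => {
            expr_contains_subquery(left) || expr_contains_subquery(right)
        }
        Expr::Column(_) | Expr::Literal(_) => false,
    }
}

/// True if the subtree has an alias other than `cte_name` or any
/// expression-level subquery.
fn plan_contains_other_subqueries(plan: &Node, cte_name: &str) -> bool {
    if let Some(alias) = &plan.alias {
        if alias.as_str() != cte_name {
            return true;
        }
    }
    if plan.exprs.iter().any(|e| expr_contains_subquery(e)) {
        return true;
    }
    plan.children
        .iter()
        .any(|child| plan_contains_other_subqueries(child, cte_name))
}

/// Pushdown is attempted only if neither term contains a blocker.
fn can_push_down(static_term: &Node, recursive_term: &Node, cte_name: &str) -> bool {
    !plan_contains_other_subqueries(static_term, cte_name)
        && !plan_contains_other_subqueries(recursive_term, cte_name)
}
```

This is deliberately conservative: a static term such as `SELECT sub.id ... FROM (...) sub` is rejected because of the `sub` alias, which is the bailout exercised by `recursive_cte_with_nested_subquery`.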
    
    * refactor: optimize TableScan projection in recursive query logical plan
    
    * fix: correct dynamic filter predicate order in hash join test
    
    * Update datafusion/optimizer/src/optimize_projections/mod.rs
    
    Co-authored-by: Jeffrey Vo
    
    * Update datafusion/optimizer/src/optimize_projections/mod.rs
    
    Co-authored-by: Jeffrey Vo
    
    * Enhance test description for recursive CTE projection pushdown
    
    * Rename plan_contains_non_cte_subquery to plan_contains_other_subqueries for clarity
    
    * Add test for recursive CTE with nested subquery to validate projection pushdown behavior
    
    * Remove recursive_query_column_pruning
    
    ---------
    
    Co-authored-by: Jeffrey Vo
---
 .../optimizer/src/optimize_projections/mod.rs      |  65 +++++++++-
 .../optimizer/tests/optimizer_integration.rs       | 138 +++++++++++++++++++++
 datafusion/sqllogictest/test_files/cte.slt         |  18 ++-
 3 files changed, 210 insertions(+), 11 deletions(-)

diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs
index d6e3f6051f..312e788db7 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -356,12 +356,35 @@ fn optimize_projections(
                 .collect::<Result<Vec<_>>>()?
         }
         LogicalPlan::EmptyRelation(_)
-        | LogicalPlan::RecursiveQuery(_)
         | LogicalPlan::Values(_)
         | LogicalPlan::DescribeTable(_) => {
             // These operators have no inputs, so stop the optimization process.
             return Ok(Transformed::no(plan));
         }
+        LogicalPlan::RecursiveQuery(recursive) => {
+            // Only allow subqueries that reference the current CTE; nested subqueries are not yet
+            // supported for projection pushdown for simplicity.
+            // TODO: be able to do projection pushdown on recursive CTEs with subqueries
+            if plan_contains_other_subqueries(
+                recursive.static_term.as_ref(),
+                &recursive.name,
+            ) || plan_contains_other_subqueries(
+                recursive.recursive_term.as_ref(),
+                &recursive.name,
+            ) {
+                return Ok(Transformed::no(plan));
+            }
+
+            plan.inputs()
+                .into_iter()
+                .map(|input| {
+                    indices
+                        .clone()
+                        .with_projection_beneficial()
+                        .with_plan_exprs(&plan, input.schema())
+                })
+                .collect::<Result<Vec<_>>>()?
+        }
         LogicalPlan::Join(join) => {
             let left_len = join.left.schema().fields().len();
             let (left_req_indices, right_req_indices) =
@@ -850,6 +873,46 @@ pub fn is_projection_unnecessary(
     ))
 }
 
+/// Returns true if the plan subtree contains any subqueries that are not the
+/// CTE reference itself. This treats any non-CTE [`LogicalPlan::SubqueryAlias`]
+/// node (including aliased relations) as a blocker, along with expression-level
+/// subqueries like scalar, EXISTS, or IN. These cases prevent projection
+/// pushdown for now because we cannot safely reason about their column usage.
+fn plan_contains_other_subqueries(plan: &LogicalPlan, cte_name: &str) -> bool {
+    if let LogicalPlan::SubqueryAlias(alias) = plan {
+        if alias.alias.table() != cte_name {
+            return true;
+        }
+    }
+
+    let mut found = false;
+    plan.apply_expressions(|expr| {
+        if expr_contains_subquery(expr) {
+            found = true;
+            Ok(TreeNodeRecursion::Stop)
+        } else {
+            Ok(TreeNodeRecursion::Continue)
+        }
+    })
+    .expect("expression traversal never fails");
+    if found {
+        return true;
+    }
+
+    plan.inputs()
+        .into_iter()
+        .any(|child| plan_contains_other_subqueries(child, cte_name))
+}
+
+fn expr_contains_subquery(expr: &Expr) -> bool {
+    expr.exists(|e| match e {
+        Expr::ScalarSubquery(_) | Expr::Exists(_) | Expr::InSubquery(_) => Ok(true),
+        _ => Ok(false),
+    })
+    // Safe unwrap since we are doing a simple boolean check
+    .unwrap()
+}
+
 #[cfg(test)]
 mod tests {
     use std::cmp::Ordering;
diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs
index deda8fadbe..b17375ac01 100644
--- a/datafusion/optimizer/tests/optimizer_integration.rs
+++ b/datafusion/optimizer/tests/optimizer_integration.rs
@@ -46,6 +46,48 @@ fn init() {
     let _ = env_logger::try_init();
 }
 
+#[test]
+fn recursive_cte_with_nested_subquery() -> Result<()> {
+    // Covers bailout path in `plan_contains_other_subqueries`, ensuring nested subqueries
+    // within recursive CTE branches prevent projection pushdown.
+    let sql = r#"
+        WITH RECURSIVE numbers(id, level) AS (
+            SELECT sub.id, sub.level FROM (
+                SELECT col_int32 AS id, 1 AS level FROM test
+            ) sub
+            UNION ALL
+            SELECT t.col_int32, numbers.level + 1
+            FROM test t
+            JOIN numbers ON t.col_int32 = numbers.id + 1
+        )
+        SELECT id, level FROM numbers
+    "#;
+
+    let plan = test_sql(sql)?;
+
+    assert_snapshot!(
+        format!("{plan}"),
+        @r#"
+        SubqueryAlias: numbers
+          Projection: sub.id AS id, sub.level AS level
+            RecursiveQuery: is_distinct=false
+              Projection: sub.id, sub.level
+                SubqueryAlias: sub
+                  Projection: test.col_int32 AS id, Int64(1) AS level
+                    TableScan: test
+              Projection: t.col_int32, numbers.level + Int64(1)
+                Inner Join: CAST(t.col_int32 AS Int64) = CAST(numbers.id AS Int64) + Int64(1)
+                  SubqueryAlias: t
+                    Filter: CAST(test.col_int32 AS Int64) IS NOT NULL
+                      TableScan: test
+                  Filter: CAST(numbers.id AS Int64) + Int64(1) IS NOT NULL
+                    TableScan: numbers
+        "#
+    );
+
+    Ok(())
+}
+
 #[test]
 fn case_when() -> Result<()> {
     let sql = "SELECT CASE WHEN col_int32 > 0 THEN 1 ELSE 0 END FROM test";
@@ -478,6 +520,94 @@ fn select_correlated_predicate_subquery_with_uppercase_ident() {
     );
 }
 
+#[test]
+fn recursive_cte_projection_pushdown() -> Result<()> {
+    // Test that projection pushdown works with recursive CTEs by ensuring
+    // only the required columns are projected from the base table, even when
+    // the CTE definition includes unused columns
+    let sql = "WITH RECURSIVE nodes AS (\
+        SELECT col_int32 AS id, col_utf8 AS name, col_uint32 AS extra FROM test \
+        UNION ALL \
+        SELECT id + 1, name, extra FROM nodes WHERE id < 3\
+    ) SELECT id FROM nodes";
+    let plan = test_sql(sql)?;
+
+    // The optimizer successfully performs projection pushdown by only selecting the needed
+    // columns from the base table and recursive table, eliminating unused columns
+    assert_snapshot!(
+        format!("{plan}"),
+        @r#"SubqueryAlias: nodes
+  RecursiveQuery: is_distinct=false
+    Projection: test.col_int32 AS id
+      TableScan: test projection=[col_int32]
+    Projection: CAST(CAST(nodes.id AS Int64) + Int64(1) AS Int32)
+      Filter: nodes.id < Int32(3)
+        TableScan: nodes projection=[id]
+"#
+    );
+    Ok(())
+}
+
+#[test]
+fn recursive_cte_with_unused_columns() -> Result<()> {
+    // Test projection pushdown with a recursive CTE where the base case
+    // includes columns that are never used in the recursive part or final result
+    let sql = "WITH RECURSIVE series AS (\
+        SELECT 1 AS n, col_utf8, col_uint32, col_date32 FROM test WHERE col_int32 = 1 \
+        UNION ALL \
+        SELECT n + 1, col_utf8, col_uint32, col_date32 FROM series WHERE n < 3\
+    ) SELECT n FROM series";
+    let plan = test_sql(sql)?;
+
+    // The optimizer successfully performs projection pushdown by eliminating unused columns
+    // even when they're defined in the CTE but not actually needed
+    assert_snapshot!(
+        format!("{plan}"),
+        @r#"SubqueryAlias: series
+  RecursiveQuery: is_distinct=false
+    Projection: Int64(1) AS n
+      Filter: test.col_int32 = Int32(1)
+        TableScan: test projection=[col_int32]
+    Projection: series.n + Int64(1)
+      Filter: series.n < Int64(3)
+        TableScan: series projection=[n]
+"#
+    );
+    Ok(())
+}
+
+#[test]
+/// Asserts the minimal plan shape once projection pushdown succeeds for a recursive CTE.
+/// Unlike the previous two tests that retain extra columns in either the base or recursive
+/// branches, this baseline shows the optimizer trimming everything down to the single
+/// column required by the final projection.
+fn recursive_cte_projection_pushdown_baseline() -> Result<()> {
+    // Test case that truly demonstrates projection pushdown working:
+    // The base case only selects needed columns
+    let sql = "WITH RECURSIVE countdown AS (\
+        SELECT col_int32 AS n FROM test WHERE col_int32 = 5 \
+        UNION ALL \
+        SELECT n - 1 FROM countdown WHERE n > 1\
+    ) SELECT n FROM countdown";
+    let plan = test_sql(sql)?;
+
+    // This demonstrates optimal projection pushdown where only col_int32 is projected from the base table,
+    // and only the needed column is selected from the recursive table
+    assert_snapshot!(
+        format!("{plan}"),
+        @r#"SubqueryAlias: countdown
+  RecursiveQuery: is_distinct=false
+    Projection: test.col_int32 AS n
+      Filter: test.col_int32 = Int32(5)
+        TableScan: test projection=[col_int32]
+    Projection: CAST(CAST(countdown.n AS Int64) - Int64(1) AS Int32)
+      Filter: countdown.n > Int32(1)
+        TableScan: countdown projection=[n]
+"#
+    );
+    Ok(())
+}
+
 fn test_sql(sql: &str) -> Result<LogicalPlan> {
     // parse the SQL
     let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ...
@@ -587,6 +717,14 @@ impl ContextProvider for MyContextProvider {
         None
     }
 
+    fn create_cte_work_table(
+        &self,
+        _name: &str,
+        schema: SchemaRef,
+    ) -> Result<Arc<dyn TableSource>> {
+        Ok(Arc::new(MyTableSource { schema }))
+    }
+
     fn options(&self) -> &ConfigOptions {
         &self.options
     }
diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt
index adbf308a96..a581bcb539 100644
--- a/datafusion/sqllogictest/test_files/cte.slt
+++ b/datafusion/sqllogictest/test_files/cte.slt
@@ -110,7 +110,7 @@ logical_plan
 04)------EmptyRelation: rows=1
 05)----Projection: nodes.id + Int64(1) AS id
 06)------Filter: nodes.id < Int64(10)
-07)--------TableScan: nodes
+07)--------TableScan: nodes projection=[id]
 physical_plan
 01)RecursiveQueryExec: name=nodes, is_distinct=false
 02)--ProjectionExec: expr=[1 as id]
@@ -152,11 +152,10 @@ logical_plan
 01)Sort: balances.time ASC NULLS LAST, balances.name ASC NULLS LAST, balances.account_balance ASC NULLS LAST
 02)--SubqueryAlias: balances
 03)----RecursiveQuery: is_distinct=false
-04)------Projection: balance.time, balance.name, balance.account_balance
-05)--------TableScan: balance
-06)------Projection: balances.time + Int64(1) AS time, balances.name, balances.account_balance + Int64(10) AS account_balance
-07)--------Filter: balances.time < Int64(10)
-08)----------TableScan: balances
+04)------TableScan: balance projection=[time, name, account_balance]
+05)------Projection: balances.time + Int64(1) AS time, balances.name, balances.account_balance + Int64(10) AS account_balance
+06)--------Filter: balances.time < Int64(10)
+07)----------TableScan: balances projection=[time, name, account_balance]
 physical_plan
 01)SortExec: expr=[time@0 ASC NULLS LAST, name@1 ASC NULLS LAST, account_balance@2 ASC NULLS LAST], preserve_partitioning=[false]
 02)--RecursiveQueryExec: name=balances, is_distinct=false
@@ -958,7 +957,7 @@ logical_plan
 04)------EmptyRelation: rows=1
 05)----Projection: numbers.n + Int64(1)
 06)------Filter: numbers.n < Int64(10)
-07)--------TableScan: numbers
+07)--------TableScan: numbers projection=[n]
 physical_plan
 01)RecursiveQueryExec: name=numbers, is_distinct=false
 02)--ProjectionExec: expr=[1 as n]
@@ -984,7 +983,7 @@ logical_plan
 04)------EmptyRelation: rows=1
 05)----Projection: numbers.n + Int64(1)
 06)------Filter: numbers.n < Int64(10)
-07)--------TableScan: numbers
+07)--------TableScan: numbers projection=[n]
 physical_plan
 01)RecursiveQueryExec: name=numbers, is_distinct=false
 02)--ProjectionExec: expr=[1 as n]
@@ -1041,8 +1040,7 @@ logical_plan
 04)------Projection: Int64(0) AS k, Int64(0) AS v
 05)--------EmptyRelation: rows=1
 06)------Sort: r.v ASC NULLS LAST, fetch=1
-07)--------Projection: r.k, r.v
-08)----------TableScan: r
+07)--------TableScan: r projection=[k, v]
 physical_plan
 01)GlobalLimitExec: skip=0, fetch=5
 02)--RecursiveQueryExec: name=r, is_distinct=false

