This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 76904e837d optimizer: allow projection pushdown through aliased
recursive CTE references (#17875)
76904e837d is described below
commit 76904e837d2e47b56d3fe576ee9f7212064368d9
Author: kosiew <[email protected]>
AuthorDate: Sat Oct 4 17:58:20 2025 +0800
optimizer: allow projection pushdown through aliased recursive CTE
references (#17875)
* Add recursive CTE handling for aliased self-references in optimizer
* Add recursive projection pushdown test for Parquet
* Add comment for parquet recursive projection pushdown
* Update datafusion/core/tests/sql/explain_analyze.rs
Co-authored-by: Jeffrey Vo <[email protected]>
* Refactor parquet_recursive_projection_pushdown test to return Result and
improve SQL formatting
* Fix snapshot assertion in parquet_recursive_projection_pushdown test to
use correct temporary file path
* Enhance parquet_recursive_projection_pushdown test to normalize temporary
directory paths in snapshots
* Fix fmt errors
* Fix clippy error
---------
Co-authored-by: Jeffrey Vo <[email protected]>
---
datafusion/core/tests/sql/explain_analyze.rs | 125 +++++++++++++++++++++
.../optimizer/src/optimize_projections/mod.rs | 21 +++-
.../optimizer/tests/optimizer_integration.rs | 23 ++++
3 files changed, 168 insertions(+), 1 deletion(-)
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 852b350b27..e082cabaad 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -22,6 +22,7 @@ use rstest::rstest;
use datafusion::config::ConfigOptions;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::metrics::Timestamp;
+use object_store::path::Path;
#[tokio::test]
async fn explain_analyze_baseline_metrics() {
@@ -727,6 +728,130 @@ async fn parquet_explain_analyze() {
assert_contains!(&formatted, "row_groups_pruned_statistics=0");
}
+// This test reproduces the behavior described in
+// https://github.com/apache/datafusion/issues/16684 where projection
+// pushdown with recursive CTEs could fail to remove unused columns
+// (e.g. nested/recursive expansion causing full schema to be scanned).
+// Keeping this test ensures we don't regress that behavior.
+#[tokio::test]
+#[cfg_attr(tarpaulin, ignore)]
+async fn parquet_recursive_projection_pushdown() -> Result<()> {
+ use parquet::arrow::arrow_writer::ArrowWriter;
+ use parquet::file::properties::WriterProperties;
+
+ let temp_dir = TempDir::new().unwrap();
+ let parquet_path = temp_dir.path().join("hierarchy.parquet");
+
+ let ids = Int64Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+ let parent_ids = Int64Array::from(vec![0, 1, 1, 2, 2, 3, 4, 5, 6, 7]);
+    let values = Int64Array::from(vec![10, 20, 30, 40, 50, 60, 70, 80, 90, 100]);
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int64, false),
+ Field::new("parent_id", DataType::Int64, true),
+ Field::new("value", DataType::Int64, false),
+ ]));
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(ids), Arc::new(parent_ids), Arc::new(values)],
+ )
+ .unwrap();
+
+ let file = File::create(&parquet_path).unwrap();
+ let props = WriterProperties::builder().build();
+ let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let ctx = SessionContext::new();
+ ctx.register_parquet(
+ "hierarchy",
+ parquet_path.to_str().unwrap(),
+ ParquetReadOptions::default(),
+ )
+ .await?;
+
+ let sql = r#"
+ WITH RECURSIVE number_series AS (
+ SELECT id, 1 as level
+ FROM hierarchy
+ WHERE id = 1
+
+ UNION ALL
+
+ SELECT ns.id + 1, ns.level + 1
+ FROM number_series ns
+ WHERE ns.id < 10
+ )
+ SELECT * FROM number_series ORDER BY id
+ "#;
+
+ let dataframe = ctx.sql(sql).await?;
+ let physical_plan = dataframe.create_physical_plan().await?;
+
+ let normalizer = ExplainNormalizer::new();
+    let mut actual = format!("{}", displayable(physical_plan.as_ref()).indent(true))
+ .trim()
+ .lines()
+ .map(|line| normalizer.normalize(line))
+ .collect::<Vec<_>>()
+ .join("\n");
+
+ fn replace_path_variants(actual: &mut String, path: &str) {
+ let mut candidates = vec![path.to_string()];
+
+ let trimmed = path.trim_start_matches(std::path::MAIN_SEPARATOR);
+ if trimmed != path {
+ candidates.push(trimmed.to_string());
+ }
+
+ let forward_slash = path.replace('\\', "/");
+ if forward_slash != path {
+ candidates.push(forward_slash.clone());
+
+ let trimmed_forward = forward_slash.trim_start_matches('/');
+ if trimmed_forward != forward_slash {
+ candidates.push(trimmed_forward.to_string());
+ }
+ }
+
+ for candidate in candidates {
+ *actual = actual.replace(&candidate, "TMP_DIR");
+ }
+ }
+
+ let temp_dir_path = temp_dir.path();
+ let fs_path = temp_dir_path.to_string_lossy().to_string();
+ replace_path_variants(&mut actual, &fs_path);
+
+ if let Ok(url_path) = Path::from_filesystem_path(temp_dir_path) {
+ replace_path_variants(&mut actual, url_path.as_ref());
+ }
+
+ assert_snapshot!(
+ actual,
+ @r"
+ SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+ RecursiveQueryExec: name=number_series, is_distinct=false
+ CoalescePartitionsExec
+ ProjectionExec: expr=[id@0 as id, 1 as level]
+ CoalesceBatchesExec: target_batch_size=8192
+ FilterExec: id@0 = 1
+          RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
+            DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]
+ CoalescePartitionsExec
+      ProjectionExec: expr=[id@0 + 1 as ns.id + Int64(1), level@1 + 1 as ns.level + Int64(1)]
+ CoalesceBatchesExec: target_batch_size=8192
+ FilterExec: id@0 < 10
+          RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
+ WorkTableExec: name=number_series
+ "
+ );
+
+ Ok(())
+}
+
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn parquet_explain_analyze_verbose() {
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs
index 312e788db7..5db71417bc 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -880,7 +880,9 @@ pub fn is_projection_unnecessary(
/// pushdown for now because we cannot safely reason about their column usage.
fn plan_contains_other_subqueries(plan: &LogicalPlan, cte_name: &str) -> bool {
if let LogicalPlan::SubqueryAlias(alias) = plan {
- if alias.alias.table() != cte_name {
+ if alias.alias.table() != cte_name
+            && !subquery_alias_targets_recursive_cte(alias.input.as_ref(), cte_name)
+ {
return true;
}
}
@@ -913,6 +915,23 @@ fn expr_contains_subquery(expr: &Expr) -> bool {
.unwrap()
}
+fn subquery_alias_targets_recursive_cte(plan: &LogicalPlan, cte_name: &str) -> bool {
+ match plan {
+ LogicalPlan::TableScan(scan) => scan.table_name.table() == cte_name,
+ LogicalPlan::SubqueryAlias(alias) => {
+            subquery_alias_targets_recursive_cte(alias.input.as_ref(), cte_name)
+ }
+ _ => {
+ let inputs = plan.inputs();
+ if inputs.len() == 1 {
+ subquery_alias_targets_recursive_cte(inputs[0], cte_name)
+ } else {
+ false
+ }
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use std::cmp::Ordering;
diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs
index b17375ac01..c0f48b8ebf 100644
--- a/datafusion/optimizer/tests/optimizer_integration.rs
+++ b/datafusion/optimizer/tests/optimizer_integration.rs
@@ -548,6 +548,29 @@ fn recursive_cte_projection_pushdown() -> Result<()> {
Ok(())
}
+#[test]
+fn recursive_cte_with_aliased_self_reference() -> Result<()> {
+ let sql = "WITH RECURSIVE nodes AS (\
+ SELECT col_int32 AS id, col_utf8 AS name FROM test \
+ UNION ALL \
+ SELECT child.id + 1, child.name FROM nodes AS child WHERE child.id < 3\
+ ) SELECT id FROM nodes";
+ let plan = test_sql(sql)?;
+
+ assert_snapshot!(
+ format!("{plan}"),
+ @r#"SubqueryAlias: nodes
+ RecursiveQuery: is_distinct=false
+ Projection: test.col_int32 AS id
+ TableScan: test projection=[col_int32]
+ Projection: CAST(CAST(child.id AS Int64) + Int64(1) AS Int32)
+ SubqueryAlias: child
+ Filter: nodes.id < Int32(3)
+ TableScan: nodes projection=[id]"#,
+ );
+ Ok(())
+}
+
#[test]
fn recursive_cte_with_unused_columns() -> Result<()> {
// Test projection pushdown with a recursive CTE where the base case
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]