This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a3a020f6b0 Fix Schema Duplication Errors in Self‑Referential 
INTERSECT/EXCEPT by Requalifying Input Sides (#18814)
a3a020f6b0 is described below

commit a3a020f6b0be3813a066fd3614e9dc010ec48671
Author: kosiew <[email protected]>
AuthorDate: Fri Dec 5 11:37:37 2025 +0800

    Fix Schema Duplication Errors in Self‑Referential INTERSECT/EXCEPT by 
Requalifying Input Sides (#18814)
    
    ## Which issue does this PR close?
    
    * Closes #16295.
    
    ## Rationale for this change
    
    Self-referential INTERSECT and EXCEPT queries (where both sides
    originate from the same table) failed during Substrait round‑trip
    consumption with the error:
    
    > "Schema contains duplicate qualified field name"
    
    This happened because the join-based implementation of set operations
    attempted to merge two identical schemas without requalification,
    resulting in duplicate or ambiguous field names. By ensuring both sides
    are requalified when needed, DataFusion can correctly construct valid
    logical plans for these operations.
    
    ### Before
    ```
    ❯ cargo test --test sqllogictests -- --substrait-round-trip 
intersection.slt:33
        Finished `test` profile [unoptimized + debuginfo] target(s) in 0.24s
         Running bin/sqllogictests.rs 
(target/debug/deps/sqllogictests-917e139464eeea33)
    Completed 1 test files in 0 seconds                                         
     External error: 1 errors in file 
/Users/kosiew/GitHub/datafusion/datafusion/sqllogictest/test_files/intersection.slt
    
    1. query failed: DataFusion error: Schema error: Schema contains duplicate 
qualified field name alltypes_plain.int_col
    ...
    ```
    
    ### After
    ```
    ❯ cargo test --test sqllogictests -- --substrait-round-trip 
intersection.slt:33
        Finished `test` profile [unoptimized + debuginfo] target(s) in 0.64s
         Running bin/sqllogictests.rs 
(target/debug/deps/sqllogictests-917e139464eeea33)
    Completed 1 test files in 0 seconds
    ```
    
    ## What changes are included in this PR?
    
    * Added a requalification step (`requalify_sides_if_needed`) inside
    `intersect_or_except` to avoid duplicate or ambiguous field names.
    * Improved conflict detection logic in `requalify_sides_if_needed` to
    handle:
    
      1. Duplicate qualified fields
      2. Duplicate unqualified fields
      3. Ambiguous references (qualified vs. unqualified collisions)
    * Updated optimizer tests to reflect correct aliasing (`left`, `right`).
    * Added new Substrait round‑trip tests for:
    
      * INTERSECT and EXCEPT (both DISTINCT and ALL variants)
      * Self-referential queries that previously failed
    * Minor formatting and consistency improvements in Substrait consumer
    code.
    
    ## Are these changes tested?
    
    Yes. The PR includes comprehensive tests that:
    
    * Reproduce the original failure modes.
    * Validate that requalification produces stable and correct logical
    plans.
    * Confirm correct behavior across INTERSECT, EXCEPT, ALL, and DISTINCT
    cases.
    
    ## Are there any user-facing changes?
    
    No user-facing behavior changes.
    This is a correctness improvement ensuring that valid SQL
    queries—previously failing only in Substrait round‑trip mode—now work
    without error.
    
    ## LLM-generated code disclosure
    
    This PR includes LLM-generated code and comments. All LLM-generated
    content has been manually reviewed and validated.
---
 datafusion/expr/src/logical_plan/builder.rs        | 77 ++++++++++++++----
 .../optimizer/tests/optimizer_integration.rs       | 12 +--
 .../src/logical_plan/consumer/rel/set_rel.rs       |  5 +-
 .../tests/cases/roundtrip_logical_plan.rs          | 90 ++++++++++++++++++++++
 4 files changed, 162 insertions(+), 22 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index 835f99c274..6f654428e4 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1352,6 +1352,15 @@ impl LogicalPlanBuilder {
             );
         }
 
+        // Requalify sides if needed to avoid duplicate qualified field names
+        // (e.g., when both sides reference the same table)
+        let left_builder = LogicalPlanBuilder::from(left_plan);
+        let right_builder = LogicalPlanBuilder::from(right_plan);
+        let (left_builder, right_builder, _requalified) =
+            requalify_sides_if_needed(left_builder, right_builder)?;
+        let left_plan = left_builder.build()?;
+        let right_plan = right_builder.build()?;
+
         let join_keys = left_plan
             .schema()
             .fields()
@@ -1731,23 +1740,61 @@ pub fn requalify_sides_if_needed(
 ) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
     let left_cols = left.schema().columns();
     let right_cols = right.schema().columns();
-    if left_cols.iter().any(|l| {
-        right_cols.iter().any(|r| {
-            l == r || (l.name == r.name && (l.relation.is_none() || 
r.relation.is_none()))
-        })
-    }) {
-        // These names have no connection to the original plan, but they'll 
make the columns
-        // (mostly) unique.
-        Ok((
-            left.alias(TableReference::bare("left"))?,
-            right.alias(TableReference::bare("right"))?,
-            true,
-        ))
-    } else {
-        Ok((left, right, false))
+
+    // Requalify if merging the schemas would cause an error during join.
+    // This can happen in several cases:
+    // 1. Duplicate qualified fields: both sides have same relation.name
+    // 2. Duplicate unqualified fields: both sides have same unqualified name
+    // 3. Ambiguous reference: one side qualified, other unqualified, same name
+    //
+    // Implementation note: This uses a simple O(n*m) nested loop rather than
+    // a HashMap-based O(n+m) approach. The nested loop is preferred because:
+    // - Schemas are typically small (in TPCH benchmark, max is 16 columns),
+    //   so n*m is negligible
+    // - Early return on first conflict makes common case very fast
+    // - Code is simpler and easier to reason about
+    // - Called only during plan construction, not in execution hot path
+    for l in &left_cols {
+        for r in &right_cols {
+            if l.name != r.name {
+                continue;
+            }
+
+            // Same name - check if this would cause a conflict
+            match (&l.relation, &r.relation) {
+                // Both qualified with same relation - duplicate qualified 
field
+                (Some(l_rel), Some(r_rel)) if l_rel == r_rel => {
+                    return Ok((
+                        left.alias(TableReference::bare("left"))?,
+                        right.alias(TableReference::bare("right"))?,
+                        true,
+                    ));
+                }
+                // Both unqualified - duplicate unqualified field
+                (None, None) => {
+                    return Ok((
+                        left.alias(TableReference::bare("left"))?,
+                        right.alias(TableReference::bare("right"))?,
+                        true,
+                    ));
+                }
+                // One qualified, one not - ambiguous reference
+                (Some(_), None) | (None, Some(_)) => {
+                    return Ok((
+                        left.alias(TableReference::bare("left"))?,
+                        right.alias(TableReference::bare("right"))?,
+                        true,
+                    ));
+                }
+                // Different qualifiers - OK, no conflict
+                _ => {}
+            }
+        }
     }
-}
 
+    // No conflicts found
+    Ok((left, right, false))
+}
 /// Add additional "synthetic" group by expressions based on functional
 /// dependencies.
 ///
diff --git a/datafusion/optimizer/tests/optimizer_integration.rs 
b/datafusion/optimizer/tests/optimizer_integration.rs
index c0f48b8ebf..31ec026c1c 100644
--- a/datafusion/optimizer/tests/optimizer_integration.rs
+++ b/datafusion/optimizer/tests/optimizer_integration.rs
@@ -270,12 +270,14 @@ fn intersect() -> Result<()> {
     assert_snapshot!(
     format!("{plan}"),
     @r#"
-LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8
-  Aggregate: groupBy=[[test.col_int32, test.col_utf8]], aggr=[[]]
-    LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = 
test.col_utf8
-      Aggregate: groupBy=[[test.col_int32, test.col_utf8]], aggr=[[]]
+LeftSemi Join: left.col_int32 = test.col_int32, left.col_utf8 = test.col_utf8
+  Aggregate: groupBy=[[left.col_int32, left.col_utf8]], aggr=[[]]
+    LeftSemi Join: left.col_int32 = right.col_int32, left.col_utf8 = 
right.col_utf8
+      Aggregate: groupBy=[[left.col_int32, left.col_utf8]], aggr=[[]]
+        SubqueryAlias: left
+          TableScan: test projection=[col_int32, col_utf8]
+      SubqueryAlias: right
         TableScan: test projection=[col_int32, col_utf8]
-      TableScan: test projection=[col_int32, col_utf8]
   TableScan: test projection=[col_int32, col_utf8]
 "#
     );
diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/set_rel.rs 
b/datafusion/substrait/src/logical_plan/consumer/rel/set_rel.rs
index 6688a80f52..36bf8dbae4 100644
--- a/datafusion/substrait/src/logical_plan/consumer/rel/set_rel.rs
+++ b/datafusion/substrait/src/logical_plan/consumer/rel/set_rel.rs
@@ -81,7 +81,7 @@ async fn intersect_rels(
             rel,
             consumer.consume_rel(input).await?,
             is_all,
-        )?
+        )?;
     }
 
     Ok(rel)
@@ -95,7 +95,8 @@ async fn except_rels(
     let mut rel = consumer.consume_rel(&rels[0]).await?;
 
     for input in &rels[1..] {
-        rel = LogicalPlanBuilder::except(rel, 
consumer.consume_rel(input).await?, is_all)?
+        rel =
+            LogicalPlanBuilder::except(rel, 
consumer.consume_rel(input).await?, is_all)?;
     }
 
     Ok(rel)
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index c79a501ec8..34cb05fbf7 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -1162,6 +1162,96 @@ async fn simple_intersect_table_reuse() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn self_referential_intersect() -> Result<()> {
+    // Test INTERSECT with the same table on both sides
+    // This previously failed with "Schema contains duplicate qualified field 
name"
+    // The fix ensures requalify_sides_if_needed is called in 
intersect_or_except
+    // After roundtrip through Substrait, SubqueryAlias is lost and 
requalification
+    // produces "left" and "right" aliases
+    // Note: INTERSECT (without ALL) includes DISTINCT, but the outer Aggregate
+    // is optimized away, resulting in just the **LeftSemi** join
+    // (LeftSemi returns rows from left that exist in right)
+    assert_expected_plan(
+        "SELECT a FROM data WHERE a > 0 INTERSECT SELECT a FROM data WHERE a < 
5",
+        "LeftSemi Join: left.a = right.a\
+        \n  SubqueryAlias: left\
+        \n    Aggregate: groupBy=[[data.a]], aggr=[[]]\
+        \n      Filter: data.a > Int64(0)\
+        \n        TableScan: data projection=[a], partial_filters=[data.a > 
Int64(0)]\
+        \n  SubqueryAlias: right\
+        \n    Filter: data.a < Int64(5)\
+        \n      TableScan: data projection=[a], partial_filters=[data.a < 
Int64(5)]",
+        true,
+    )
+    .await
+}
+
+#[tokio::test]
+async fn self_referential_except() -> Result<()> {
+    // Test EXCEPT with the same table on both sides
+    // This previously failed with "Schema contains duplicate qualified field 
name"
+    // The fix ensures requalify_sides_if_needed is called in 
intersect_or_except
+    // After roundtrip through Substrait, SubqueryAlias is lost and 
requalification
+    // produces "left" and "right" aliases
+    // Note: EXCEPT (without ALL) includes DISTINCT, but the outer Aggregate
+    // is optimized away, resulting in just the **LeftAnti** join
+    // (LeftAnti returns rows from left that don't exist in right)
+    assert_expected_plan(
+        "SELECT a FROM data WHERE a > 0 EXCEPT SELECT a FROM data WHERE a < 5",
+        "LeftAnti Join: left.a = right.a\
+        \n  SubqueryAlias: left\
+        \n    Aggregate: groupBy=[[data.a]], aggr=[[]]\
+        \n      Filter: data.a > Int64(0)\
+        \n        TableScan: data projection=[a], partial_filters=[data.a > 
Int64(0)]\
+        \n  SubqueryAlias: right\
+        \n    Filter: data.a < Int64(5)\
+        \n      TableScan: data projection=[a], partial_filters=[data.a < 
Int64(5)]",
+        true,
+    )
+    .await
+}
+
+#[tokio::test]
+async fn self_referential_intersect_all() -> Result<()> {
+    // Test INTERSECT ALL with the same table on both sides
+    // INTERSECT ALL preserves duplicates and does not include DISTINCT
+    // Uses **LeftSemi** join (returns rows from left that exist in right)
+    // The requalification ensures no duplicate field name errors
+    assert_expected_plan(
+        "SELECT a FROM data WHERE a > 0 INTERSECT ALL SELECT a FROM data WHERE 
a < 5",
+        "LeftSemi Join: left.a = right.a\
+        \n  SubqueryAlias: left\
+        \n    Filter: data.a > Int64(0)\
+        \n      TableScan: data projection=[a], partial_filters=[data.a > 
Int64(0)]\
+        \n  SubqueryAlias: right\
+        \n    Filter: data.a < Int64(5)\
+        \n      TableScan: data projection=[a], partial_filters=[data.a < 
Int64(5)]",
+        true,
+    )
+    .await
+}
+
+#[tokio::test]
+async fn self_referential_except_all() -> Result<()> {
+    // Test EXCEPT ALL with the same table on both sides
+    // EXCEPT ALL preserves duplicates and does not include DISTINCT
+    // Uses **LeftAnti** join (returns rows from left that don't exist in 
right)
+    // The requalification ensures no duplicate field name errors
+    assert_expected_plan(
+        "SELECT a FROM data WHERE a > 0 EXCEPT ALL SELECT a FROM data WHERE a 
< 5",
+        "LeftAnti Join: left.a = right.a\
+        \n  SubqueryAlias: left\
+        \n    Filter: data.a > Int64(0)\
+        \n      TableScan: data projection=[a], partial_filters=[data.a > 
Int64(0)]\
+        \n  SubqueryAlias: right\
+        \n    Filter: data.a < Int64(5)\
+        \n      TableScan: data projection=[a], partial_filters=[data.a < 
Int64(5)]",
+        true,
+    )
+    .await
+}
+
 #[tokio::test]
 async fn simple_window_function() -> Result<()> {
     roundtrip("SELECT RANK() OVER (PARTITION BY a ORDER BY b), d, sum(b) OVER 
(PARTITION BY a) FROM data;").await


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to