This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new db649d974220 [SPARK-51262][SQL] Fix exceptAll after dropDuplicates 
with subset
db649d974220 is described below

commit db649d97422036d5a1397dfdbfb0ad83475d5ef1
Author: Shrirang Mhalgi <[email protected]>
AuthorDate: Thu Jun 4 11:27:36 2026 +0800

    [SPARK-51262][SQL] Fix exceptAll after dropDuplicates with subset
    
    ### What changes were proposed in this pull request?
    
    Reorder `ReplaceDeduplicateWithAggregate` before `RewriteExceptAll` in the 
"Replace Operators" optimizer batch.
    
    ### Why are the changes needed?
    
    `dropDuplicates("id", "name").exceptAll(other)` throws 
`INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND` at execution time. The root cause is that 
`RewriteExceptAll` captures attribute references from `left.output` before 
`ReplaceDeduplicateWithAggregate` has replaced the Deduplicate node with an 
Aggregate(First(...)). The First() alias creates new exprIds that don't match 
what `RewriteExceptAll` baked into its Generate node.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. `exceptAll (and intersectAll)` now work correctly after 
`dropDuplicates` with a column subset.
    
    ### How was this patch tested?
    
    Added a test in `DataFrameSetOperationsSuite` verifying `exceptAll`, 
`except`, and `intersectAll` after `dropDuplicates(subset)`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes.
    
    Closes #55905 from 
shrirangmhalgi/SPARK-51262-except-all-not-working-with-drop-duplicates.
    
    Authored-by: Shrirang Mhalgi <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 8a76ac421cba4ff7ad8665f761a8cdbf197f2370)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  8 +++--
 .../spark/sql/DataFrameSetOperationsSuite.scala    | 37 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 524940f1f17a..d6729a1fe989 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -214,13 +214,17 @@ abstract class Optimizer(catalogManager: CatalogManager)
       OptimizeSubqueries,
       OptimizeOneRowRelationSubquery),
     Batch("Replace Operators", fixedPoint,
+      // SPARK-51262: ReplaceDeduplicateWithAggregate must run before 
RewriteExceptAll because
+      // it replaces Deduplicate with Aggregate(First(...)), creating new 
attribute exprIds.
+      // If RewriteExceptAll runs first, its Generate node captures stale 
exprIds that no
+      // longer exist after the Deduplicate-to-Aggregate rewrite.
+      ReplaceDeduplicateWithAggregate,
       RewriteExceptAll,
       RewriteIntersectAll,
       ReplaceIntersectWithSemiJoin,
       ReplaceExceptWithFilter,
       ReplaceExceptWithAntiJoin,
-      ReplaceDistinctWithAggregate,
-      ReplaceDeduplicateWithAggregate),
+      ReplaceDistinctWithAggregate),
     Batch("Aggregate", fixedPoint,
       RemoveLiteralFromGroupExpressions,
       RemoveRepetitionFromGroupExpressions),
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
index e65942689bc0..d838ba4c234f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
@@ -1642,6 +1642,43 @@ class DataFrameSetOperationsSuite extends 
SharedSparkSession with AdaptiveSparkP
       }
     }
   }
+
+  test("SPARK-51262: exceptAll after dropDuplicates with subset should not 
throw") {
+    // Data where dropDuplicates(subset) produces deterministic results - to 
avoid test flakiness.
+    val df1 = spark.createDataFrame(Seq(
+      (1, "a", 100),
+      (2, "b", 200),
+      (3, "c", 300)
+    )).toDF("id", "name", "value")
+
+    val df2 = spark.createDataFrame(Seq(
+      (1, "a", 100)
+    )).toDF("id", "name", "value")
+
+    // dropDuplicates with subset - each (id, name) is already unique so 
output is deterministic
+    val deduped = df1.dropDuplicates("id", "name")
+
+    // exceptAll should work without INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND
+    val result = deduped.exceptAll(df2)
+    assert(result.columns === Array("id", "name", "value"))
+    val rows = result.collect().sortBy(_.getInt(0))
+    assert(rows.length === 2)
+    assert(rows(0) === Row(2, "b", 200))
+    assert(rows(1) === Row(3, "c", 300))
+
+    // Also verify except (non-all) works and returns correct values
+    val result2 = deduped.except(df2)
+    val rows2 = result2.collect().sortBy(_.getInt(0))
+    assert(rows2.length === 2)
+    assert(rows2(0) === Row(2, "b", 200))
+    assert(rows2(1) === Row(3, "c", 300))
+
+    // intersectAll should also work and return the matching row
+    val result3 = deduped.intersectAll(df2)
+    val rows3 = result3.collect()
+    assert(rows3.length === 1)
+    assert(rows3.head === Row(1, "a", 100))
+  }
 }
 
 case class UnionClass1a(a: Int, b: Long, nested: UnionClass2)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to