This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8a76ac421cba [SPARK-51262][SQL] Fix exceptAll after dropDuplicates
with subset
8a76ac421cba is described below
commit 8a76ac421cba4ff7ad8665f761a8cdbf197f2370
Author: Shrirang Mhalgi <[email protected]>
AuthorDate: Thu Jun 4 11:27:36 2026 +0800
[SPARK-51262][SQL] Fix exceptAll after dropDuplicates with subset
### What changes were proposed in this pull request?
Reorder `ReplaceDeduplicateWithAggregate` before `RewriteExceptAll` in the
"Replace Operators" optimizer batch.
### Why are the changes needed?
`dropDuplicates("id", "name").exceptAll(other)` throws
`INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND` at execution time. The root cause is that
`RewriteExceptAll` captures attribute references from `left.output` before
`ReplaceDeduplicateWithAggregate` has replaced the Deduplicate node with an
Aggregate(First(...)). The First() alias creates new exprIds that don't match
what `RewriteExceptAll` baked into its Generate node.
### Does this PR introduce _any_ user-facing change?
Yes. `exceptAll` (and `intersectAll`) now work correctly after
`dropDuplicates` with a column subset.
### How was this patch tested?
Added a test in `DataFrameSetOperationsSuite` verifying `exceptAll`,
`except`, and `intersectAll` after `dropDuplicates(subset)`.
### Was this patch authored or co-authored using generative AI tooling?
Yes.
Closes #55905 from
shrirangmhalgi/SPARK-51262-except-all-not-working-with-drop-duplicates.
Authored-by: Shrirang Mhalgi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 8 +++--
.../spark/sql/DataFrameSetOperationsSuite.scala | 37 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 2 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0cf03052cbdb..a1fe4cd8674b 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -214,13 +214,17 @@ abstract class Optimizer(catalogManager: CatalogManager)
OptimizeSubqueries,
OptimizeOneRowRelationSubquery),
Batch("Replace Operators", fixedPoint,
+ // SPARK-51262: ReplaceDeduplicateWithAggregate must run before RewriteExceptAll because
+ // it replaces Deduplicate with Aggregate(First(...)), creating new attribute exprIds.
+ // If RewriteExceptAll runs first, its Generate node captures stale exprIds that no
+ // longer exist after the Deduplicate-to-Aggregate rewrite.
+ ReplaceDeduplicateWithAggregate,
RewriteExceptAll,
RewriteIntersectAll,
ReplaceIntersectWithSemiJoin,
ReplaceExceptWithFilter,
ReplaceExceptWithAntiJoin,
- ReplaceDistinctWithAggregate,
- ReplaceDeduplicateWithAggregate),
+ ReplaceDistinctWithAggregate),
Batch("Aggregate", fixedPoint,
RemoveLiteralFromGroupExpressions,
RemoveRepetitionFromGroupExpressions),
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
index e65942689bc0..d838ba4c234f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
@@ -1642,6 +1642,43 @@ class DataFrameSetOperationsSuite extends
SharedSparkSession with AdaptiveSparkP
}
}
}
+
+ test("SPARK-51262: exceptAll after dropDuplicates with subset should not throw") {
+ // Data where dropDuplicates(subset) produces deterministic results - to avoid test flakiness.
+ val df1 = spark.createDataFrame(Seq(
+ (1, "a", 100),
+ (2, "b", 200),
+ (3, "c", 300)
+ )).toDF("id", "name", "value")
+
+ val df2 = spark.createDataFrame(Seq(
+ (1, "a", 100)
+ )).toDF("id", "name", "value")
+
+ // dropDuplicates with subset - each (id, name) is already unique so output is deterministic
+ val deduped = df1.dropDuplicates("id", "name")
+
+ // exceptAll should work without INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND
+ val result = deduped.exceptAll(df2)
+ assert(result.columns === Array("id", "name", "value"))
+ val rows = result.collect().sortBy(_.getInt(0))
+ assert(rows.length === 2)
+ assert(rows(0) === Row(2, "b", 200))
+ assert(rows(1) === Row(3, "c", 300))
+
+ // Also verify except (non-all) works and returns correct values
+ val result2 = deduped.except(df2)
+ val rows2 = result2.collect().sortBy(_.getInt(0))
+ assert(rows2.length === 2)
+ assert(rows2(0) === Row(2, "b", 200))
+ assert(rows2(1) === Row(3, "c", 300))
+
+ // intersectAll should also work and return the matching row
+ val result3 = deduped.intersectAll(df2)
+ val rows3 = result3.collect()
+ assert(rows3.length === 1)
+ assert(rows3.head === Row(1, "a", 100))
+ }
}
case class UnionClass1a(a: Int, b: Long, nested: UnionClass2)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]