Naveen Kumar Kumar Puppala created SPARK-55848:
--------------------------------------------------
Summary: [SQL] SPJ partial clustering produces incorrect results
for post-join dropDuplicates and Window dedup operations
Key: SPARK-55848
URL: https://issues.apache.org/jira/browse/SPARK-55848
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 4.0.1, 4.0.0, 3.5.0, 3.4.0
Reporter: Naveen Kumar Kumar Puppala
When using Storage-Partitioned Join (SPJ) with
{{spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true}},
both {{dropDuplicates()}} and Window-based dedup ({{row_number() OVER
(PARTITION BY ...)}}) applied after the join produce incorrect results:
duplicate rows that should have been removed survive in the output.
*Root cause:* Partial clustering (SPARK-42038) splits a partition with many
files across multiple tasks. After the join, each split task holds only a
subset of the rows for the same partition key. The join's output inherits
{{KeyGroupedPartitioning}}, which {{EnsureRequirements}} treats as satisfying
{{ClusteredDistribution}}. No Exchange (shuffle) is therefore inserted before
downstream dedup operators, so each split task deduplicates its own subset
independently. Duplicates that span tasks survive, and the output contains
more rows than it should.
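The effect described above can be reproduced without Spark. The following is a minimal pure-Python sketch, not Spark code: the row values, the two-task layout, and the helper names ({{dedup}}, {{dedup_per_task}}, {{dedup_after_shuffle}}) are all made up for illustration. It compares deduplicating inside each split task with deduplicating after rows have been regrouped by key, which is what an Exchange would provide.

```python
from collections import defaultdict

# Hypothetical post-join rows as (partition_key, value) pairs. With partial
# clustering, the rows for key "a" are spread over two tasks.
task_outputs = [
    [("a", 1), ("a", 1), ("b", 2)],  # task 0: part of key "a", all of key "b"
    [("a", 1), ("a", 3)],            # task 1: the rest of key "a"
]


def dedup(rows):
    """Keep the first occurrence of each row, similar to dropDuplicates()."""
    seen = set()
    out = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            out.append(row)
    return out


def dedup_per_task(tasks):
    """No Exchange: every task deduplicates only the rows it holds."""
    return [row for task in tasks for row in dedup(task)]


def dedup_after_shuffle(tasks):
    """With an Exchange: rows are regrouped by key before deduplication."""
    by_key = defaultdict(list)
    for task in tasks:
        for row in task:
            by_key[row[0]].append(row)
    return [row for key in sorted(by_key) for row in dedup(by_key[key])]


per_task = dedup_per_task(task_outputs)
shuffled = dedup_after_shuffle(task_outputs)
print(per_task)
print(shuffled)
```

In this toy layout the per-task result still contains the row {{("a", 1)}} twice, once from each task, while the regrouped result contains it once.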
SPARK-53074 fixed the pre-join case (operators between scan and join) but the
post-join case remains unfixed.
*Proposed fix:* Add an {{isPartiallyClustered}} flag to
{{KeyGroupedPartitioning}}. Override {{satisfies0}} so that it returns false
for {{ClusteredDistribution}} when the partitioning is partially clustered,
which causes {{EnsureRequirements}} to insert an Exchange before the dedup
operator. Plain SPJ joins without dedup remain unaffected.
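The decision logic of the proposed fix can be pictured with a toy model. This is a hedged Python sketch, not the Spark implementation: the classes below only borrow the names from the description, the {{is_partially_clustered}} field and {{needs_exchange}} helper are hypothetical, and the subset rule used for the non-partial case is a simplifying assumption of this sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ClusteredDistribution:
    """Toy stand-in: rows with equal clustering keys must share one task."""
    clustering: tuple


@dataclass(frozen=True)
class KeyGroupedPartitioning:
    """Toy stand-in carrying the proposed flag (hypothetical Python name)."""
    expressions: tuple
    is_partially_clustered: bool = False

    def satisfies(self, required):
        # A partially clustered layout may hold one key in several tasks,
        # so it cannot promise the co-location the requirement asks for.
        if self.is_partially_clustered:
            return False
        # Simplified rule assumed for this sketch: the partition expressions
        # must be a subset of the required clustering keys.
        return set(self.expressions) <= set(required.clustering)


def needs_exchange(child_partitioning, required):
    """Mimic the planner decision: shuffle when the requirement is unmet."""
    return not child_partitioning.satisfies(required)


required = ClusteredDistribution(("id",))
fully_clustered = KeyGroupedPartitioning(("id",))
partially_clustered = KeyGroupedPartitioning(("id",), is_partially_clustered=True)

print(needs_exchange(fully_clustered, required))
print(needs_exchange(partially_clustered, required))
```

Under these assumptions only the partially clustered case asks for a shuffle, so a fully clustered SPJ plan keeps its shuffle-free shape.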
GitHub issue: https://github.com/apache/spark/issues/54378