This is an automated email from the ASF dual-hosted git repository.
ptoth pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new b6eed1855084 [SPARK-53074][SQL][3.5] Avoid partial clustering in SPJ to meet a child's required distribution
b6eed1855084 is described below
commit b6eed1855084e1c3ce3c14acdb1219533456e4d2
Author: Chirag Singh <[email protected]>
AuthorDate: Tue Jan 13 11:33:36 2026 +0100
[SPARK-53074][SQL][3.5] Avoid partial clustering in SPJ to meet a child's required distribution
### What changes were proposed in this pull request?
Currently, the SPJ logic can apply partial clustering (when enabled) to either side of an inner JOIN as long as the nodes between the scan and the JOIN preserve partitioning. This breaks if one of those nodes uses the scan's key-grouped partitioning to satisfy its own required distribution (for example, a grouping aggregate or a window function), as sketched below.
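For illustration, a minimal SQL sketch of the affected query shape, assuming two storage-partitioned tables shaped like the `items` and `purchases` tables in the commit's new test (names here are illustrative, not the exact test query):
```sql
-- Both scans are key-grouped (storage-partitioned) on the join key.
-- The window function relies on the purchases scan's partitioning to get all
-- rows for an item_id into one partition. If SPJ partially clusters this side
-- (splitting a key's rows across partitions to balance the join), ROW_NUMBER()
-- restarts in each split, so the rn = 1 filter can keep duplicate rows per key.
WITH ranked AS (
  SELECT item_id, price,
         ROW_NUMBER() OVER (PARTITION BY item_id ORDER BY time DESC) AS rn
  FROM purchases          -- partitioned by item_id
)
SELECT i.id, r.price
FROM ranked r JOIN items i ON i.id = r.item_id  -- items partitioned by id
WHERE r.rn = 1
```
With the fix, a side that relies on the scan's key-grouped partitioning for another operator is never chosen as the partially clustered side (see `V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED` in the test below).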
This PR fixes the issue by not applying a partially clustered distribution to a JOIN's child if any node in that child relies on the KeyGroupedPartitioning to satisfy its required distribution, since a partially clustered distribution cannot safely satisfy such a requirement.
### Why are the changes needed?
Without this fix, using a partially clustered distribution with SPJ can return incorrect results: splitting a key's rows across partitions breaks any operator on that side that requires all rows for the key in a single partition, producing, for example, duplicate row_number() values.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
See test changes.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53770 from peter-toth/SPARK-53074-3.5.
Lead-authored-by: Chirag Singh <[email protected]>
Co-authored-by: Peter Toth <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
---
.../execution/exchange/EnsureRequirements.scala | 31 +++++++++-
.../connector/KeyGroupedPartitioningSuite.scala | 67 ++++++++++++++++++++++
2 files changed, 96 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index ee0ea11816f9..4546a5b1d270 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -337,6 +337,26 @@ case class EnsureRequirements(
}
}
+  /**
+   * Whether partial clustering can be applied to a given child query plan. This is true if the plan
+   * consists only of a sequence of unary nodes where each node does not use the scan's key-grouped
+   * partitioning to satisfy its required distribution. Otherwise, partial clustering could be
+   * applied to a key-grouped partitioning unrelated to this join.
+   */
+  private def canApplyPartialClusteredDistribution(plan: SparkPlan): Boolean = {
+    !plan.exists {
+      // Unary nodes are safe as long as they don't have a required distribution (for example, a
+      // project or filter). If they have a required distribution, then we should assume that this
+      // plan can't be partially clustered (since the key-grouped partitioning may be needed to
+      // satisfy this distribution unrelated to this JOIN).
+      case u if u.children.length == 1 =>
+        u.requiredChildDistribution.head != UnspecifiedDistribution
+      // Only allow a non-unary node if it's a leaf node - key-grouped partitionings under other
+      // binary nodes (like another JOIN) aren't safe to partially cluster.
+      case other => other.children.nonEmpty
+    }
+  }
+
/**
   * Checks whether two children, `left` and `right`, of a join operator have compatible
* `KeyGroupedPartitioning`, and can benefit from storage-partitioned join.
@@ -438,9 +458,16 @@ case class EnsureRequirements(
           // whether partially clustered distribution can be applied. For instance, the
           // optimization cannot be applied to a left outer join, where the left hand
           // side is chosen as the side to replicate partitions according to stats.
+          // Similarly, the partially clustered distribution cannot be applied if the
+          // partially clustered side must use the scan's key-grouped partitioning to
+          // satisfy some unrelated required distribution in its plan (for example, for an aggregate
+          // or window function), as this will give incorrect results (for example, duplicate
+          // row_number() values).
           // Otherwise, query result could be incorrect.
-          val canReplicateLeft = canReplicateLeftSide(joinType)
-          val canReplicateRight = canReplicateRightSide(joinType)
+          val canReplicateLeft = canReplicateLeftSide(joinType) &&
+            canApplyPartialClusteredDistribution(right)
+          val canReplicateRight = canReplicateRightSide(joinType) &&
+            canApplyPartialClusteredDistribution(left)
           if (!canReplicateLeft && !canReplicateRight) {
             logInfo("Skipping partially clustered distribution as it cannot be applied for " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 71e030f535e9..6f40775ce242 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.connector
+import java.sql.Timestamp
import java.util.Collections
import org.apache.spark.sql.{DataFrame, Row}
@@ -955,6 +956,72 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
}
}
+ test("[SPARK-53074] partial clustering avoided to meet a non-JOIN required
distribution") {
+ val items_partitions = Array(identity("id"))
+ createTable(items, items_schema, items_partitions)
+ sql(s"INSERT INTO testcat.ns.$items VALUES " +
+ "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+ "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+ "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+ "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+ val purchases_partitions = Array(identity("item_id"))
+ createTable(purchases, purchases_schema, purchases_partitions)
+ sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+ "(1, 45.0, cast('2020-01-01' as timestamp)), " +
+ "(1, 50.0, cast('2020-01-02' as timestamp)), " +
+ "(2, 15.0, cast('2020-01-02' as timestamp)), " +
+ "(2, 20.0, cast('2020-01-03' as timestamp)), " +
+ "(3, 20.0, cast('2020-02-01' as timestamp))")
+
+ for {
+ pushDownValues <- Seq(true, false)
+ enable <- Seq("true", "false")
+ } yield {
+ withSQLConf(
+ SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key ->
false.toString,
+ SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key ->
pushDownValues.toString,
+ SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
enable) {
+ // The left side uses a key-grouped partitioning to satisfy the WINDOW
function's
+ // required distribution. By default, the left side will be partially
clustered (since
+ // it's estimated to be larger), but this partial clustering won't be
applied because the
+ // left side needs to be key-grouped partitioned to satisfy the
WINDOW's required
+ // distribution.
+ // The left side needs to project additional fields to ensure it's
estimated to be
+ // larger than the right side.
+ val df = sql(
+ s"""
+ |WITH purchases_windowed AS (
+ | SELECT
+ | ROW_NUMBER() OVER (
+ | PARTITION BY item_id ORDER BY time DESC
+ | ) AS RN,
+ | item_id,
+ | price,
+ | STRUCT(item_id, price, time) AS purchases_struct
+ | FROM testcat.ns.$purchases
+ |)
+ |SELECT
+ | SUM(p.price),
+ | SUM(p.purchases_struct.item_id),
+ | SUM(p.purchases_struct.price),
+ | MAX(p.purchases_struct.time)
+ |FROM
+ | purchases_windowed p JOIN testcat.ns.$items i
+ | ON i.id = p.item_id
+ |WHERE p.RN = 1
+ |""".stripMargin)
+ checkAnswer(df, Seq(Row(140.0, 7, 140.0, Timestamp.valueOf("2020-02-01
00:00:00"))))
+ val shuffles = collectShuffles(df.queryExecution.executedPlan)
+ assert(shuffles.isEmpty, "should not contain any shuffle")
+ if (pushDownValues) {
+ val scans = collectScans(df.queryExecution.executedPlan)
+ assert(scans.forall(_.inputRDD.partitions.length === 3))
+ }
+ }
+ }
+ }
+
test("data source partitioning + dynamic partition filtering") {
withSQLConf(
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",