This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new eace7d370213 [SPARK-45592][SPARK-45282][SQL][3.4] Correctness issue in
AQE with InMemoryTableScanExec
eace7d370213 is described below
commit eace7d370213f8498107cb14cd85f854a4d4d1df
Author: Emil Ejbyfeldt <[email protected]>
AuthorDate: Sun Nov 12 13:54:01 2023 -0800
[SPARK-45592][SPARK-45282][SQL][3.4] Correctness issue in AQE with
InMemoryTableScanExec
### What changes were proposed in this pull request?
This backports https://github.com/apache/spark/pull/43435 SPARK-45592 to
the 3.4 branch. This is because it was already reported there as SPARK-45282
but it required enabling some extra configuration to hit the bug.
### Why are the changes needed?
Fix correctness issue.
### Does this PR introduce _any_ user-facing change?
Yes, fixing correctness issue.
### How was this patch tested?
New tests based on the reproduction example in SPARK-45282
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43729 from eejbyfeldt/SPARK-45282.
Authored-by: Emil Ejbyfeldt <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/catalyst/plans/physical/partitioning.scala | 49 +++
.../spark/sql/catalyst/DistributionSuite.scala | 124 ++++---
.../spark/sql/catalyst/ShuffleSpecSuite.scala | 401 ++++++++++++---------
.../execution/adaptive/AQEShuffleReadExec.scala | 11 +-
.../scala/org/apache/spark/sql/DatasetSuite.scala | 29 ++
.../WriteDistributionAndOrderingSuite.scala | 53 ++-
6 files changed, 401 insertions(+), 266 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index d2f9e9b5d5bf..1eefe65859bd 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -306,6 +306,35 @@ case class HashPartitioning(expressions: Seq[Expression],
numPartitions: Int)
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions
= newChildren)
+
+}
+
+case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int)
+
+/**
+ * Represents a partitioning where partitions have been coalesced from a
HashPartitioning into a
+ * fewer number of partitions.
+ */
+case class CoalescedHashPartitioning(from: HashPartitioning, partitions:
Seq[CoalescedBoundary])
+ extends Expression with Partitioning with Unevaluable {
+
+ override def children: Seq[Expression] = from.expressions
+ override def nullable: Boolean = from.nullable
+ override def dataType: DataType = from.dataType
+
+ override def satisfies0(required: Distribution): Boolean =
from.satisfies0(required)
+
+ override def createShuffleSpec(distribution: ClusteredDistribution):
ShuffleSpec =
+ CoalescedHashShuffleSpec(from.createShuffleSpec(distribution), partitions)
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): CoalescedHashPartitioning =
+ copy(from = from.copy(expressions = newChildren))
+
+ override val numPartitions: Int = partitions.length
+
+ override def toString: String = from.toString
+ override def sql: String = from.sql
}
/**
@@ -661,6 +690,26 @@ case class HashShuffleSpec(
override def numPartitions: Int = partitioning.numPartitions
}
+case class CoalescedHashShuffleSpec(
+ from: ShuffleSpec,
+ partitions: Seq[CoalescedBoundary]) extends ShuffleSpec {
+
+ override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+ case SinglePartitionShuffleSpec =>
+ numPartitions == 1
+ case CoalescedHashShuffleSpec(otherParent, otherPartitions) =>
+ partitions == otherPartitions && from.isCompatibleWith(otherParent)
+ case ShuffleSpecCollection(specs) =>
+ specs.exists(isCompatibleWith)
+ case _ =>
+ false
+ }
+
+ override def canCreatePartitioning: Boolean = false
+
+ override def numPartitions: Int = partitions.length
+}
+
case class KeyGroupedShuffleSpec(
partitioning: KeyGroupedPartitioning,
distribution: ClusteredDistribution) extends ShuffleSpec {
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
index a924a9ed02e5..7cb4d5f12325 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
import org.apache.spark.SparkFunSuite
/* Implicit conversions */
import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Literal, Murmur3Hash, Pmod}
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal,
Murmur3Hash, Pmod}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.types.IntegerType
@@ -146,63 +146,75 @@ class DistributionSuite extends SparkFunSuite {
false)
}
- test("HashPartitioning is the output partitioning") {
- // HashPartitioning can satisfy ClusteredDistribution iff its hash
expressions are a subset of
- // the required clustering expressions.
- checkSatisfied(
- HashPartitioning(Seq($"a", $"b", $"c"), 10),
- ClusteredDistribution(Seq($"a", $"b", $"c")),
- true)
-
- checkSatisfied(
- HashPartitioning(Seq($"b", $"c"), 10),
- ClusteredDistribution(Seq($"a", $"b", $"c")),
- true)
-
- checkSatisfied(
- HashPartitioning(Seq($"a", $"b", $"c"), 10),
- ClusteredDistribution(Seq($"b", $"c")),
- false)
-
- checkSatisfied(
- HashPartitioning(Seq($"a", $"b", $"c"), 10),
- ClusteredDistribution(Seq($"d", $"e")),
- false)
-
- // When ClusteredDistribution.requireAllClusterKeys is set to true,
- // HashPartitioning can only satisfy ClusteredDistribution iff its hash
expressions are
- // exactly same as the required clustering expressions.
- checkSatisfied(
- HashPartitioning(Seq($"a", $"b", $"c"), 10),
- ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys =
true),
- true)
-
- checkSatisfied(
- HashPartitioning(Seq($"b", $"c"), 10),
- ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys =
true),
- false)
-
- checkSatisfied(
- HashPartitioning(Seq($"b", $"a", $"c"), 10),
- ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys =
true),
- false)
-
- // HashPartitioning cannot satisfy OrderedDistribution
- checkSatisfied(
- HashPartitioning(Seq($"a", $"b", $"c"), 10),
- OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
- false)
+ private def testHashPartitioningLike(
+ partitioningName: String,
+ create: (Seq[Expression], Int) => Partitioning): Unit = {
+
+ test(s"$partitioningName is the output partitioning") {
+ // HashPartitioning can satisfy ClusteredDistribution iff its hash
expressions are a subset of
+ // the required clustering expressions.
+ checkSatisfied(
+ create(Seq($"a", $"b", $"c"), 10),
+ ClusteredDistribution(Seq($"a", $"b", $"c")),
+ true)
+
+ checkSatisfied(
+ create(Seq($"b", $"c"), 10),
+ ClusteredDistribution(Seq($"a", $"b", $"c")),
+ true)
+
+ checkSatisfied(
+ create(Seq($"a", $"b", $"c"), 10),
+ ClusteredDistribution(Seq($"b", $"c")),
+ false)
+
+ checkSatisfied(
+ create(Seq($"a", $"b", $"c"), 10),
+ ClusteredDistribution(Seq($"d", $"e")),
+ false)
+
+ // When ClusteredDistribution.requireAllClusterKeys is set to true,
+ // HashPartitioning can only satisfy ClusteredDistribution iff its hash
expressions are
+ // exactly same as the required clustering expressions.
+ checkSatisfied(
+ create(Seq($"a", $"b", $"c"), 10),
+ ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys =
true),
+ true)
+
+ checkSatisfied(
+ create(Seq($"b", $"c"), 10),
+ ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys =
true),
+ false)
+
+ checkSatisfied(
+ create(Seq($"b", $"a", $"c"), 10),
+ ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys =
true),
+ false)
+
+ // HashPartitioning cannot satisfy OrderedDistribution
+ checkSatisfied(
+ create(Seq($"a", $"b", $"c"), 10),
+ OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
+ false)
+
+ checkSatisfied(
+ create(Seq($"a", $"b", $"c"), 1),
+ OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
+ false) // TODO: this can be relaxed.
+
+ checkSatisfied(
+ create(Seq($"b", $"c"), 10),
+ OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
+ false)
+ }
+ }
- checkSatisfied(
- HashPartitioning(Seq($"a", $"b", $"c"), 1),
- OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
- false) // TODO: this can be relaxed.
+ testHashPartitioningLike("HashPartitioning",
+ (expressions, numPartitions) => HashPartitioning(expressions,
numPartitions))
- checkSatisfied(
- HashPartitioning(Seq($"b", $"c"), 10),
- OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
- false)
- }
+ testHashPartitioningLike("CoalescedHashPartitioning", (expressions,
numPartitions) =>
+ CoalescedHashPartitioning(
+ HashPartitioning(expressions, numPartitions), Seq(CoalescedBoundary(0,
numPartitions))))
test("RangePartitioning is the output partitioning") {
// RangePartitioning can satisfy OrderedDistribution iff its ordering is a
prefix
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
index 51e768873226..6b069d1c9736 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
@@ -62,211 +62,254 @@ class ShuffleSpecSuite extends SparkFunSuite with
SQLHelper {
}
}
- test("compatibility: HashShuffleSpec on both sides") {
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- expected = true
- )
-
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a"), 10),
ClusteredDistribution(Seq($"a", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"a"), 10),
ClusteredDistribution(Seq($"a", $"b"))),
- expected = true
- )
+ private def testHashShuffleSpecLike(
+ shuffleSpecName: String,
+ create: (HashPartitioning, ClusteredDistribution) => ShuffleSpec): Unit
= {
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"b"), 10),
ClusteredDistribution(Seq($"a", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"d"), 10),
ClusteredDistribution(Seq($"c", $"d"))),
- expected = true
- )
+ test(s"compatibility: $shuffleSpecName on both sides") {
+ checkCompatible(
+ create(HashPartitioning(Seq($"a", $"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ create(HashPartitioning(Seq($"a", $"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ expected = true
+ )
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"c", $"c", $"d"), 10),
- ClusteredDistribution(Seq($"c", $"d"))),
- expected = true
- )
+ checkCompatible(
+ create(HashPartitioning(Seq($"a"), 10),
ClusteredDistribution(Seq($"a", $"b"))),
+ create(HashPartitioning(Seq($"a"), 10),
ClusteredDistribution(Seq($"a", $"b"))),
+ expected = true
+ )
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"a", $"d"), 10),
- ClusteredDistribution(Seq($"a", $"c", $"d"))),
- expected = true
- )
+ checkCompatible(
+ create(HashPartitioning(Seq($"b"), 10),
ClusteredDistribution(Seq($"a", $"b"))),
+ create(HashPartitioning(Seq($"d"), 10),
ClusteredDistribution(Seq($"c", $"d"))),
+ expected = true
+ )
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"a"), 10),
- ClusteredDistribution(Seq($"a", $"b", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"a", $"c", $"a"), 10),
- ClusteredDistribution(Seq($"a", $"c", $"c"))),
- expected = true
- )
+ checkCompatible(
+ create(HashPartitioning(Seq($"a", $"a", $"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ create(HashPartitioning(Seq($"c", $"c", $"d"), 10),
+ ClusteredDistribution(Seq($"c", $"d"))),
+ expected = true
+ )
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"a"), 10),
- ClusteredDistribution(Seq($"a", $"b", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"a", $"c", $"a"), 10),
- ClusteredDistribution(Seq($"a", $"c", $"d"))),
- expected = true
- )
+ checkCompatible(
+ create(HashPartitioning(Seq($"a", $"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b", $"b"))),
+ create(HashPartitioning(Seq($"a", $"d"), 10),
+ ClusteredDistribution(Seq($"a", $"c", $"d"))),
+ expected = true
+ )
- // negative cases
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"c"), 5),
- ClusteredDistribution(Seq($"c", $"d"))),
- expected = false
- )
+ checkCompatible(
+ create(HashPartitioning(Seq($"a", $"b", $"a"), 10),
+ ClusteredDistribution(Seq($"a", $"b", $"b"))),
+ create(HashPartitioning(Seq($"a", $"c", $"a"), 10),
+ ClusteredDistribution(Seq($"a", $"c", $"c"))),
+ expected = true
+ )
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- expected = false
- )
+ checkCompatible(
+ create(HashPartitioning(Seq($"a", $"b", $"a"), 10),
+ ClusteredDistribution(Seq($"a", $"b", $"b"))),
+ create(HashPartitioning(Seq($"a", $"c", $"a"), 10),
+ ClusteredDistribution(Seq($"a", $"c", $"d"))),
+ expected = true
+ )
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- expected = false
- )
+ // negative cases
+ checkCompatible(
+ create(HashPartitioning(Seq($"a"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ create(HashPartitioning(Seq($"c"), 5),
+ ClusteredDistribution(Seq($"c", $"d"))),
+ expected = false
+ )
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"d"), 10),
- ClusteredDistribution(Seq($"c", $"d"))),
- expected = false
- )
+ checkCompatible(
+ create(HashPartitioning(Seq($"a", $"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ create(HashPartitioning(Seq($"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ expected = false
+ )
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"d"), 10),
- ClusteredDistribution(Seq($"c", $"d"))),
- expected = false
- )
+ checkCompatible(
+ create(HashPartitioning(Seq($"a"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ create(HashPartitioning(Seq($"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ expected = false
+ )
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"a"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- expected = false
- )
+ checkCompatible(
+ create(HashPartitioning(Seq($"a"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ create(HashPartitioning(Seq($"d"), 10),
+ ClusteredDistribution(Seq($"c", $"d"))),
+ expected = false
+ )
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"a"), 10),
- ClusteredDistribution(Seq($"a", $"b", $"b"))),
- expected = false
- )
- }
+ checkCompatible(
+ create(HashPartitioning(Seq($"a"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ create(HashPartitioning(Seq($"d"), 10),
+ ClusteredDistribution(Seq($"c", $"d"))),
+ expected = false
+ )
- test("compatibility: Only one side is HashShuffleSpec") {
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- SinglePartitionShuffleSpec,
- expected = false
- )
+ checkCompatible(
+ create(HashPartitioning(Seq($"a", $"a", $"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ create(HashPartitioning(Seq($"a", $"b", $"a"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ expected = false
+ )
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 1),
- ClusteredDistribution(Seq($"a", $"b"))),
- SinglePartitionShuffleSpec,
- expected = true
- )
+ checkCompatible(
+ create(HashPartitioning(Seq($"a", $"a", $"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b", $"b"))),
+ create(HashPartitioning(Seq($"a", $"b", $"a"), 10),
+ ClusteredDistribution(Seq($"a", $"b", $"b"))),
+ expected = false
+ )
+ }
- checkCompatible(
- SinglePartitionShuffleSpec,
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 1),
- ClusteredDistribution(Seq($"a", $"b"))),
- expected = true
- )
+ test(s"compatibility: Only one side is $shuffleSpecName") {
+ checkCompatible(
+ create(HashPartitioning(Seq($"a", $"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ SinglePartitionShuffleSpec,
+ expected = false
+ )
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- RangeShuffleSpec(10, ClusteredDistribution(Seq($"a", $"b"))),
- expected = false
- )
+ checkCompatible(
+ create(HashPartitioning(Seq($"a", $"b"), 1),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ SinglePartitionShuffleSpec,
+ expected = true
+ )
- checkCompatible(
- RangeShuffleSpec(10, ClusteredDistribution(Seq($"a", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- expected = false
- )
+ checkCompatible(
+ SinglePartitionShuffleSpec,
+ create(HashPartitioning(Seq($"a", $"b"), 1),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ expected = true
+ )
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- ShuffleSpecCollection(Seq(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))))),
- expected = true
- )
+ checkCompatible(
+ create(HashPartitioning(Seq($"a", $"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ RangeShuffleSpec(10, ClusteredDistribution(Seq($"a", $"b"))),
+ expected = false
+ )
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- ShuffleSpecCollection(Seq(
- HashShuffleSpec(HashPartitioning(Seq($"a"), 10),
+ checkCompatible(
+ RangeShuffleSpec(10, ClusteredDistribution(Seq($"a", $"b"))),
+ create(HashPartitioning(Seq($"a", $"b"), 10),
ClusteredDistribution(Seq($"a", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))))),
- expected = true
- )
+ expected = false
+ )
- checkCompatible(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))),
- ShuffleSpecCollection(Seq(
- HashShuffleSpec(HashPartitioning(Seq($"a"), 10),
+ checkCompatible(
+ create(HashPartitioning(Seq($"a", $"b"), 10),
ClusteredDistribution(Seq($"a", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"c"), 10),
- ClusteredDistribution(Seq($"a", $"b", $"c"))))),
- expected = false
- )
+ ShuffleSpecCollection(Seq(
+ create(HashPartitioning(Seq($"a", $"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))))),
+ expected = true
+ )
- checkCompatible(
- ShuffleSpecCollection(Seq(
- HashShuffleSpec(HashPartitioning(Seq($"b"), 10),
+ checkCompatible(
+ create(HashPartitioning(Seq($"a", $"b"), 10),
ClusteredDistribution(Seq($"a", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))))),
- ShuffleSpecCollection(Seq(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"c"), 10),
- ClusteredDistribution(Seq($"a", $"b", $"c"))),
- HashShuffleSpec(HashPartitioning(Seq($"d"), 10),
- ClusteredDistribution(Seq($"c", $"d"))))),
- expected = true
- )
+ ShuffleSpecCollection(Seq(
+ create(HashPartitioning(Seq($"a"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ create(HashPartitioning(Seq($"a", $"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))))),
+ expected = true
+ )
- checkCompatible(
- ShuffleSpecCollection(Seq(
- HashShuffleSpec(HashPartitioning(Seq($"b"), 10),
+ checkCompatible(
+ create(HashPartitioning(Seq($"a", $"b"), 10),
ClusteredDistribution(Seq($"a", $"b"))),
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
- ClusteredDistribution(Seq($"a", $"b"))))),
- ShuffleSpecCollection(Seq(
- HashShuffleSpec(HashPartitioning(Seq($"a", $"b", $"c"), 10),
- ClusteredDistribution(Seq($"a", $"b", $"c"))),
- HashShuffleSpec(HashPartitioning(Seq($"c"), 10),
- ClusteredDistribution(Seq($"c", $"d"))))),
- expected = false
- )
+ ShuffleSpecCollection(Seq(
+ create(HashPartitioning(Seq($"a"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ create(HashPartitioning(Seq($"a", $"b", $"c"), 10),
+ ClusteredDistribution(Seq($"a", $"b", $"c"))))),
+ expected = false
+ )
+
+ checkCompatible(
+ ShuffleSpecCollection(Seq(
+ create(HashPartitioning(Seq($"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ create(HashPartitioning(Seq($"a", $"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))))),
+ ShuffleSpecCollection(Seq(
+ create(HashPartitioning(Seq($"a", $"b", $"c"), 10),
+ ClusteredDistribution(Seq($"a", $"b", $"c"))),
+ create(HashPartitioning(Seq($"d"), 10),
+ ClusteredDistribution(Seq($"c", $"d"))))),
+ expected = true
+ )
+
+ checkCompatible(
+ ShuffleSpecCollection(Seq(
+ create(HashPartitioning(Seq($"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))),
+ create(HashPartitioning(Seq($"a", $"b"), 10),
+ ClusteredDistribution(Seq($"a", $"b"))))),
+ ShuffleSpecCollection(Seq(
+ create(HashPartitioning(Seq($"a", $"b", $"c"), 10),
+ ClusteredDistribution(Seq($"a", $"b", $"c"))),
+ create(HashPartitioning(Seq($"c"), 10),
+ ClusteredDistribution(Seq($"c", $"d"))))),
+ expected = false
+ )
+ }
+ }
+
+ testHashShuffleSpecLike("HashShuffleSpec",
+ (partitioning, distribution) => HashShuffleSpec(partitioning,
distribution))
+ testHashShuffleSpecLike("CoalescedHashShuffleSpec",
+ (partitioning, distribution) => {
+ val partitions = if (partitioning.numPartitions == 1) {
+ Seq(CoalescedBoundary(0, 1))
+ } else {
+ Seq(CoalescedBoundary(0, 1), CoalescedBoundary(0,
partitioning.numPartitions))
+ }
+ CoalescedHashShuffleSpec(HashShuffleSpec(partitioning, distribution),
partitions)
+ })
+
+ test("compatibility: CoalescedHashShuffleSpec other specs") {
+ val hashShuffleSpec = HashShuffleSpec(
+ HashPartitioning(Seq($"a", $"b"), 10), ClusteredDistribution(Seq($"a",
$"b")))
+ checkCompatible(
+ hashShuffleSpec,
+ CoalescedHashShuffleSpec(hashShuffleSpec, Seq(CoalescedBoundary(0,
10))),
+ expected = false
+ )
+
+ checkCompatible(
+ CoalescedHashShuffleSpec(hashShuffleSpec,
+ Seq(CoalescedBoundary(0, 5), CoalescedBoundary(5, 10))),
+ CoalescedHashShuffleSpec(hashShuffleSpec,
+ Seq(CoalescedBoundary(0, 5), CoalescedBoundary(5, 10))),
+ expected = true
+ )
+
+ checkCompatible(
+ CoalescedHashShuffleSpec(hashShuffleSpec,
+ Seq(CoalescedBoundary(0, 4), CoalescedBoundary(4, 10))),
+ CoalescedHashShuffleSpec(hashShuffleSpec,
+ Seq(CoalescedBoundary(0, 5), CoalescedBoundary(5, 10))),
+ expected = false
+ )
}
test("compatibility: other specs") {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
index 46ec91dcc0ab..6b39ac70a62e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
@@ -19,10 +19,11 @@ package org.apache.spark.sql.execution.adaptive
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning,
Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition,
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{CoalescedBoundary,
CoalescedHashPartitioning, HashPartitioning, Partitioning, RangePartitioning,
RoundRobinPartitioning, SinglePartition, UnknownPartitioning}
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec,
ShuffleExchangeLike}
@@ -75,7 +76,13 @@ case class AQEShuffleReadExec private(
// partitions is changed.
child.outputPartitioning match {
case h: HashPartitioning =>
- CurrentOrigin.withOrigin(h.origin)(h.copy(numPartitions =
partitionSpecs.length))
+ val partitions = partitionSpecs.map {
+ case CoalescedPartitionSpec(start, end, _) =>
CoalescedBoundary(start, end)
+ // Cannot happen due to isCoalescedRead
+ case unexpected =>
+ throw SparkException.internalError(s"Unexpected
ShufflePartitionSpec: $unexpected")
+ }
+ CurrentOrigin.withOrigin(h.origin)(CoalescedHashPartitioning(h,
partitions))
case r: RangePartitioning =>
CurrentOrigin.withOrigin(r.origin)(r.copy(numPartitions =
partitionSpecs.length))
// This can only happen for `REBALANCE_PARTITIONS_BY_NONE`, which uses
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index f8f6845afcaa..6a1aa25c6e21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
import java.io.{Externalizable, ObjectInput, ObjectOutput}
import java.sql.{Date, Timestamp}
+import java.util.UUID
import scala.util.Random
@@ -2461,6 +2462,34 @@ class DatasetSuite extends QueryTest
)
assert(result == expected)
}
+
+ test("SPARK-45282: Coalesced shuffle read is not compatible with hash
partitioning") {
+
+ withSQLConf(
+ SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "33554432",
+ SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST.key -> "false",
+ SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
+ val data = (1 to 1000000).toDS().map(i =>
UUID.randomUUID().toString).persist()
+
+ val left = data.map(k => (k, 1))
+ val right = data.map(k => (k, k))
+
+ val left1 = left
+ .toDF("key", "value1")
+ .repartition(col("key"))
+ .persist()
+ left1.count()
+
+ val right1 = right
+ .toDF("key", "value2")
+ .repartition(col("key"))
+ .persist()
+
+ val join = left1.join(right1, "key")
+
+ assert(join.count() == 1000000)
+ }
+ }
}
class DatasetLargeResultCollectingSuite extends QueryTest
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
index d4a6d2213652..52af05d0eeed 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.{catalyst, AnalysisException,
DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression,
Cast, Literal}
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.catalyst.plans.physical
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning,
RangePartitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{CoalescedBoundary,
CoalescedHashPartitioning, HashPartitioning, RangePartitioning,
UnknownPartitioning}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.functions._
import org.apache.spark.sql.connector.distributions.{Distribution,
Distributions}
@@ -257,11 +257,8 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
)
)
val writePartitioningExprs = Seq(attr("data"), attr("id"))
- val writePartitioning = if (!coalesce) {
- clusteredWritePartitioning(writePartitioningExprs, targetNumPartitions)
- } else {
- clusteredWritePartitioning(writePartitioningExprs, Some(1))
- }
+ val writePartitioning = clusteredWritePartitioning(
+ writePartitioningExprs, targetNumPartitions, coalesce)
checkWriteRequirements(
tableDistribution,
@@ -366,11 +363,8 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
)
)
val writePartitioningExprs = Seq(attr("data"))
- val writePartitioning = if (!coalesce) {
- clusteredWritePartitioning(writePartitioningExprs, targetNumPartitions)
- } else {
- clusteredWritePartitioning(writePartitioningExprs, Some(1))
- }
+ val writePartitioning = clusteredWritePartitioning(
+ writePartitioningExprs, targetNumPartitions, coalesce)
checkWriteRequirements(
tableDistribution,
@@ -848,11 +842,8 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
)
)
val writePartitioningExprs = Seq(attr("data"))
- val writePartitioning = if (!coalesce) {
- clusteredWritePartitioning(writePartitioningExprs, targetNumPartitions)
- } else {
- clusteredWritePartitioning(writePartitioningExprs, Some(1))
- }
+ val writePartitioning = clusteredWritePartitioning(
+ writePartitioningExprs, targetNumPartitions, coalesce)
checkWriteRequirements(
tableDistribution,
@@ -932,11 +923,8 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
)
)
val writePartitioningExprs = Seq(attr("data"))
- val writePartitioning = if (!coalesce) {
- clusteredWritePartitioning(writePartitioningExprs, targetNumPartitions)
- } else {
- clusteredWritePartitioning(writePartitioningExprs, Some(1))
- }
+ val writePartitioning = clusteredWritePartitioning(
+ writePartitioningExprs, targetNumPartitions, coalesce)
checkWriteRequirements(
tableDistribution,
@@ -1119,11 +1107,8 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
)
val writePartitioningExprs = Seq(truncateExpr)
- val writePartitioning = if (!coalesce) {
- clusteredWritePartitioning(writePartitioningExprs, targetNumPartitions)
- } else {
- clusteredWritePartitioning(writePartitioningExprs, Some(1))
- }
+ val writePartitioning = clusteredWritePartitioning(
+ writePartitioningExprs, targetNumPartitions, coalesce)
checkWriteRequirements(
tableDistribution,
@@ -1365,6 +1350,9 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
case p: physical.HashPartitioning =>
val resolvedExprs = p.expressions.map(resolveAttrs(_, plan))
p.copy(expressions = resolvedExprs)
+ case c: physical.CoalescedHashPartitioning =>
+ val resolvedExprs = c.from.expressions.map(resolveAttrs(_, plan))
+ c.copy(from = c.from.copy(expressions = resolvedExprs))
case _: UnknownPartitioning =>
// don't check partitioning if no particular one is expected
actualPartitioning
@@ -1423,8 +1411,15 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
private def clusteredWritePartitioning(
writePartitioningExprs: Seq[catalyst.expressions.Expression],
- targetNumPartitions: Option[Int]): physical.Partitioning = {
- HashPartitioning(writePartitioningExprs,
- targetNumPartitions.getOrElse(conf.numShufflePartitions))
+ targetNumPartitions: Option[Int],
+ coalesce: Boolean): physical.Partitioning = {
+ val partitioning = HashPartitioning(writePartitioningExprs,
+ targetNumPartitions.getOrElse(conf.numShufflePartitions))
+ if (coalesce) {
+ CoalescedHashPartitioning(
+ partitioning, Seq(CoalescedBoundary(0, partitioning.numPartitions)))
+ } else {
+ partitioning
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]