This is an automated email from the ASF dual-hosted git repository.
ptoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 14d659ea8133 [SPARK-55535][SQL][FOLLOW-UP] Fix `OrderedDistribution`
handling and minor improvements to `EnsureRequirements`
14d659ea8133 is described below
commit 14d659ea8133ba0bd23252d27c27095ee10e0472
Author: Peter Toth <[email protected]>
AuthorDate: Thu Mar 12 10:27:58 2026 +0100
[SPARK-55535][SQL][FOLLOW-UP] Fix `OrderedDistribution` handling and minor
improvements to `EnsureRequirements`
### What changes were proposed in this pull request?
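This is a follow-up PR to https://github.com/apache/spark/pull/54330 that fixes
`OrderedDistribution` handling in `EnsureRequirements` to avoid a correctness
bug. It also contains minor improvements to `EnsureRequirements` (including
renaming `KeyGroupedShuffleSpec` to `KeyedShuffleSpec` and replacing the
`applyPartialClustering`/`replicatePartitions` flags of `GroupPartitionsExec`
with a single `distributePartitions` flag) and updates to the configuration docs.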
### Why are the changes needed?
To fix a correctness bug introduced by the refactor in
https://github.com/apache/spark/pull/54330.
### Does this PR introduce _any_ user-facing change?
Yes, it fixes a correctness bug, but the refactor that introduced the bug
(https://github.com/apache/spark/pull/54330) hasn't been released yet.
### How was this patch tested?
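Added a new unit test (`SPARK-55535: Order by on partitions keys` in
`KeyGroupedPartitioningSuite`) and updated the existing tests.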
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54727 from peter-toth/SPARK-55535-refactor-kgp-and-spj-follow-up.
Authored-by: Peter Toth <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
---
.../sql/catalyst/plans/physical/partitioning.scala | 35 ++++-------
.../org/apache/spark/sql/internal/SQLConf.scala | 10 ++--
.../datasources/v2/GroupPartitionsExec.scala | 10 ++--
.../execution/exchange/EnsureRequirements.scala | 70 +++++++++++++---------
.../connector/KeyGroupedPartitioningSuite.scala | 65 +++++++++++++++++---
.../exchange/EnsureRequirementsSuite.scala | 12 ++--
6 files changed, 125 insertions(+), 77 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 99ef23e54c74..28a9225b6ce2 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.catalyst.plans.physical
-import java.util.Objects
-
import scala.annotation.tailrec
import scala.collection.mutable
@@ -360,12 +358,11 @@ case class CoalescedHashPartitioning(from:
HashPartitioning, partitions: Seq[Coa
* preserved through `GroupPartitionsExec`. The sorted order is critical
for storage-partitioned
* join compatibility.
*
- * 2. '''In KeyGroupedShuffleSpec''': When used within
`KeyGroupedShuffleSpec`, the `partitionKeys`
- * may not be in sorted order. This occurs because `KeyGroupedShuffleSpec`
can project the
- * partition keys by join key positions. The `EnsureRequirements` rule
ensures that either the
- * unordered keys from both sides of a join match exactly, or it builds a
common ordered set of
- * keys and pushes them down to `GroupPartitionsExec` on both sides to
establish a compatible
- * ordering.
+ * 2. '''In KeyedShuffleSpec''': When used within `KeyedShuffleSpec`, the
`partitionKeys` may not be
+ * in sorted order. This occurs because `KeyedShuffleSpec` can project the
partition keys by join
+ * key positions. The `EnsureRequirements` rule ensures that either the
unordered keys from both
+ * sides of a join match exactly, or it builds a common ordered set of keys
and pushes them down
+ * to `GroupPartitionsExec` on both sides to establish a compatible
ordering.
*
* == Partition Keys ==
* - `partitionKeys`: The partition keys, one per partition. May contain
duplicates initially
@@ -427,7 +424,7 @@ case class CoalescedHashPartitioning(from:
HashPartitioning, partitions: Seq[Coa
* @param expressions Partition transform expressions (e.g., `years(col)`,
`bucket(10, col)`).
* @param partitionKeys Partition keys wrapped in InternalRowComparableWrapper
for efficient
* comparison and grouping. One per partition. When used
as outputPartitioning,
- * always in sorted order. When used in
KeyGroupedShuffleSpec, may be unsorted
+ * always in sorted order. When used in
`KeyedShuffleSpec`, may be unsorted
* after projection. May contain duplicates when
ungrouped.
* @param isGrouped Whether partition keys are unique (no duplicates).
Computed on first
* creation, then preserved through copy operations to avoid
recomputation.
@@ -509,7 +506,7 @@ case class KeyedPartitioning(
}
override def createShuffleSpec(distribution: ClusteredDistribution):
ShuffleSpec = {
- val result = KeyGroupedShuffleSpec(this, distribution)
+ val result = KeyedShuffleSpec(this, distribution)
if (SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys) {
// If allowing join keys to be subset of clustering keys, we should
create a new
// `KeyedPartitioning` here that is grouped on the join keys instead,
and use that as
@@ -525,16 +522,6 @@ case class KeyedPartitioning(
result
}
}
-
- override def equals(that: Any): Boolean = that match {
- case k: KeyedPartitioning if this.expressions == k.expressions =>
- this.partitionKeys == k.partitionKeys
-
- case _ => false
- }
-
- override def hashCode(): Int =
- Objects.hash(expressions, partitionKeys)
}
object KeyedPartitioning {
@@ -954,7 +941,7 @@ case class CoalescedHashShuffleSpec(
* @param joinKeyPositions position of join keys among cluster keys.
* This is set if joining on a subset of cluster keys
is allowed.
*/
-case class KeyGroupedShuffleSpec(
+case class KeyedShuffleSpec(
partitioning: KeyedPartitioning,
distribution: ClusteredDistribution,
joinKeyPositions: Option[Seq[Int]] = None) extends ShuffleSpec {
@@ -992,7 +979,7 @@ case class KeyGroupedShuffleSpec(
// 3.3 each pair of partition expressions at the same index must share
compatible
// transform functions.
// 4. the partition values from both sides are following the same order.
- case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning,
otherDistribution, _) =>
+ case otherSpec @ KeyedShuffleSpec(otherPartitioning, otherDistribution, _)
=>
distribution.clustering.length == otherDistribution.clustering.length &&
numPartitions == other.numPartitions && areKeysCompatible(otherSpec) &&
partitioning.partitionKeys == otherPartitioning.partitionKeys
@@ -1003,7 +990,7 @@ case class KeyGroupedShuffleSpec(
// Whether the partition keys (i.e., partition expressions) are compatible
between this and the
// `other` spec.
- def areKeysCompatible(other: KeyGroupedShuffleSpec): Boolean = {
+ def areKeysCompatible(other: KeyedShuffleSpec): Boolean = {
val expressions = partitioning.expressions
val otherExpressions = other.partitioning.expressions
@@ -1047,7 +1034,7 @@ case class KeyGroupedShuffleSpec(
*
* @param other other key-grouped shuffle spec
*/
- def reducers(other: KeyGroupedShuffleSpec): Option[Seq[Option[Reducer[_,
_]]]] = {
+ def reducers(other: KeyedShuffleSpec): Option[Seq[Option[Reducer[_, _]]]] = {
val results =
partitioning.expressions.zip(other.partitioning.expressions).map {
case (e1: TransformExpression, e2: TransformExpression) =>
e1.reducers(e2)
case (_, _) => None
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c90cf6388684..8c9796b71689 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2077,11 +2077,11 @@ object SQLConf {
val V2_BUCKETING_PUSH_PART_VALUES_ENABLED =
buildConf("spark.sql.sources.v2.bucketing.pushPartValues.enabled")
.doc(s"Whether to pushdown common partition values when
${V2_BUCKETING_ENABLED.key} is " +
- "enabled. When turned on, if both sides of a join are of
KeyGroupedPartitioning and if " +
+ "enabled. When turned on, if both sides of a join are of
KeyedPartitioning and if " +
"they share compatible partition keys, even if they don't have the
exact same partition " +
"values, Spark will calculate a superset of partition values and
pushdown that info to " +
- "scan nodes, which will use empty partitions for the missing partition
values on either " +
- "side. This could help to eliminate unnecessary shuffles")
+ "group partition nodes, which will use empty partitions for the
missing partition values " +
+ "on either side. This could help to eliminate unnecessary shuffles")
.version("3.4.0")
.booleanConf
.createWithDefault(true)
@@ -2089,7 +2089,7 @@ object SQLConf {
val V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED =
buildConf("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled")
.doc("During a storage-partitioned join, whether to allow input
partitions to be " +
- "partially clustered, when both sides of the join are of
KeyGroupedPartitioning. At " +
+ "partially clustered, when both sides of the join are of
KeyedPartitioning. At " +
"planning time, Spark will pick the side with less data size based on
table " +
"statistics, group and replicate them to match the other side. This is
an optimization " +
"on skew join and can help to reduce data skewness when certain
partitions are assigned " +
@@ -2102,7 +2102,7 @@ object SQLConf {
val V2_BUCKETING_SHUFFLE_ENABLED =
buildConf("spark.sql.sources.v2.bucketing.shuffle.enabled")
.doc("During a storage-partitioned join, whether to allow to shuffle
only one side. " +
- "When only one side is KeyGroupedPartitioning, if the conditions are
met, spark will " +
+ "When only one side is KeyedPartitioning, if the conditions are met,
spark will " +
"only shuffle the other side. This optimization will reduce the amount
of data that " +
s"needs to be shuffle. This config requires
${V2_BUCKETING_ENABLED.key} to be enabled")
.version("4.0.0")
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala
index 9910c4eb788c..221741a56fe0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala
@@ -45,16 +45,16 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
* @param expectedPartitionKeys Optional sequence of expected partition key
values and their
* split counts
* @param reducers Optional reducers to apply to partition keys for grouping
compatibility
- * @param applyPartialClustering Whether to apply partial clustering for
skewed data
- * @param replicatePartitions Whether to replicate partitions across multiple
keys
+ * @param distributePartitions When true, splits for a key are distributed
across the expected
+ * partitions (padding with empty partitions).
When false, all splits
+ * are replicated to every expected partition for
that key.
*/
case class GroupPartitionsExec(
child: SparkPlan,
@transient joinKeyPositions: Option[Seq[Int]] = None,
@transient expectedPartitionKeys:
Option[Seq[(InternalRowComparableWrapper, Int)]] = None,
@transient reducers: Option[Seq[Option[Reducer[_, _]]]] = None,
- @transient applyPartialClustering: Boolean = false,
- @transient replicatePartitions: Boolean = false
+ @transient distributePartitions: Boolean = false
) extends UnaryExecNode {
override def outputPartitioning: Partitioning = {
@@ -91,7 +91,7 @@ case class GroupPartitionsExec(
val alignedPartitions = expectedPartitionKeys.get.flatMap { case (key,
numSplits) =>
if (numSplits > 1) isGrouped = false
val splits = keyMap.getOrElse(key, Seq.empty)
- if (applyPartialClustering && !replicatePartitions) {
+ if (distributePartitions) {
// Distribute splits across expected partitions, padding with empty
sequences
val paddedSplits = splits.map(Seq(_)).padTo(numSplits, Seq.empty)
paddedSplits.map((key, _))
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 39da54625613..cca37558584f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -71,26 +71,40 @@ case class EnsureRequirements(
child
} else {
// Check KeyedPartitioning satisfaction conditions
- val groupedSatisfies = grouped.exists(_.satisfies(distribution))
+ val groupedSatisfies = grouped.find(_.satisfies(distribution))
val nonGroupedSatisfiesAsIs =
nonGrouped.exists(_.nonGroupedSatisfies(distribution))
- val nonGroupedSatisfiesWhenGrouped =
nonGrouped.exists(_.groupedSatisfies(distribution))
+ val nonGroupedSatisfiesWhenGrouped =
nonGrouped.find(_.groupedSatisfies(distribution))
// Check if any KeyedPartitioning satisfies the distribution
- if (groupedSatisfies || nonGroupedSatisfiesAsIs ||
nonGroupedSatisfiesWhenGrouped) {
+ if (groupedSatisfies.isDefined || nonGroupedSatisfiesAsIs
+ || nonGroupedSatisfiesWhenGrouped.isDefined) {
distribution match {
case o: OrderedDistribution =>
- // OrderedDistribution requires grouped KeyedPartitioning with
sorted keys.
+ // OrderedDistribution requires grouped KeyedPartitioning with
sorted keys
+ // according to the distribution's ordering.
// Find any KeyedPartitioning that satisfies via
groupedSatisfies.
val satisfyingKeyedPartitioning =
- (grouped ++
nonGrouped).find(_.groupedSatisfies(distribution)).get
+ groupedSatisfies.orElse(nonGroupedSatisfiesWhenGrouped).get
val attrs =
satisfyingKeyedPartitioning.expressions.flatMap(_.collectLeaves())
.map(_.asInstanceOf[Attribute])
val keyRowOrdering = RowOrdering.create(o.ordering, attrs)
val keyOrdering = keyRowOrdering.on((t:
InternalRowComparableWrapper) => t.row)
- val sorted =
satisfyingKeyedPartitioning.partitionKeys.sorted(keyOrdering)
- GroupPartitionsExec(child, expectedPartitionKeys =
Some(sorted.map((_, 1))))
-
- case _ if groupedSatisfies =>
+ if
(satisfyingKeyedPartitioning.partitionKeys.sliding(2).forall {
+ case Seq(k1, k2) => keyOrdering.lteq(k1, k2)
+ }) {
+ child
+ } else {
+ // Use distributePartitions to spread splits across expected
partitions
+ val sortedGroupedKeys =
satisfyingKeyedPartitioning.partitionKeys
+ .groupBy(identity).view.mapValues(_.size)
+ .toSeq.sortBy(_._1)(keyOrdering)
+ GroupPartitionsExec(child,
+ expectedPartitionKeys = Some(sortedGroupedKeys),
+ distributePartitions = true
+ )
+ }
+
+ case _ if groupedSatisfies.isDefined =>
// Grouped KeyedPartitioning already satisfies
child
@@ -238,7 +252,7 @@ case class EnsureRequirements(
// Hence we need to ensure that after this call, the
outputPartitioning of the
// partitioned side's BatchScanExec is grouped by join keys to
match,
// and we do that by pushing down the join keys
- case Some(KeyGroupedShuffleSpec(_, _, Some(joinKeyPositions))) =>
+ case Some(KeyedShuffleSpec(_, _, Some(joinKeyPositions))) =>
withJoinKeyPositions(child, joinKeyPositions)
case _ => child
}
@@ -258,7 +272,7 @@ case class EnsureRequirements(
child match {
case ShuffleExchangeExec(_, c, so, ps) =>
ShuffleExchangeExec(newPartitioning, c, so, ps)
- case GroupPartitionsExec(c, _, _, _, _, _) =>
ShuffleExchangeExec(newPartitioning, c)
+ case GroupPartitionsExec(c, _, _, _, _) =>
ShuffleExchangeExec(newPartitioning, c)
case _ => ShuffleExchangeExec(newPartitioning, child)
}
}
@@ -440,7 +454,7 @@ case class EnsureRequirements(
val specs = Seq(left, right).zip(requiredChildDistribution).map { case (p,
d) =>
if (!d.isInstanceOf[ClusteredDistribution]) return None
val cd = d.asInstanceOf[ClusteredDistribution]
- val specOpt = createKeyGroupedShuffleSpec(p.outputPartitioning, cd)
+ val specOpt = createKeyedShuffleSpec(p.outputPartitioning, cd)
if (specOpt.isEmpty) return None
specOpt.get
}
@@ -454,7 +468,7 @@ case class EnsureRequirements(
// partitionings are not modified (projected) in specs and left and right
side partitionings are
// compatible with each other.
// Left and right `outputPartitioning` is a `PartitioningCollection` or a
`KeyedPartitioning`
- // otherwise `createKeyGroupedShuffleSpec()` would have returned `None`.
+ // otherwise `createKeyedShuffleSpec()` would have returned `None`.
var isCompatible =
left.outputPartitioning.asInstanceOf[Expression].exists(_ ==
leftPartitioning) &&
right.outputPartitioning.asInstanceOf[Expression].exists(_ ==
rightPartitioning) &&
@@ -593,7 +607,7 @@ case class EnsureRequirements(
val originalPartitioning =
partiallyClusteredChild.outputPartitioning.asInstanceOf[Expression]
// `outputPartitioning` is either a `PartitioningCollection` or
a `KeyedPartitioning`
- // otherwise `createKeyGroupedShuffleSpec()` would have returned
`None`.
+ // otherwise `createKeyedShuffleSpec()` would have returned
`None`.
val originalKeyedPartitioning =
originalPartitioning.collectFirst { case k: KeyedPartitioning
=> k }.get
val projectedOriginalPartitionKeys =
partiallyClusteredSpec.joinKeyPositions
@@ -616,9 +630,9 @@ case class EnsureRequirements(
// Now we need to push-down the common partition information to the
`GroupPartitionsExec`s.
newLeft = applyGroupPartitions(left, leftSpec.joinKeyPositions,
mergedPartitionKeys,
- leftReducers, applyPartialClustering, replicateLeftSide)
+ leftReducers, distributePartitions = applyPartialClustering &&
!replicateLeftSide)
newRight = applyGroupPartitions(right, rightSpec.joinKeyPositions,
mergedPartitionKeys,
- rightReducers, applyPartialClustering, replicateRightSide)
+ rightReducers, distributePartitions = applyPartialClustering &&
!replicateRightSide)
}
}
@@ -673,21 +687,19 @@ case class EnsureRequirements(
joinKeyPositions: Option[Seq[Int]],
mergedPartitionKeys: Seq[(InternalRowComparableWrapper, Int)],
reducers: Option[Seq[Option[Reducer[_, _]]]],
- applyPartialClustering: Boolean,
- replicatePartitions: Boolean): SparkPlan = {
+ distributePartitions: Boolean): SparkPlan = {
plan match {
case g: GroupPartitionsExec =>
val newGroupPartitions = g.copy(
joinKeyPositions = joinKeyPositions,
expectedPartitionKeys = Some(mergedPartitionKeys),
reducers = reducers,
- applyPartialClustering = applyPartialClustering,
- replicatePartitions = replicatePartitions)
+ distributePartitions = distributePartitions)
newGroupPartitions.copyTagsFrom(g)
newGroupPartitions
case _ =>
GroupPartitionsExec(plan, joinKeyPositions, Some(mergedPartitionKeys),
reducers,
- applyPartialClustering, replicatePartitions)
+ distributePartitions)
}
}
@@ -705,14 +717,14 @@ case class EnsureRequirements(
}
/**
- * Tries to create a [[KeyGroupedShuffleSpec]] from the input partitioning
and distribution, if
- * the partitioning is a [[KeyedPartitioning]] (either directly or
indirectly), and
- * satisfies the given distribution.
+ * Tries to create a [[KeyedShuffleSpec]] from the input partitioning and
distribution, if the
+ * partitioning is a [[KeyedPartitioning]] (either directly or indirectly),
and satisfies the
+ * given distribution.
*/
- private def createKeyGroupedShuffleSpec(
+ private def createKeyedShuffleSpec(
partitioning: Partitioning,
- distribution: ClusteredDistribution): Option[KeyGroupedShuffleSpec] = {
- def tryCreate(partitioning: KeyedPartitioning):
Option[KeyGroupedShuffleSpec] = {
+ distribution: ClusteredDistribution): Option[KeyedShuffleSpec] = {
+ def tryCreate(partitioning: KeyedPartitioning): Option[KeyedShuffleSpec] =
{
val attributes = partitioning.expressions.flatMap(_.collectLeaves())
val clustering = distribution.clustering
@@ -725,7 +737,7 @@ case class EnsureRequirements(
}
if (satisfies) {
-
Some(partitioning.createShuffleSpec(distribution).asInstanceOf[KeyGroupedShuffleSpec])
+
Some(partitioning.createShuffleSpec(distribution).asInstanceOf[KeyedShuffleSpec])
} else {
None
}
@@ -734,7 +746,7 @@ case class EnsureRequirements(
partitioning match {
case p: KeyedPartitioning => tryCreate(p)
case PartitioningCollection(partitionings) =>
-
partitionings.collectFirst(Function.unlift(createKeyGroupedShuffleSpec(_,
distribution)))
+ partitionings.collectFirst(Function.unlift(createKeyedShuffleSpec(_,
distribution)))
case _ => None
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 61384bf9f1fc..ace0040049ef 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -439,12 +439,13 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
Seq(true, false).foreach { sortingEnabled =>
withSQLConf(SQLConf.V2_BUCKETING_SORTING_ENABLED.key ->
sortingEnabled.toString) {
- def verifyShuffle(cmd: String, answer: Seq[Row]): Unit = {
+ def verifyShuffle(cmd: String, answer: Seq[Row],
expectedGroupPartitions: Int): Unit = {
val df = sql(cmd)
if (sortingEnabled) {
assert(collectAllShuffles(df.queryExecution.executedPlan).isEmpty,
"should contain no shuffle when sorting by partition values")
-
assert(collectAllGroupPartitions(df.queryExecution.executedPlan).size == 1,
+
assert(collectAllGroupPartitions(df.queryExecution.executedPlan).size ==
+ expectedGroupPartitions,
"should contain partition grouping when sorting by partition
values")
} else {
assert(collectAllShuffles(df.queryExecution.executedPlan).size ==
1,
@@ -457,30 +458,32 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
verifyShuffle(
s"SELECT price, id FROM testcat.ns.$items ORDER BY price ASC, id
ASC",
+ // Default ordering of partitions matches requested ordering so we
don't expect any
+ // shuffles or group partitions
Seq(Row(null, 3), Row(10.0, 2), Row(15.5, null),
- Row(15.5, 3), Row(40.0, 1), Row(41.0, 1)))
+ Row(15.5, 3), Row(40.0, 1), Row(41.0, 1)), 0)
verifyShuffle(
s"SELECT price, id FROM testcat.ns.$items " +
s"ORDER BY price ASC NULLS LAST, id ASC NULLS LAST",
Seq(Row(10.0, 2), Row(15.5, 3), Row(15.5, null),
- Row(40.0, 1), Row(41.0, 1), Row(null, 3)))
+ Row(40.0, 1), Row(41.0, 1), Row(null, 3)), 1)
verifyShuffle(
s"SELECT price, id FROM testcat.ns.$items ORDER BY price DESC, id
ASC",
Seq(Row(41.0, 1), Row(40.0, 1), Row(15.5, null),
- Row(15.5, 3), Row(10.0, 2), Row(null, 3)))
+ Row(15.5, 3), Row(10.0, 2), Row(null, 3)), 1)
verifyShuffle(
s"SELECT price, id FROM testcat.ns.$items ORDER BY price DESC, id
DESC",
Seq(Row(41.0, 1), Row(40.0, 1), Row(15.5, 3),
- Row(15.5, null), Row(10.0, 2), Row(null, 3)))
+ Row(15.5, null), Row(10.0, 2), Row(null, 3)), 1)
verifyShuffle(
s"SELECT price, id FROM testcat.ns.$items " +
s"ORDER BY price DESC NULLS FIRST, id DESC NULLS FIRST",
Seq(Row(null, 3), Row(41.0, 1), Row(40.0, 1),
- Row(15.5, null), Row(15.5, 3), Row(10.0, 2)));
+ Row(15.5, null), Row(15.5, 3), Row(10.0, 2)), 1);
}
}
}
@@ -3142,7 +3145,7 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
}
}
- test("SPARK-55535: Empty group partitions due filtered partitions") {
+ test("SPARK-55535: Empty group partitions due to filtered partitions") {
val items_partitions = Array(identity("id"))
createTable(items, itemsColumns, items_partitions)
@@ -3167,4 +3170,50 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
"group partitions should not have any (common) partitions")
}
}
+
+ test("SPARK-55535: Order by on partitions keys") {
+ withSQLConf(SQLConf.V2_BUCKETING_SORTING_ENABLED.key -> "true") {
+ val items_partitions = Array(identity("id"))
+ createTable(items, itemsColumns, items_partitions)
+
+ sql(s"INSERT INTO testcat.ns.$items VALUES " +
+ "(2, 'aa', 10.0, cast('2021-01-01' as timestamp)), " +
+ "(3, 'aa', 20.0, cast('2022-01-01' as timestamp)), " +
+ "(1, 'aa', 40.0, cast('2022-01-01' as timestamp))")
+
+ val df = sql(s"SELECT id FROM testcat.ns.$items i ORDER BY id")
+
+ val expected = (1 to 3).map(Row(_))
+ checkAnswer(df, expected)
+
+ val reverseDf = sql(s"SELECT id FROM testcat.ns.$items i ORDER BY id
DESC")
+
+ checkAnswer(reverseDf, expected.reverse)
+
+ sql(s"INSERT INTO testcat.ns.$items VALUES (2, 'aa', 30.0,
cast('2021-01-01' as timestamp))")
+
+ val dfWithDuplicate = sql(s"SELECT id FROM testcat.ns.$items i ORDER BY
id")
+
+ val expectedWithDuplicate = Seq(1, 2, 2, 3).map(Row(_))
+ checkAnswer(dfWithDuplicate, expectedWithDuplicate)
+
+ val reverseDfWithDuplicate = sql(s"SELECT id FROM testcat.ns.$items i
ORDER BY id DESC")
+
+ checkAnswer(reverseDfWithDuplicate, expectedWithDuplicate.reverse)
+
+ Seq(
+ df -> Seq.empty,
+ reverseDf -> Seq(3),
+ dfWithDuplicate -> Seq.empty,
+ reverseDfWithDuplicate -> Seq(4)
+ ).foreach {
+ case (df, expectedPartitions) =>
+ val shuffles = collectAllShuffles(df.queryExecution.executedPlan)
+ assert(shuffles.isEmpty, "should not contain any shuffle")
+
+ val groupPartitions =
collectAllGroupPartitions(df.queryExecution.executedPlan)
+ assert(groupPartitions.map(_.outputPartitioning.numPartitions) ==
expectedPartitions)
+ }
+ }
+ }
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
index 7512cbe7f90b..9c67a334c801 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
@@ -1037,10 +1037,10 @@ class EnsureRequirementsSuite extends
SharedSparkSession {
case SortMergeJoinExec(_, _, _, _,
SortExec(_, _,
GroupPartitionsExec(DummySparkPlan(_, _, left:
KeyedPartitioning, _, _),
- _, _, _, _, _), _),
+ _, _, _, _), _),
SortExec(_, _,
GroupPartitionsExec(DummySparkPlan(_, _, right:
KeyedPartitioning, _, _),
- _, _, _, _, _), _),
+ _, _, _, _), _),
_) =>
assert(left.expressions === Seq(bucket(4, exprB), bucket(8, exprC)))
assert(right.expressions === Seq(bucket(4, exprC), bucket(8, exprB)))
@@ -1061,10 +1061,10 @@ class EnsureRequirementsSuite extends
SharedSparkSession {
case SortMergeJoinExec(_, _, _, _,
SortExec(_, _,
GroupPartitionsExec(DummySparkPlan(_, _, left:
PartitioningCollection, _, _),
- _, _, _, _, _), _),
+ _, _, _, _), _),
SortExec(_, _,
GroupPartitionsExec(DummySparkPlan(_, _, right:
KeyedPartitioning, _, _),
- _, _, _, _, _), _),
+ _, _, _, _), _),
_) =>
assert(left.partitionings.length == 2)
assert(left.partitionings.head.isInstanceOf[KeyedPartitioning])
@@ -1096,10 +1096,10 @@ class EnsureRequirementsSuite extends
SharedSparkSession {
case SortMergeJoinExec(_, _, _, _,
SortExec(_, _,
GroupPartitionsExec(DummySparkPlan(_, _, left:
PartitioningCollection, _, _),
- _, _, _, _, _), _),
+ _, _, _, _), _),
SortExec(_, _,
GroupPartitionsExec(DummySparkPlan(_, _, right:
PartitioningCollection, _, _),
- _, _, _, _, _), _),
+ _, _, _, _), _),
_) =>
assert(left.partitionings.length == 2)
assert(left.partitionings.head.isInstanceOf[KeyedPartitioning])
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]