This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 69f9ee1 [SPARK-31452][SQL] Do not create partition spec for 0-size
partitions in AQE
69f9ee1 is described below
commit 69f9ee18b66991f8d6989fa20f93cd7343be28a9
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Apr 20 13:50:07 2020 -0700
[SPARK-31452][SQL] Do not create partition spec for 0-size partitions in AQE
### What changes were proposed in this pull request?
This PR skips creating the partition specs in `ShufflePartitionsUtil` for
0-size partitions, which avoids launching unnecessary tasks that do nothing.
### Why are the changes needed?
Launching tasks that do nothing wastes scheduling and executor resources.
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
Updated existing tests and added new ones in `ShufflePartitionsUtilSuite` and `AdaptiveQueryExecSuite`.
Closes #28226 from cloud-fan/aqe.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 7 ++-
.../adaptive/CustomShuffleReaderExec.scala | 5 +-
.../adaptive/OptimizeLocalShuffleReader.scala | 4 +-
.../execution/adaptive/OptimizeSkewedJoin.scala | 10 ++--
.../execution/adaptive/ShufflePartitionsUtil.scala | 13 ++++-
.../sql/execution/ShufflePartitionsUtilSuite.scala | 65 ++++++++++++++-------
.../adaptive/AdaptiveQueryExecSuite.scala | 67 ++++++++++++++++------
7 files changed, 120 insertions(+), 51 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e6a3966..a257668 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -434,9 +434,10 @@ object SQLConf {
val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
- .doc("The minimum number of shuffle partitions after coalescing. If not
set, the default " +
- "value is the default parallelism of the Spark cluster. This
configuration only " +
- s"has an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
+ .doc("The suggested (not guaranteed) minimum number of shuffle
partitions after " +
+ "coalescing. If not set, the default value is the default parallelism
of the " +
+ "Spark cluster. This configuration only has an effect when " +
+ s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
.version("3.0.0")
.intConf
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index 6450d49..e7f3bf1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -49,7 +49,8 @@ case class CustomShuffleReaderExec private(
// If it is a local shuffle reader with one mapper per task, then the
output partitioning is
// the same as the plan before shuffle.
// TODO this check is based on assumptions of callers' behavior but is
sufficient for now.
- if (partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) &&
+ if (partitionSpecs.nonEmpty &&
+ partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) &&
partitionSpecs.map(_.asInstanceOf[PartialMapperPartitionSpec].mapIndex).toSet.size
==
partitionSpecs.length) {
child match {
@@ -98,7 +99,7 @@ case class CustomShuffleReaderExec private(
}
@transient private lazy val partitionDataSizes: Option[Seq[Long]] = {
- if (!isLocalReader && shuffleStage.get.mapStats.isDefined) {
+ if (partitionSpecs.nonEmpty && !isLocalReader &&
shuffleStage.get.mapStats.isDefined) {
val bytesByPartitionId = shuffleStage.get.mapStats.get.bytesByPartitionId
Some(partitionSpecs.map {
case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
index a5b3cac..5416fde 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
@@ -141,8 +141,8 @@ object OptimizeLocalShuffleReader {
def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
case s: ShuffleQueryStageExec =>
s.shuffle.canChangeNumPartitions
- case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
- s.shuffle.canChangeNumPartitions
+ case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
+ s.shuffle.canChangeNumPartitions && partitionSpecs.nonEmpty
case _ => false
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 58e07fa..396c9c9 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -88,9 +88,11 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends
Rule[SparkPlan] {
private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
- // It's impossible that all the partitions are skewed, as we use median
size to define skew.
- assert(nonSkewSizes.nonEmpty)
- math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
+ if (nonSkewSizes.isEmpty) {
+ advisorySize
+ } else {
+ math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
+ }
}
/**
@@ -297,7 +299,7 @@ private object ShuffleStage {
Some(ShuffleStageInfo(s, mapStats, partitions))
case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs)
- if s.mapStats.isDefined =>
+ if s.mapStats.isDefined && partitionSpecs.nonEmpty =>
val mapStats = s.mapStats.get
val sizes = mapStats.bytesByPartitionId
val partitions = partitionSpecs.map {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index e10ed4f..d6e44b7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -91,6 +91,14 @@ object ShufflePartitionsUtil extends Logging {
var latestSplitPoint = 0
var coalescedSize = 0L
var i = 0
+
+ def createPartitionSpec(): Unit = {
+ // Skip empty inputs, as it is a waste to launch an empty task.
+ if (coalescedSize > 0) {
+ partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
+ }
+ }
+
while (i < numPartitions) {
// We calculate the total size of i-th shuffle partitions from all
shuffles.
var totalSizeOfCurrentPartition = 0L
@@ -103,7 +111,7 @@ object ShufflePartitionsUtil extends Logging {
// If including the `totalSizeOfCurrentPartition` would exceed the
target size, then start a
// new coalesced partition.
if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition
> targetSize) {
- partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
+ createPartitionSpec()
latestSplitPoint = i
// reset postShuffleInputSize.
coalescedSize = totalSizeOfCurrentPartition
@@ -112,8 +120,7 @@ object ShufflePartitionsUtil extends Logging {
}
i += 1
}
- partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)
-
+ createPartitionSpec()
partitionSpecs
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
index 7acc33c..f5c3b78 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
@@ -42,13 +42,6 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
val targetSize = 100
{
- // All bytes per partition are 0.
- val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
- val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
- checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs,
targetSize)
- }
-
- {
// Some bytes per partition are 0 and total size is less than the target
size.
// 1 coalesced partition is expected.
val bytesByPartitionId = Array[Long](10, 0, 20, 0, 0)
@@ -70,8 +63,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
CoalescedPartitionSpec(0, 1),
CoalescedPartitionSpec(1, 2),
CoalescedPartitionSpec(2, 3),
- CoalescedPartitionSpec(3, 4),
- CoalescedPartitionSpec(4, 5))
+ CoalescedPartitionSpec(3, 4))
checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs,
targetSize)
}
@@ -109,17 +101,6 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
}
{
- // All bytes per partition are 0.
- val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
- val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
- val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
- checkEstimation(
- Array(bytesByPartitionId1, bytesByPartitionId2),
- expectedPartitionSpecs,
- targetSize)
- }
-
- {
// Some bytes per partition are 0.
// 1 coalesced partition is expected.
val bytesByPartitionId1 = Array[Long](0, 10, 0, 20, 0)
@@ -217,7 +198,18 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
// the size of data is 0.
val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
- val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
+ checkEstimation(
+ Array(bytesByPartitionId1, bytesByPartitionId2),
+ Seq.empty, targetSize, minNumPartitions)
+ }
+
+
+ {
+ // The minimal number of coalesced partitions is not enforced because
+ // there are too many 0-size partitions.
+ val bytesByPartitionId1 = Array[Long](200, 0, 0)
+ val bytesByPartitionId2 = Array[Long](100, 0, 0)
+ val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 1))
checkEstimation(
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionSpecs,
@@ -251,6 +243,37 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
}
}
+ test("do not create partition spec for 0-size partitions") {
+ val targetSize = 100
+ val minNumPartitions = 2
+
+ {
+ // 1 shuffle: All bytes per partition are 0, no partition spec created.
+ val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
+ checkEstimation(Array(bytesByPartitionId), Seq.empty, targetSize)
+ }
+
+ {
+ // 2 shuffles: All bytes per partition are 0, no partition spec created.
+ val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
+ val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
+ checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2),
Seq.empty, targetSize)
+ }
+
+ {
+ // No partition spec created for the 0-size partitions.
+ val bytesByPartitionId1 = Array[Long](200, 0, 0, 0, 0)
+ val bytesByPartitionId2 = Array[Long](100, 0, 300, 0, 0)
+ val expectedPartitionSpecs = Seq(
+ CoalescedPartitionSpec(0, 1),
+ CoalescedPartitionSpec(2, 3))
+ checkEstimation(
+ Array(bytesByPartitionId1, bytesByPartitionId2),
+ expectedPartitionSpecs,
+ targetSize, minNumPartitions)
+ }
+ }
+
test("splitSizeListByTargetSize") {
val targetSize = 100
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 694be98..e82ccda 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -157,17 +157,17 @@ class AdaptiveQueryExecSuite
val localShuffleRDD0 =
localReaders(0).execute().asInstanceOf[ShuffledRowRDD]
val localShuffleRDD1 =
localReaders(1).execute().asInstanceOf[ShuffledRowRDD]
// The pre-shuffle partition size is [0, 0, 0, 72, 0]
- // And the partitionStartIndices is [0, 3, 4], so advisoryParallelism =
3.
+ // We exclude the 0-size partitions, so only one partition,
advisoryParallelism = 1
// the final parallelism is
- // math.max(1, advisoryParallelism / numMappers): math.max(1, 3/2) = 1
+ // math.max(1, advisoryParallelism / numMappers): math.max(1, 1/2) = 1
// and the partitions length is 1 * numMappers = 2
assert(localShuffleRDD0.getPartitions.length == 2)
// The pre-shuffle partition size is [0, 72, 0, 72, 126]
- // And the partitionStartIndices is [0, 1, 2, 3, 4], so
advisoryParallelism = 5.
+ // We exclude the 0-size partitions, so only 3 partitions,
advisoryParallelism = 3
// the final parallelism is
- // math.max(1, advisoryParallelism / numMappers): math.max(1, 5/2) = 2
- // and the partitions length is 2 * numMappers = 4
- assert(localShuffleRDD1.getPartitions.length == 4)
+ // math.max(1, advisoryParallelism / numMappers): math.max(1, 3/2) = 1
+ // and the partitions length is 1 * numMappers = 2
+ assert(localShuffleRDD1.getPartitions.length == 2)
}
}
@@ -197,6 +197,38 @@ class AdaptiveQueryExecSuite
}
}
+ test("Empty stage coalesced to 0-partition RDD") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true") {
+ val df1 = spark.range(10).withColumn("a", 'id)
+ val df2 = spark.range(10).withColumn("b", 'id)
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ val testDf = df1.where('a > 10).join(df2.where('b > 10),
"id").groupBy('a).count()
+ checkAnswer(testDf, Seq())
+ val plan = testDf.queryExecution.executedPlan
+ assert(find(plan)(_.isInstanceOf[SortMergeJoinExec]).isDefined)
+ val coalescedReaders = collect(plan) {
+ case r: CustomShuffleReaderExec => r
+ }
+ assert(coalescedReaders.length == 2)
+ coalescedReaders.foreach(r => assert(r.partitionSpecs.isEmpty))
+ }
+
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+ val testDf = df1.where('a > 10).join(df2.where('b > 10),
"id").groupBy('a).count()
+ checkAnswer(testDf, Seq())
+ val plan = testDf.queryExecution.executedPlan
+ assert(find(plan)(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+ val coalescedReaders = collect(plan) {
+ case r: CustomShuffleReaderExec => r
+ }
+ assert(coalescedReaders.length == 2, s"$plan")
+ coalescedReaders.foreach(r => assert(r.partitionSpecs.isEmpty))
+ }
+ }
+ }
+
test("Scalar subquery") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
@@ -647,12 +679,13 @@ class AdaptiveQueryExecSuite
// Partition 0: both left and right sides are skewed, left side is
divided
// into 2 splits and right side is divided into 4 splits,
so
// 2 x 4 sub-partitions.
- // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
+ // Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but
it's ignored as the
+ // size is 0.
// Partition 4: only left side is skewed, and divide into 2 splits, so
// 2 sub-partitions.
- // So total (8 + 1 + 3) partitions.
+ // So total (8 + 0 + 2) partitions.
val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
- checkSkewJoin(innerSmj, 8 + 1 + 2)
+ checkSkewJoin(innerSmj, 8 + 0 + 2)
// skewed left outer join optimization
val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
@@ -661,12 +694,13 @@ class AdaptiveQueryExecSuite
// right stats:[6292, 0, 0, 0, 0]
// Partition 0: both left and right sides are skewed, but left join
can't split right side,
// so only left side is divided into 2 splits, and thus 2
sub-partitions.
- // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
+ // Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but
it's ignored as the
+ // size is 0.
// Partition 4: only left side is skewed, and divide into 2 splits, so
// 2 sub-partitions.
- // So total (2 + 1 + 2) partitions.
+ // So total (2 + 0 + 2) partitions.
val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
- checkSkewJoin(leftSmj, 2 + 1 + 2)
+ checkSkewJoin(leftSmj, 2 + 0 + 2)
// skewed right outer join optimization
val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
@@ -675,12 +709,13 @@ class AdaptiveQueryExecSuite
// right stats:[6292, 0, 0, 0, 0]
// Partition 0: both left and right sides are skewed, but right join
can't split left side,
// so only right side is divided into 4 splits, and thus
4 sub-partitions.
- // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
+ // Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but
it's ignored as the
+ // size is 0.
// Partition 4: only left side is skewed, but right join can't split
left side, so just
// 1 partition.
- // So total (4 + 1 + 1) partitions.
+ // So total (4 + 0 + 1) partitions.
val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
- checkSkewJoin(rightSmj, 4 + 1 + 1)
+ checkSkewJoin(rightSmj, 4 + 0 + 1)
}
}
}
@@ -852,7 +887,7 @@ class AdaptiveQueryExecSuite
}.head
assert(!reader.isLocalReader)
assert(reader.hasSkewedPartition)
- assert(reader.hasCoalescedPartition)
+ assert(!reader.hasCoalescedPartition) // 0-size partitions are
ignored.
assert(reader.metrics.contains("numSkewedPartitions"))
assert(reader.metrics("numSkewedPartitions").value > 0)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]