This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 5bddafe [SPARK-36430][SQL] Adaptively calculate the target size when
coalescing shuffle partitions in AQE
5bddafe is described below
commit 5bddafe3e006eda55b4466ceca8c6411b369ebad
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Aug 9 17:25:55 2021 +0800
[SPARK-36430][SQL] Adaptively calculate the target size when coalescing
shuffle partitions in AQE
### What changes were proposed in this pull request?
This PR fixes a performance regression introduced in
https://github.com/apache/spark/pull/33172
Before #33172, the target size was adaptively calculated based on the
default parallelism of the Spark cluster. Sometimes that calculated size is very small, so #33172
set a minimum partition size to fix the resulting perf issues. Other times the calculated size is
reasonable, such as dozens of MBs.
After #33172, we no longer calculate the target size adaptively, and by
default always coalesce the partitions down to the 1 MB minimum partition size. This can cause a perf
regression whenever the adaptively calculated size would have been reasonable.
This PR brings back the code that adaptively calculates the target size
based on the default parallelism of the Spark cluster.
### Why are the changes needed?
To fix a performance regression.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests
Closes #33655 from cloud-fan/minor.
Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 9a539d5846814f5fd5317b9d0b7eb1a41299f092)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 21 +++++-----
.../adaptive/CoalesceShufflePartitions.scala | 46 +++++++++++++---------
.../execution/adaptive/ShufflePartitionsUtil.scala | 10 ++---
.../adaptive/AdaptiveQueryExecSuite.scala | 3 +-
.../apache/spark/sql/internal/SQLConfSuite.scala | 6 ++-
5 files changed, 46 insertions(+), 40 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4d78913..555242f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -480,6 +480,7 @@ object SQLConf {
.doc("(Deprecated since Spark 3.0)")
.version("1.6.0")
.bytesConf(ByteUnit.BYTE)
+ .checkValue(_ > 0, "advisoryPartitionSizeInBytes must be positive")
.createWithDefaultString("64MB")
val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
@@ -526,28 +527,26 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
- private val MIN_PARTITION_SIZE_KEY =
"spark.sql.adaptive.coalescePartitions.minPartitionSize"
-
val COALESCE_PARTITIONS_PARALLELISM_FIRST =
buildConf("spark.sql.adaptive.coalescePartitions.parallelismFirst")
- .doc("When true, Spark ignores the target size specified by " +
+ .doc("When true, Spark does not respect the target size specified by " +
s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' (default 64MB) when
coalescing contiguous " +
- "shuffle partitions, and only respect the minimum partition size
specified by " +
- s"'$MIN_PARTITION_SIZE_KEY' (default 1MB), to maximize the
parallelism. " +
- "This is to avoid performance regression when enabling adaptive query
execution. " +
- "It's recommended to set this config to false and respect the target
size specified by " +
- s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'.")
+ "shuffle partitions, but adaptively calculate the target size
according to the default " +
+ "parallelism of the Spark cluster. The calculated size is usually
smaller than the " +
+ "configured target size. This is to maximize the parallelism and avoid
performance " +
+ "regression when enabling adaptive query execution. It's recommended
to set this config " +
+ "to false and respect the configured target size.")
.version("3.2.0")
.booleanConf
.createWithDefault(true)
val COALESCE_PARTITIONS_MIN_PARTITION_SIZE =
buildConf("spark.sql.adaptive.coalescePartitions.minPartitionSize")
- .doc("The minimum size of shuffle partitions after coalescing. Its value
can be at most " +
- s"20% of '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'. This is useful
when the target size " +
- "is ignored during partition coalescing, which is the default case.")
+ .doc("The minimum size of shuffle partitions after coalescing. This is
useful when the " +
+ "adaptively calculated target size is too small during partition
coalescing.")
.version("3.2.0")
.bytesConf(ByteUnit.BYTE)
+ .checkValue(_ > 0, "minPartitionSize must be positive")
.createWithDefaultString("1MB")
val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 75c53b4..5c14caa 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -22,6 +22,7 @@ import
org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS,
REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL,
ShuffleExchangeLike, ShuffleOrigin}
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
/**
* A rule to coalesce the shuffle partitions based on the map output
statistics, which can
@@ -59,33 +60,40 @@ case class CoalesceShufflePartitions(session: SparkSession)
extends AQEShuffleRe
if (!shuffleStageInfos.forall(s => isSupported(s.shuffleStage.shuffle))) {
plan
} else {
- // Ideally, this rule should simply coalesce partition w.r.t. the target
size specified by
+ // Ideally, this rule should simply coalesce partitions w.r.t. the
target size specified by
// ADVISORY_PARTITION_SIZE_IN_BYTES (default 64MB). To avoid perf
regression in AQE, this
- // rule by default ignores the target size (set it to 0), and only
respect the minimum
- // partition size specified by COALESCE_PARTITIONS_MIN_PARTITION_SIZE
(default 1MB).
+ // rule by default tries to maximize the parallelism and set the target
size to
+ // `total shuffle size / Spark default parallelism`. In case the `Spark
default parallelism`
+ // is too big, this rule also respect the minimum partition size
specified by
+ // COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB).
// For history reason, this rule also need to support the config
- // COALESCE_PARTITIONS_MIN_PARTITION_NUM: if it's set, we will respect
both the target
- // size and minimum partition number, no matter
COALESCE_PARTITIONS_PARALLELISM_FIRST is true
- // or false.
- // TODO: remove the `minNumPartitions` parameter from
- // `ShufflePartitionsUtil.coalescePartitions` after we remove the
config
- // COALESCE_PARTITIONS_MIN_PARTITION_NUM
- val minPartitionNum =
conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
- val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
- // `minPartitionSize` can be at most 20% of `advisorySize`.
- val minPartitionSize = math.min(
- advisorySize / 5,
conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE))
- val parallelismFirst =
conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)
- val advisoryTargetSize = if (minPartitionNum.isEmpty &&
parallelismFirst) {
- 0
+ // COALESCE_PARTITIONS_MIN_PARTITION_NUM. We should remove this config
in the future.
+ val minNumPartitions =
conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse {
+ if (conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)) {
+ // We fall back to Spark default parallelism if the minimum number
of coalesced partitions
+ // is not set, so to avoid perf regressions compared to no
coalescing.
+ session.sparkContext.defaultParallelism
+ } else {
+ // If we don't need to maximize the parallelism, we set
`minPartitionNum` to 1, so that
+ // the specified advisory partition size will be respected.
+ 1
+ }
+ }
+ val advisoryTargetSize =
conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+ val minPartitionSize = if (Utils.isTesting) {
+ // In the tests, we usually set the target size to a very small value
that is even smaller
+ // than the default value of the min partition size. Here we also
adjust the min partition
+ // size to be not larger than 20% of the target size, so that the
tests don't need to set
+ // both configs all the time to check the coalescing behavior.
+
conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE).min(advisoryTargetSize
/ 5)
} else {
- advisorySize
+ conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE)
}
val newPartitionSpecs = ShufflePartitionsUtil.coalescePartitions(
shuffleStageInfos.map(_.shuffleStage.mapStats),
shuffleStageInfos.map(_.partitionSpecs),
advisoryTargetSize = advisoryTargetSize,
- minNumPartitions =
conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse(1),
+ minNumPartitions = minNumPartitions,
minPartitionSize = minPartitionSize)
if (newPartitionSpecs.nonEmpty) {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 64f89b9..3609548 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -56,13 +56,9 @@ object ShufflePartitionsUtil extends Logging {
// If `minNumPartitions` is very large, it is possible that we need to use
a value less than
// `advisoryTargetSize` as the target size of a coalesced task.
val totalPostShuffleInputSize =
mapOutputStatistics.flatMap(_.map(_.bytesByPartitionId.sum)).sum
- // The max at here is to make sure that when we have an empty table, we
only have a single
- // coalesced partition.
- // There is no particular reason that we pick 16. We just need a number to
prevent
- // `maxTargetSize` from being set to 0.
- val maxTargetSize = math.max(
- math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong,
16)
- val targetSize = math.min(maxTargetSize, advisoryTargetSize)
+ val maxTargetSize = math.ceil(totalPostShuffleInputSize /
minNumPartitions.toDouble).toLong
+ // It's meaningless to make target size smaller than minPartitionSize.
+ val targetSize =
maxTargetSize.min(advisoryTargetSize).max(minPartitionSize)
val shuffleIds =
mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ")
logInfo(s"For shuffle($shuffleIds), advisory target size:
$advisoryTargetSize, " +
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index ca8295e..9837138 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1685,7 +1685,8 @@ class AdaptiveQueryExecSuite
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
- SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2258",
+ // Pick a small value so that no coalesce can happen.
+ SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
val df = spark.sparkContext.parallelize(
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 6f2452a..5e661a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -259,8 +259,10 @@ class SQLConfSuite extends QueryTest with
SharedSparkSession {
spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1g")
assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) ===
1073741824)
- spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-1")
- assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === -1)
+ // test negative value
+ intercept[IllegalArgumentException] {
+ spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-1")
+ }
// Test overflow exception
intercept[IllegalArgumentException] {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]