This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 7659a6a [SPARK-31124][SQL] change the default value of minPartitionNum in AQE
7659a6a is described below
commit 7659a6a66e5170b078de9af00a27a0e65d5e12de
Author: Wenchen Fan <[email protected]>
AuthorDate: Thu Mar 12 21:28:24 2020 +0800
[SPARK-31124][SQL] change the default value of minPartitionNum in AQE
### What changes were proposed in this pull request?
AQE has a perf regression when using the default settings: if we coalesce
the shuffle partitions into one or a few partitions, we may leave many CPU cores
idle, and the perf is worse than with AQE off (which leverages all CPU cores).
Technically, this is not a bad thing: if many queries are running at the same
time, it's better to coalesce shuffle partitions into fewer partitions.
However, the default settings of AQE should avoid perf regressions as much as
possible.
This PR changes the default value of minPartitionNum used when coalescing
shuffle partitions to `SparkContext.defaultParallelism`, so that AQE can
leverage all the CPU cores.
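For illustration only (not part of this commit), a minimal sketch of what the new default means from a user's perspective; the `local[8]` master, the app name, and the explicit override value are arbitrary assumptions:

```scala
import org.apache.spark.sql.SparkSession

object MinPartitionNumSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[8]") // defaultParallelism is 8 in this local mode
      .appName("aqe-min-partition-num-sketch")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .getOrCreate()

    // With minPartitionNum unset, AQE now coalesces shuffle partitions down
    // to at most SparkContext.defaultParallelism partitions (8 here).
    println(spark.sparkContext.defaultParallelism)

    // Users who prefer the old behavior (coalescing down to a single
    // partition when the data is small) can set the config explicitly:
    spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")

    spark.stop()
  }
}
```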
### Why are the changes needed?
Avoid an AQE perf regression.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
existing tests
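As an illustration only (not a test added by this patch), the effect could be observed in a spark-shell session along these lines; the core count, row count, and key cardinality are arbitrary assumptions:

```scala
// Launched as, for example:
//   spark-shell --master local[8] \
//     --conf spark.sql.adaptive.enabled=true \
//     --conf spark.sql.adaptive.coalescePartitions.enabled=true

// A small aggregation whose post-shuffle data easily fits in one partition.
val agg = spark.range(0, 1000000, 1, 100)
  .selectExpr("id % 10000 AS key")
  .groupBy("key")
  .count()
agg.collect()

// With minPartitionNum unset, the coalesced stage should now keep roughly
// spark.sparkContext.defaultParallelism partitions (8 here) instead of
// collapsing to 1, so all local cores stay busy.
println(agg.rdd.getNumPartitions)
```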
Closes #27879 from cloud-fan/aqe.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 77c49cb702862a0c60733dba797201ada2f5b51a)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 7 +++----
.../spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 2 +-
.../sql/execution/adaptive/CoalesceShufflePartitions.scala | 10 ++++++++--
.../spark/sql/execution/adaptive/OptimizeSkewedJoin.scala | 9 +++++++--
.../spark/sql/execution/adaptive/ShufflePartitionsUtil.scala | 2 +-
5 files changed, 20 insertions(+), 10 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3738922..06180f6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -404,12 +404,13 @@ object SQLConf {
 
   val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
     buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
-      .doc("The minimum number of shuffle partitions after coalescing. This configuration only " +
+      .doc("The minimum number of shuffle partitions after coalescing. If not set, the default " +
+        "value is the default parallelism of the Spark cluster. This configuration only " +
         s"has an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
         s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
       .intConf
       .checkValue(_ > 0, "The minimum number of partitions must be positive.")
-      .createWithDefault(1)
+      .createOptional
 
   val COALESCE_PARTITIONS_INITIAL_PARTITION_NUM =
     buildConf("spark.sql.adaptive.coalescePartitions.initialPartitionNum")
@@ -2446,8 +2447,6 @@ class SQLConf extends Serializable with Logging {
 
   def coalesceShufflePartitionsEnabled: Boolean = getConf(COALESCE_PARTITIONS_ENABLED)
 
-  def minShufflePartitionNum: Int = getConf(COALESCE_PARTITIONS_MIN_PARTITION_NUM)
-
   def initialShufflePartitionNum: Int =
     getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(numShufflePartitions)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index c1486aa..68da06d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -100,7 +100,7 @@ case class AdaptiveSparkPlanExec(
     // before 'CoalesceShufflePartitions', as the skewed partition handled
     // in 'OptimizeSkewedJoin' rule, should be omitted in 'CoalesceShufflePartitions'.
     OptimizeSkewedJoin(conf),
-    CoalesceShufflePartitions(conf),
+    CoalesceShufflePartitions(context.session),
     // The rule of 'OptimizeLocalShuffleReader' need to make use of the 'partitionStartIndices'
     // in 'CoalesceShufflePartitions' rule. So it must be after 'CoalesceShufflePartitions' rule.
     OptimizeLocalShuffleReader(conf),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index d779a20..d2a7f6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.adaptive
 
 import org.apache.spark.MapOutputStatistics
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.internal.SQLConf
@@ -26,8 +27,9 @@ import org.apache.spark.sql.internal.SQLConf
  * A rule to coalesce the shuffle partitions based on the map output statistics, which can
  * avoid many small reduce tasks that hurt performance.
  */
-case class CoalesceShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
+case class CoalesceShufflePartitions(session: SparkSession) extends Rule[SparkPlan] {
   import CoalesceShufflePartitions._
+  private def conf = session.sessionState.conf
 
   override def apply(plan: SparkPlan): SparkPlan = {
     if (!conf.coalesceShufflePartitionsEnabled) {
@@ -66,12 +68,16 @@ case class CoalesceShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
     val distinctNumPreShufflePartitions =
       validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
     if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
+      // We fall back to Spark default parallelism if the minimum number of coalesced partitions
+      // is not set, so to avoid perf regressions compared to no coalescing.
+      val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
+        .getOrElse(session.sparkContext.defaultParallelism)
       val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
         validMetrics.toArray,
         firstPartitionIndex = 0,
         lastPartitionIndex = distinctNumPreShufflePartitions.head,
         advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
-        minNumPartitions = conf.minShufflePartitionNum)
+        minNumPartitions = minPartitionNum)
       // This transformation adds new nodes, so we must use `transformUp` here.
       val stageIds = shuffleStages.map(_.id).toSet
       plan.transformUp {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 7f52393..a75a3f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.apache.commons.io.FileUtils
 
-import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkContext, SparkEnv}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
@@ -261,12 +261,17 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
     if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) {
       nonSkewPartitionIndices.map(i => CoalescedPartitionSpec(i, i + 1))
     } else {
+      // We fall back to Spark default parallelism if the minimum number of coalesced partitions
+      // is not set, so to avoid perf regressions compared to no coalescing.
+      val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
+        .getOrElse(SparkContext.getActive.get.defaultParallelism)
       ShufflePartitionsUtil.coalescePartitions(
         Array(leftStats, rightStats),
         firstPartitionIndex = nonSkewPartitionIndices.head,
         // `lastPartitionIndex` is exclusive.
         lastPartitionIndex = nonSkewPartitionIndices.last + 1,
-        advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES))
+        advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
+        minNumPartitions = minPartitionNum)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 32f5fd4..292df11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -59,7 +59,7 @@ object ShufflePartitionsUtil extends Logging {
       firstPartitionIndex: Int,
       lastPartitionIndex: Int,
       advisoryTargetSize: Long,
-      minNumPartitions: Int = 1): Seq[ShufflePartitionSpec] = {
+      minNumPartitions: Int): Seq[ShufflePartitionSpec] = {
     // If `minNumPartitions` is very large, it is possible that we need to use a value less than
     // `advisoryTargetSize` as the target size of a coalesced task.
     val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]