This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 77c49cb [SPARK-31124][SQL] change the default value of minPartitionNum in AQE 77c49cb is described below commit 77c49cb702862a0c60733dba797201ada2f5b51a Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Thu Mar 12 21:28:24 2020 +0800 [SPARK-31124][SQL] change the default value of minPartitionNum in AQE ### What changes were proposed in this pull request? AQE has a perf regression when using the default settings: if we coalesce the shuffle partitions into one or a few partitions, we may leave many CPU cores idle, and the perf is worse than with AQE off (which leverages all CPU cores). Technically, this is not necessarily a bad thing: if there are many queries running at the same time, it's better to coalesce shuffle partitions into fewer partitions. However, the default settings of AQE should avoid perf regressions as much as possible. This PR changes the default value of minPartitionNum, used when coalescing shuffle partitions, to `SparkContext.defaultParallelism`, so that AQE can leverage all the CPU cores. ### Why are the changes needed? To avoid an AQE perf regression. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Existing tests. Closes #27879 from cloud-fan/aqe. 
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 7 +++---- .../spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 2 +- .../sql/execution/adaptive/CoalesceShufflePartitions.scala | 10 ++++++++-- .../spark/sql/execution/adaptive/OptimizeSkewedJoin.scala | 9 +++++++-- .../spark/sql/execution/adaptive/ShufflePartitionsUtil.scala | 2 +- 5 files changed, 20 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7780585..b5f2046 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -434,13 +434,14 @@ object SQLConf { val COALESCE_PARTITIONS_MIN_PARTITION_NUM = buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum") - .doc("The minimum number of shuffle partitions after coalescing. This configuration only " + + .doc("The minimum number of shuffle partitions after coalescing. If not set, the default " + + "value is the default parallelism of the Spark cluster. 
This configuration only " + s"has an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " + s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.") .version("3.0.0") .intConf .checkValue(_ > 0, "The minimum number of partitions must be positive.") - .createWithDefault(1) + .createOptional val COALESCE_PARTITIONS_INITIAL_PARTITION_NUM = buildConf("spark.sql.adaptive.coalescePartitions.initialPartitionNum") @@ -2703,8 +2704,6 @@ class SQLConf extends Serializable with Logging { def coalesceShufflePartitionsEnabled: Boolean = getConf(COALESCE_PARTITIONS_ENABLED) - def minShufflePartitionNum: Int = getConf(COALESCE_PARTITIONS_MIN_PARTITION_NUM) - def initialShufflePartitionNum: Int = getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(numShufflePartitions) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index c1486aa..68da06d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -100,7 +100,7 @@ case class AdaptiveSparkPlanExec( // before 'CoalesceShufflePartitions', as the skewed partition handled // in 'OptimizeSkewedJoin' rule, should be omitted in 'CoalesceShufflePartitions'. OptimizeSkewedJoin(conf), - CoalesceShufflePartitions(conf), + CoalesceShufflePartitions(context.session), // The rule of 'OptimizeLocalShuffleReader' need to make use of the 'partitionStartIndices' // in 'CoalesceShufflePartitions' rule. So it must be after 'CoalesceShufflePartitions' rule. 
OptimizeLocalShuffleReader(conf), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala index d779a20..d2a7f6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.adaptive import org.apache.spark.MapOutputStatistics +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.internal.SQLConf @@ -26,8 +27,9 @@ import org.apache.spark.sql.internal.SQLConf * A rule to coalesce the shuffle partitions based on the map output statistics, which can * avoid many small reduce tasks that hurt performance. */ -case class CoalesceShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { +case class CoalesceShufflePartitions(session: SparkSession) extends Rule[SparkPlan] { import CoalesceShufflePartitions._ + private def conf = session.sessionState.conf override def apply(plan: SparkPlan): SparkPlan = { if (!conf.coalesceShufflePartitionsEnabled) { @@ -66,12 +68,16 @@ case class CoalesceShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { val distinctNumPreShufflePartitions = validMetrics.map(stats => stats.bytesByPartitionId.length).distinct if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) { + // We fall back to Spark default parallelism if the minimum number of coalesced partitions + // is not set, so to avoid perf regressions compared to no coalescing. 
+ val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM) + .getOrElse(session.sparkContext.defaultParallelism) val partitionSpecs = ShufflePartitionsUtil.coalescePartitions( validMetrics.toArray, firstPartitionIndex = 0, lastPartitionIndex = distinctNumPreShufflePartitions.head, advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES), - minNumPartitions = conf.minShufflePartitionNum) + minNumPartitions = minPartitionNum) // This transformation adds new nodes, so we must use `transformUp` here. val stageIds = shuffleStages.map(_.id).toSet plan.transformUp { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 7f52393..a75a3f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.apache.commons.io.FileUtils -import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv} +import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkContext, SparkEnv} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ @@ -261,12 +261,17 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) { nonSkewPartitionIndices.map(i => CoalescedPartitionSpec(i, i + 1)) } else { + // We fall back to Spark default parallelism if the minimum number of coalesced partitions + // is not set, so to avoid perf regressions compared to no coalescing. 
+ val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM) + .getOrElse(SparkContext.getActive.get.defaultParallelism) ShufflePartitionsUtil.coalescePartitions( Array(leftStats, rightStats), firstPartitionIndex = nonSkewPartitionIndices.head, // `lastPartitionIndex` is exclusive. lastPartitionIndex = nonSkewPartitionIndices.last + 1, - advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)) + advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES), + minNumPartitions = minPartitionNum) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala index 32f5fd4..292df11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala @@ -59,7 +59,7 @@ object ShufflePartitionsUtil extends Logging { firstPartitionIndex: Int, lastPartitionIndex: Int, advisoryTargetSize: Long, - minNumPartitions: Int = 1): Seq[ShufflePartitionSpec] = { + minNumPartitions: Int): Seq[ShufflePartitionSpec] = { // If `minNumPartitions` is very large, it is possible that we need to use a value less than // `advisoryTargetSize` as the target size of a coalesced task. val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org