This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 77c49cb  [SPARK-31124][SQL] change the default value of minPartitionNum in AQE
77c49cb is described below

commit 77c49cb702862a0c60733dba797201ada2f5b51a
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Thu Mar 12 21:28:24 2020 +0800

    [SPARK-31124][SQL] change the default value of minPartitionNum in AQE
    
    ### What changes were proposed in this pull request?
    
    AQE has a perf regression when using the default settings: if we coalesce the shuffle partitions into one or a few partitions, we may leave many CPU cores idle, and the perf is worse than with AQE off (which leverages all CPU cores).
    
    Technically, this is not a bad thing. If there are many queries running at the same time, it's better to coalesce shuffle partitions into fewer partitions. However, the default settings of AQE should avoid perf regressions as much as possible.
    
    This PR changes the default value of minPartitionNum when coalescing shuffle partitions to `SparkContext.defaultParallelism`, so that AQE can leverage all the CPU cores.
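
    For illustration only (not part of this patch): a minimal sketch of how a user could pin the old behavior instead of relying on the new `defaultParallelism` fallback, assuming an active `SparkSession` named `spark`:

        // Hypothetical usage: setting the minimum explicitly disables the
        // new fallback to SparkContext.defaultParallelism.
        spark.conf.set("spark.sql.adaptive.enabled", "true")
        spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
        spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")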
    
    ### Why are the changes needed?
    
    Avoid an AQE perf regression under the default settings.
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #27879 from cloud-fan/aqe.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala |  7 +++----
 .../spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala   |  2 +-
 .../sql/execution/adaptive/CoalesceShufflePartitions.scala     | 10 ++++++++--
 .../spark/sql/execution/adaptive/OptimizeSkewedJoin.scala      |  9 +++++++--
 .../spark/sql/execution/adaptive/ShufflePartitionsUtil.scala   |  2 +-
 5 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7780585..b5f2046 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -434,13 +434,14 @@ object SQLConf {
 
   val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
     buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
-      .doc("The minimum number of shuffle partitions after coalescing. This 
configuration only " +
+      .doc("The minimum number of shuffle partitions after coalescing. If not 
set, the default " +
+        "value is the default parallelism of the Spark cluster. This 
configuration only " +
         s"has an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
         s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
       .version("3.0.0")
       .intConf
       .checkValue(_ > 0, "The minimum number of partitions must be positive.")
-      .createWithDefault(1)
+      .createOptional
 
   val COALESCE_PARTITIONS_INITIAL_PARTITION_NUM =
     buildConf("spark.sql.adaptive.coalescePartitions.initialPartitionNum")
@@ -2703,8 +2704,6 @@ class SQLConf extends Serializable with Logging {
 
   def coalesceShufflePartitionsEnabled: Boolean = getConf(COALESCE_PARTITIONS_ENABLED)
 
-  def minShufflePartitionNum: Int = getConf(COALESCE_PARTITIONS_MIN_PARTITION_NUM)
-
   def initialShufflePartitionNum: Int =
     getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(numShufflePartitions)
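
(Illustration only, not part of the patch: since the entry is now `createOptional`, a read yields `Option[Int]` and each caller supplies the fallback. A minimal sketch of the resolution pattern used by the rules below, assuming an active SparkContext; `SQLConf.get` and `SparkContext.getActive` are existing Spark internals:)

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.internal.SQLConf

    // The user-set minimum if present, otherwise the cluster's default parallelism.
    val minPartitionNum: Int = SQLConf.get
      .getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
      .getOrElse(SparkContext.getActive.get.defaultParallelism)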
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index c1486aa..68da06d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -100,7 +100,7 @@ case class AdaptiveSparkPlanExec(
     // before 'CoalesceShufflePartitions', as the skewed partition handled
     // in 'OptimizeSkewedJoin' rule, should be omitted in 'CoalesceShufflePartitions'.
     OptimizeSkewedJoin(conf),
-    CoalesceShufflePartitions(conf),
+    CoalesceShufflePartitions(context.session),
     // The rule of 'OptimizeLocalShuffleReader' need to make use of the 'partitionStartIndices'
     // in 'CoalesceShufflePartitions' rule. So it must be after 'CoalesceShufflePartitions' rule.
     OptimizeLocalShuffleReader(conf),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index d779a20..d2a7f6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.adaptive
 
 import org.apache.spark.MapOutputStatistics
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.internal.SQLConf
@@ -26,8 +27,9 @@ import org.apache.spark.sql.internal.SQLConf
  * A rule to coalesce the shuffle partitions based on the map output statistics, which can
  * avoid many small reduce tasks that hurt performance.
  */
-case class CoalesceShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
+case class CoalesceShufflePartitions(session: SparkSession) extends Rule[SparkPlan] {
   import CoalesceShufflePartitions._
+  private def conf = session.sessionState.conf
 
   override def apply(plan: SparkPlan): SparkPlan = {
     if (!conf.coalesceShufflePartitionsEnabled) {
@@ -66,12 +68,16 @@ case class CoalesceShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
       val distinctNumPreShufflePartitions =
         validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
       if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
+        // We fall back to Spark default parallelism if the minimum number of coalesced partitions
+        // is not set, so to avoid perf regressions compared to no coalescing.
+        val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
+          .getOrElse(session.sparkContext.defaultParallelism)
         val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
           validMetrics.toArray,
           firstPartitionIndex = 0,
           lastPartitionIndex = distinctNumPreShufflePartitions.head,
           advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
-          minNumPartitions = conf.minShufflePartitionNum)
+          minNumPartitions = minPartitionNum)
         // This transformation adds new nodes, so we must use `transformUp` here.
         val stageIds = shuffleStages.map(_.id).toSet
         plan.transformUp {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 7f52393..a75a3f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.apache.commons.io.FileUtils
 
-import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkContext, SparkEnv}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
@@ -261,12 +261,17 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
     if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) {
       nonSkewPartitionIndices.map(i => CoalescedPartitionSpec(i, i + 1))
     } else {
+      // We fall back to Spark default parallelism if the minimum number of coalesced partitions
+      // is not set, so to avoid perf regressions compared to no coalescing.
+      val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
+        .getOrElse(SparkContext.getActive.get.defaultParallelism)
       ShufflePartitionsUtil.coalescePartitions(
         Array(leftStats, rightStats),
         firstPartitionIndex = nonSkewPartitionIndices.head,
         // `lastPartitionIndex` is exclusive.
         lastPartitionIndex = nonSkewPartitionIndices.last + 1,
-        advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES))
+        advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
+        minNumPartitions = minPartitionNum)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 32f5fd4..292df11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -59,7 +59,7 @@ object ShufflePartitionsUtil extends Logging {
       firstPartitionIndex: Int,
       lastPartitionIndex: Int,
       advisoryTargetSize: Long,
-      minNumPartitions: Int = 1): Seq[ShufflePartitionSpec] = {
+      minNumPartitions: Int): Seq[ShufflePartitionSpec] = {
     // If `minNumPartitions` is very large, it is possible that we need to use a value less than
     // `advisoryTargetSize` as the target size of a coalesced task.
     val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
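
(Illustration only, not part of the patch: with the default of `minNumPartitions` removed, callers must pass it explicitly. A hypothetical call site, assuming `validMetrics: Seq[MapOutputStatistics]` and an active `SparkSession` named `spark` are in scope:)

    // Sketch: every caller now spells out the minimum, typically falling back
    // to the cluster's default parallelism as the two rules above do.
    val specs = ShufflePartitionsUtil.coalescePartitions(
      validMetrics.toArray,
      firstPartitionIndex = 0,
      lastPartitionIndex = validMetrics.head.bytesByPartitionId.length,
      advisoryTargetSize = 64L * 1024 * 1024, // hypothetical 64 MB target
      minNumPartitions = spark.sparkContext.defaultParallelism)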


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
