This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 64b3b56  [SPARK-32083][SQL][3.0] AQE should not coalesce partitions for SinglePartition
64b3b56 is described below

commit 64b3b56698dcb8446dd2801ce943938177d3fbd5
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Aug 3 12:56:23 2020 +0000

    [SPARK-32083][SQL][3.0] AQE should not coalesce partitions for SinglePartition
    
    This is a partial backport of https://github.com/apache/spark/pull/29307
    
    Most of the changes are not needed because https://github.com/apache/spark/pull/28226 is in master only.
    
    This PR only backports the safeguard in `ShuffleExchangeExec.canChangeNumPartitions`
    
    Closes #29321 from cloud-fan/aqe.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../scala/org/apache/spark/sql/execution/SparkStrategies.scala     | 4 ++--
 .../spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala  | 6 ++++--
 .../apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala  | 7 ++++++-
 3 files changed, 12 insertions(+), 5 deletions(-)
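
The backported safeguard boils down to a single predicate. The following is not Spark source but a minimal, self-contained Scala sketch (simplified Partitioning types, standalone function) of the condition that decides whether AQE may change a shuffle's partition count:

    // Minimal sketch of the backported safeguard (not Spark source); the
    // Partitioning hierarchy is reduced to the cases that matter here.
    sealed trait Partitioning
    case object SinglePartition extends Partitioning
    case class HashPartitioning(numPartitions: Int) extends Partitioning
    case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning

    // noUserSpecifiedNumPartition is false when the user fixed the count via
    // APIs like repartition(n); SinglePartition must stay at exactly one.
    def canChangeNumPartitions(
        noUserSpecifiedNumPartition: Boolean,
        outputPartitioning: Partitioning): Boolean =
      noUserSpecifiedNumPartition && outputPartitioning != SinglePartition

    assert(!canChangeNumPartitions(noUserSpecifiedNumPartition = true, SinglePartition))
    assert(!canChangeNumPartitions(noUserSpecifiedNumPartition = false, HashPartitioning(200)))
    assert(canChangeNumPartitions(noUserSpecifiedNumPartition = true, HashPartitioning(200)))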

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 28d1ccb..f836deb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -752,7 +752,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Repartition(numPartitions, shuffle, child) =>
         if (shuffle) {
           ShuffleExchangeExec(RoundRobinPartitioning(numPartitions),
-            planLater(child), canChangeNumPartitions = false) :: Nil
+            planLater(child), noUserSpecifiedNumPartition = false) :: Nil
         } else {
           execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
         }
@@ -786,7 +786,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.RangeExec(r) :: Nil
       case r: logical.RepartitionByExpression =>
         exchange.ShuffleExchangeExec(
-          r.partitioning, planLater(r.child), canChangeNumPartitions = false) :: Nil
+          r.partitioning, planLater(r.child), noUserSpecifiedNumPartition = false) :: Nil
      case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
       case r: LogicalRDD =>
        RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
index 6684376..31d1f34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
@@ -141,8 +141,10 @@ object OptimizeLocalShuffleReader {
   }
 
   def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
-    case s: ShuffleQueryStageExec => s.shuffle.canChangeNumPartitions
-    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _, _) => s.shuffle.canChangeNumPartitions
+    case s: ShuffleQueryStageExec =>
+      s.shuffle.canChangeNumPartitions && s.mapStats.isDefined
+    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _, _) =>
+      s.shuffle.canChangeNumPartitions && s.mapStats.isDefined
     case _ => false
   }
 }
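
For context on the OptimizeLocalShuffleReader hunk above: `mapStats` is an Option, and the added `isDefined` check skips the local-reader optimization when no map output statistics are available (for example, a shuffle that produced no map output). A rough standalone Scala sketch of that eligibility check, with a made-up MapStats stand-in for Spark's MapOutputStatistics:

    // Rough sketch only: MapStats is a hypothetical stand-in, not Spark's type.
    case class MapStats(bytesByPartitionId: Array[Long])

    def canUseLocalShuffleReader(
        canChangeNumPartitions: Boolean,
        mapStats: Option[MapStats]): Boolean =
      canChangeNumPartitions && mapStats.isDefined

    // No map output stats => the rule has nothing to build local readers from.
    assert(!canUseLocalShuffleReader(canChangeNumPartitions = true, mapStats = None))
    assert(canUseLocalShuffleReader(canChangeNumPartitions = true, Some(MapStats(Array(10L, 20L)))))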
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index b7da78c..24c7369 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -83,7 +83,12 @@ trait ShuffleExchangeLike extends Exchange {
 case class ShuffleExchangeExec(
     override val outputPartitioning: Partitioning,
     child: SparkPlan,
-    canChangeNumPartitions: Boolean = true) extends ShuffleExchangeLike {
+    noUserSpecifiedNumPartition: Boolean = true) extends ShuffleExchangeLike {
+
+  // If users specify the num partitions via APIs like `repartition`, we shouldn't change it.
+  // For `SinglePartition`, it requires exactly one partition and we can't change it either.
+  def canChangeNumPartitions: Boolean =
+    noUserSpecifiedNumPartition && outputPartitioning != SinglePartition
 
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
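
As a user-visible illustration (not part of the patch itself): a grouping-free aggregate requires SinglePartition, so with this safeguard AQE leaves that exchange at exactly one partition even when partition coalescing is enabled. A spark-shell style snippet, assuming Spark 3.0.x and the standard AQE configs:

    import org.apache.spark.sql.functions.sum  // already imported in spark-shell

    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

    // The final aggregate has no grouping keys, so its exchange requires
    // SinglePartition; AQE must not change that partition count.
    val total = spark.range(0, 1000, 1, 10).agg(sum("id"))
    total.show()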


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
