This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 64b3b56 [SPARK-32083][SQL][3.0] AQE should not coalesce partitions
for SinglePartition
64b3b56 is described below
commit 64b3b56698dcb8446dd2801ce943938177d3fbd5
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Aug 3 12:56:23 2020 +0000
[SPARK-32083][SQL][3.0] AQE should not coalesce partitions for
SinglePartition
This is a partial backport of https://github.com/apache/spark/pull/29307
Most of the changes are not needed because
https://github.com/apache/spark/pull/28226 is in master only.
This PR only backports the safeguard in
`ShuffleExchangeExec.canChangeNumPartitions`
Closes #29321 from cloud-fan/aqe.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 4 ++--
.../spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala | 6 ++++--
.../apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala | 7 ++++++-
3 files changed, 12 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 28d1ccb..f836deb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -752,7 +752,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Repartition(numPartitions, shuffle, child) =>
if (shuffle) {
ShuffleExchangeExec(RoundRobinPartitioning(numPartitions),
- planLater(child), canChangeNumPartitions = false) :: Nil
+ planLater(child), noUserSpecifiedNumPartition = false) :: Nil
} else {
execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
}
@@ -786,7 +786,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.RangeExec(r) :: Nil
case r: logical.RepartitionByExpression =>
         exchange.ShuffleExchangeExec(
-          r.partitioning, planLater(r.child), canChangeNumPartitions = false) :: Nil
+          r.partitioning, planLater(r.child), noUserSpecifiedNumPartition = false) :: Nil
case ExternalRDD(outputObjAttr, rdd) =>
ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
case r: LogicalRDD =>
RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning,
r.outputOrdering) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
index 6684376..31d1f34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
@@ -141,8 +141,10 @@ object OptimizeLocalShuffleReader {
}
def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
-    case s: ShuffleQueryStageExec => s.shuffle.canChangeNumPartitions
-    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _, _) => s.shuffle.canChangeNumPartitions
+    case s: ShuffleQueryStageExec =>
+      s.shuffle.canChangeNumPartitions && s.mapStats.isDefined
+    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _, _) =>
+      s.shuffle.canChangeNumPartitions && s.mapStats.isDefined
case _ => false
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index b7da78c..24c7369 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -83,7 +83,12 @@ trait ShuffleExchangeLike extends Exchange {
case class ShuffleExchangeExec(
override val outputPartitioning: Partitioning,
child: SparkPlan,
-    canChangeNumPartitions: Boolean = true) extends ShuffleExchangeLike {
+    noUserSpecifiedNumPartition: Boolean = true) extends ShuffleExchangeLike {
+
+  // If users specify the num partitions via APIs like `repartition`, we shouldn't change it.
+  // For `SinglePartition`, it requires exactly one partition and we can't change it either.
+  def canChangeNumPartitions: Boolean =
+    noUserSpecifiedNumPartition && outputPartitioning != SinglePartition
private lazy val writeMetrics =
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]