This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3b94aad [SPARK-35552][SQL] Make query stage materialized more readable
3b94aad is described below
commit 3b94aad5e72a6b96e4a8f517ac60e0a2fed2590b
Author: ulysses-you <[email protected]>
AuthorDate: Fri May 28 20:42:11 2021 +0800
[SPARK-35552][SQL] Make query stage materialized more readable
### What changes were proposed in this pull request?
Add a new method `isMaterialized` to `QueryStageExec` and use it in place of
the existing `resultOption.get().isDefined` checks.
### Why are the changes needed?
Currently, we use `resultOption.get().isDefined` to check whether a query stage
has materialized. That expression is not readable at a glance. It's better to
add a dedicated method, `isMaterialized`, that expresses the check by name.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass CI.
Closes #32689 from ulysses-you/SPARK-35552.
Authored-by: ulysses-you <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala | 5 ++---
.../spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 6 +++---
.../apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala | 2 +-
.../org/apache/spark/sql/execution/adaptive/QueryStageExec.scala | 7 +++++--
4 files changed, 11 insertions(+), 9 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
index 614fc78..648d2e7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
@@ -37,14 +37,13 @@ object AQEPropagateEmptyRelation extends
PropagateEmptyRelationBase {
super.nonEmpty(plan) || getRowCount(plan).exists(_ > 0)
private def getRowCount(plan: LogicalPlan): Option[BigInt] = plan match {
- case LogicalQueryStage(_, stage: QueryStageExec) if
stage.resultOption.get().isDefined =>
+ case LogicalQueryStage(_, stage: QueryStageExec) if stage.isMaterialized =>
stage.getRuntimeStatistics.rowCount
case _ => None
}
private def isRelationWithAllNullKeys(plan: LogicalPlan): Boolean = plan
match {
- case LogicalQueryStage(_, stage: BroadcastQueryStageExec)
- if stage.resultOption.get().isDefined =>
+ case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if
stage.isMaterialized =>
stage.broadcast.relationFuture.get().value ==
HashedRelationWithAllNullKeys
case _ => false
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 556c036..ebff790 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -420,7 +420,7 @@ case class AdaptiveSparkPlanExec(
context.stageCache.get(e.canonicalized) match {
case Some(existingStage) if conf.exchangeReuseEnabled =>
val stage = reuseQueryStage(existingStage, e)
- val isMaterialized = stage.resultOption.get().isDefined
+ val isMaterialized = stage.isMaterialized
CreateStageResult(
newPlan = stage,
allChildStagesMaterialized = isMaterialized,
@@ -442,7 +442,7 @@ case class AdaptiveSparkPlanExec(
newStage = reuseQueryStage(queryStage, e)
}
}
- val isMaterialized = newStage.resultOption.get().isDefined
+ val isMaterialized = newStage.isMaterialized
CreateStageResult(
newPlan = newStage,
allChildStagesMaterialized = isMaterialized,
@@ -455,7 +455,7 @@ case class AdaptiveSparkPlanExec(
case q: QueryStageExec =>
CreateStageResult(newPlan = q,
- allChildStagesMaterialized = q.resultOption.get().isDefined, newStages
= Seq.empty)
+ allChildStagesMaterialized = q.isMaterialized, newStages = Seq.empty)
case _ =>
if (plan.children.isEmpty) {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
index 61124f0..a8c74b5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
@@ -53,7 +53,7 @@ object DynamicJoinSelection extends Rule[LogicalPlan] {
}
private def selectJoinStrategy(plan: LogicalPlan): Option[JoinStrategyHint]
= plan match {
- case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if
stage.resultOption.get().isDefined
+ case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if
stage.isMaterialized
&& stage.mapStats.isDefined =>
val demoteBroadcastHash =
shouldDemoteBroadcastHashJoin(stage.mapStats.get)
val preferShuffleHash = preferShuffledHashJoin(stage.mapStats.get)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index a4ec4f1..6451d0b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -95,11 +95,13 @@ abstract class QueryStageExec extends LeafExecNode {
/**
* Compute the statistics of the query stage if executed, otherwise None.
*/
- def computeStats(): Option[Statistics] = resultOption.get().map { _ =>
+ def computeStats(): Option[Statistics] = if (isMaterialized) {
val runtimeStats = getRuntimeStatistics
val dataSize = runtimeStats.sizeInBytes.max(0)
val numOutputRows = runtimeStats.rowCount.map(_.max(0))
- Statistics(dataSize, numOutputRows, isRuntime = true)
+ Some(Statistics(dataSize, numOutputRows, isRuntime = true))
+ } else {
+ None
}
@transient
@@ -107,6 +109,7 @@ abstract class QueryStageExec extends LeafExecNode {
protected var _resultOption = new AtomicReference[Option[Any]](None)
private[adaptive] def resultOption: AtomicReference[Option[Any]] =
_resultOption
+ def isMaterialized: Boolean = resultOption.get().isDefined
override def output: Seq[Attribute] = plan.output
override def outputPartitioning: Partitioning = plan.outputPartitioning
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]