This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ad2fa2a [SPARK-31501][SQL] AQE update UI should not cause deadlock
ad2fa2a is described below
commit ad2fa2a2d48f12aed741c45db0536ee49fed0275
Author: Maryann Xue <[email protected]>
AuthorDate: Tue Apr 21 03:56:42 2020 +0000
[SPARK-31501][SQL] AQE update UI should not cause deadlock
### What changes were proposed in this pull request?
This PR makes sure that AQE does not call update UI if the current
execution ID does not match the current query. This PR also includes a minor
refactoring that moves `getOrCloneSessionWithAqeOff` from `QueryExecution` to
`AdaptiveSparkPlanHelper` since that function is not used by `QueryExecution`
any more.
### Why are the changes needed?
Without this fix, there could be a potential deadlock.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added UT.
Closes #28275 from maryannxue/aqe-ui-deadlock.
Authored-by: Maryann Xue <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit ae29cf24fc6d27335be411da53bc18a867bf098c)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/execution/CacheManager.scala | 7 +++--
.../spark/sql/execution/QueryExecution.scala | 36 ++++++----------------
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 11 ++++---
.../adaptive/AdaptiveSparkPlanHelper.scala | 18 ++++++++++-
.../adaptive/AdaptiveQueryExecSuite.scala | 30 +++++++++++++++++-
5 files changed, 66 insertions(+), 36 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index ad33ce5..52cec8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute,
SubqueryExpression}
import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData,
LogicalPlan, ResolvedHint}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{FileIndex,
HadoopFsRelation, LogicalRelation}
@@ -45,7 +46,7 @@ case class CachedData(plan: LogicalPlan,
cachedRepresentation: InMemoryRelation)
*
* Internal to Spark SQL.
*/
-class CacheManager extends Logging {
+class CacheManager extends Logging with AdaptiveSparkPlanHelper {
/**
* Maintains the list of cached plans as an immutable sequence. Any updates
to the list
@@ -80,7 +81,7 @@ class CacheManager extends Logging {
logWarning("Asked to cache already cached data.")
} else {
// Turn off AQE so that the outputPartitioning of the underlying plan
can be leveraged.
- val sessionWithAqeOff =
QueryExecution.getOrCloneSessionWithAqeOff(query.sparkSession)
+ val sessionWithAqeOff = getOrCloneSessionWithAqeOff(query.sparkSession)
val inMemoryRelation = sessionWithAqeOff.withActive {
val qe = sessionWithAqeOff.sessionState.executePlan(planToCache)
InMemoryRelation(
@@ -191,7 +192,7 @@ class CacheManager extends Logging {
needToRecache.map { cd =>
cd.cachedRepresentation.cacheBuilder.clearCache()
// Turn off AQE so that the outputPartitioning of the underlying plan
can be leveraged.
- val sessionWithAqeOff = QueryExecution.getOrCloneSessionWithAqeOff(spark)
+ val sessionWithAqeOff = getOrCloneSessionWithAqeOff(spark)
val newCache = sessionWithAqeOff.withActive {
val qe = sessionWithAqeOff.sessionState.executePlan(cd.plan)
InMemoryRelation(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 20bc289..99bc45f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -112,7 +112,8 @@ class QueryExecution(
def observedMetrics: Map[String, Row] =
CollectMetricsExec.collect(executedPlan)
protected def preparations: Seq[Rule[SparkPlan]] = {
- QueryExecution.preparations(sparkSession)
+ QueryExecution.preparations(sparkSession,
+ Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession,
this))))
}
private def executePhase[T](phase: String)(block: => T): T =
sparkSession.withActive {
@@ -274,18 +275,15 @@ object QueryExecution {
* are correct, insert whole stage code gen, and try to reduce the work done
by reusing exchanges
* and subqueries.
*/
- private[execution] def preparations(sparkSession: SparkSession):
Seq[Rule[SparkPlan]] = {
-
- val sparkSessionWithAqeOff = getOrCloneSessionWithAqeOff(sparkSession)
-
+ private[execution] def preparations(
+ sparkSession: SparkSession,
+ adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None):
Seq[Rule[SparkPlan]] = {
+ // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following
rules will be no-op
+ // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
+ adaptiveExecutionRule.toSeq ++
Seq(
- // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the
following rules will be no-op
- // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
- InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession)),
- // If the following rules apply, it means the main query is not AQE-ed,
so we make sure the
- // subqueries are not AQE-ed either.
- PlanDynamicPruningFilters(sparkSessionWithAqeOff),
- PlanSubqueries(sparkSessionWithAqeOff),
+ PlanDynamicPruningFilters(sparkSession),
+ PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
sparkSession.sessionState.columnarRules),
@@ -334,18 +332,4 @@ object QueryExecution {
val sparkPlan = createSparkPlan(spark, spark.sessionState.planner,
plan.clone())
prepareExecutedPlan(spark, sparkPlan)
}
-
- /**
- * Returns a cloned [[SparkSession]] with adaptive execution disabled, or
the original
- * [[SparkSession]] if its adaptive execution is already disabled.
- */
- def getOrCloneSessionWithAqeOff[T](session: SparkSession): SparkSession = {
- if (!session.sessionState.conf.adaptiveExecutionEnabled) {
- session
- } else {
- val newSession = session.cloneSession()
- newSession.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED,
false)
- newSession
- }
- }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 0ec8b5f..d73540c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -145,10 +145,11 @@ case class AdaptiveSparkPlanExec(
// `plan.queryExecution.rdd`, we need to set active session here as new
plan nodes can be
// created in the middle of the execution.
context.session.withActive {
- // Subqueries do not have their own execution IDs and therefore rely on
the main query to
- // update UI.
- val executionId = Option(context.session.sparkContext.getLocalProperty(
- SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
+ // If the `QueryExecution` does not match the current execution ID, it
means the execution ID
+ // belongs to another (parent) query, and we should not call update UI
in this query.
+ val executionId =
+
Option(context.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))
+ .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq
context.qe)
var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
var result = createQueryStages(currentPhysicalPlan)
val events = new LinkedBlockingQueue[StageMaterializationEvent]()
@@ -572,7 +573,7 @@ object AdaptiveSparkPlanExec {
/**
* The execution context shared between the main query and all sub-queries.
*/
-case class AdaptiveExecutionContext(session: SparkSession) {
+case class AdaptiveExecutionContext(session: SparkSession, qe: QueryExecution)
{
/**
* The subquery-reuse map shared across the entire query.
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala
index cd87230..3cf6a13 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala
@@ -17,7 +17,9 @@
package org.apache.spark.sql.execution.adaptive
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.internal.SQLConf
/**
* This class provides utility methods related to tree traversal of an
[[AdaptiveSparkPlanExec]]
@@ -135,4 +137,18 @@ trait AdaptiveSparkPlanHelper {
case a: AdaptiveSparkPlanExec => a.executedPlan
case other => other
}
- }
+
+ /**
+ * Returns a cloned [[SparkSession]] with adaptive execution disabled, or
the original
+ * [[SparkSession]] if its adaptive execution is already disabled.
+ */
+ def getOrCloneSessionWithAqeOff[T](session: SparkSession): SparkSession = {
+ if (!session.sessionState.conf.adaptiveExecutionEnabled) {
+ session
+ } else {
+ val newSession = session.cloneSession()
+ newSession.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED,
false)
+ newSession
+ }
+ }
+}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 6da510f..8225863 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -23,7 +23,8 @@ import java.net.URI
import org.apache.log4j.Level
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent,
SparkListenerJobStart}
-import org.apache.spark.sql.{QueryTest, Row, SparkSession}
+import org.apache.spark.sql.{QueryTest, Row, SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD,
SparkPlan}
import
org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION
import org.apache.spark.sql.execution.command.DataWritingCommandExec
@@ -816,4 +817,31 @@ class AdaptiveQueryExecSuite
SparkSession.setActiveSession(spark) // recover the active session.
}
}
+
+ test("No deadlock in UI update") {
+ object TestStrategy extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case _: Aggregate =>
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ spark.range(5).rdd
+ }
+ Nil
+ case _ => Nil
+ }
+ }
+
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ try {
+ spark.experimental.extraStrategies = TestStrategy :: Nil
+ val df = spark.range(10).groupBy('id).count()
+ df.collect()
+ } finally {
+ spark.experimental.extraStrategies = Nil
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]