This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b29cb1a [SPARK-30719][SQL] do not log warning if AQE is intentionally
skipped and add a config to force apply
b29cb1a is described below
commit b29cb1a82b1a1facf1dd040025db93d998dad4cd
Author: Wenchen Fan <[email protected]>
AuthorDate: Thu Feb 6 09:16:14 2020 -0800
[SPARK-30719][SQL] do not log warning if AQE is intentionally skipped and
add a config to force apply
### What changes were proposed in this pull request?
Update `InsertAdaptiveSparkPlan` to not log a warning if AQE is skipped
intentionally.
This PR also adds a config to not skip AQE.
### Why are the changes needed?
It's not a warning at all if we intentionally skip AQE.
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
run `AdaptiveQueryExecSuite` locally and verify that there are no warning
logs.
Closes #27452 from cloud-fan/aqe.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
(cherry picked from commit 8ce58627ebe4f0372fba9a30d8cd4213611acd9b)
Signed-off-by: Xiao Li <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 9 +++
.../adaptive/InsertAdaptiveSparkPlan.scala | 83 ++++++++++++----------
.../adaptive/AdaptiveQueryExecSuite.scala | 9 +++
3 files changed, 65 insertions(+), 36 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index acc0922..bed8410 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -358,6 +358,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val ADAPTIVE_EXECUTION_FORCE_APPLY =
buildConf("spark.sql.adaptive.forceApply")
+ .internal()
+ .doc("Adaptive query execution is skipped when the query does not have
exchanges or " +
+ "sub-queries. By setting this config to true (together with " +
+ s"'${ADAPTIVE_EXECUTION_ENABLED.key}' enabled), Spark will force apply
adaptive query " +
+ "execution for all supported queries.")
+ .booleanConf
+ .createWithDefault(false)
+
val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled")
.doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled,
this enables reducing " +
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 9252827..621c063 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -40,49 +40,60 @@ case class InsertAdaptiveSparkPlan(
private val conf = adaptiveExecutionContext.session.sessionState.conf
- def containShuffle(plan: SparkPlan): Boolean = {
- plan.find {
- case _: Exchange => true
- case s: SparkPlan => !s.requiredChildDistribution.forall(_ ==
UnspecifiedDistribution)
- }.isDefined
- }
-
- def containSubQuery(plan: SparkPlan): Boolean = {
- plan.find(_.expressions.exists(_.find {
- case _: SubqueryExpression => true
- case _ => false
- }.isDefined)).isDefined
- }
-
override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)
private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan =
plan match {
+ case _ if !conf.adaptiveExecutionEnabled => plan
case _: ExecutedCommandExec => plan
- case _ if conf.adaptiveExecutionEnabled && supportAdaptive(plan)
- && (isSubquery || containShuffle(plan) || containSubQuery(plan)) =>
- try {
- // Plan sub-queries recursively and pass in the shared stage cache for
exchange reuse. Fall
- // back to non-adaptive mode if adaptive execution is supported in any
of the sub-queries.
- val subqueryMap = buildSubqueryMap(plan)
- val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
- val preprocessingRules = Seq(
- planSubqueriesRule)
- // Run pre-processing rules.
- val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan,
preprocessingRules)
- logDebug(s"Adaptive execution enabled for plan: $plan")
- AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext,
preprocessingRules, isSubquery)
- } catch {
- case SubqueryAdaptiveNotSupportedException(subquery) =>
- logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
- s"but is not supported for sub-query: $subquery.")
- plan
- }
- case _ =>
- if (conf.adaptiveExecutionEnabled) {
+ case _ if shouldApplyAQE(plan, isSubquery) =>
+ if (supportAdaptive(plan)) {
+ try {
+ // Plan sub-queries recursively and pass in the shared stage cache
for exchange reuse.
+ // Fall back to non-AQE mode if AQE is not supported in any of the
sub-queries.
+ val subqueryMap = buildSubqueryMap(plan)
+ val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
+ val preprocessingRules = Seq(
+ planSubqueriesRule)
+ // Run pre-processing rules.
+ val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan,
preprocessingRules)
+ logDebug(s"Adaptive execution enabled for plan: $plan")
+ AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext,
preprocessingRules, isSubquery)
+ } catch {
+ case SubqueryAdaptiveNotSupportedException(subquery) =>
+ logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled
" +
+ s"but is not supported for sub-query: $subquery.")
+ plan
+ }
+ } else {
logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
s"but is not supported for query: $plan.")
+ plan
}
- plan
+
+ case _ => plan
+ }
+
+ // AQE is only useful when the query has exchanges or sub-queries. This
method returns true if
+ // one of the following conditions is satisfied:
+ // - The config ADAPTIVE_EXECUTION_FORCE_APPLY is true.
+ // - The input query is from a sub-query. When this happens, it means
we've already decided to
+ // apply AQE for the main query and we must continue to do it.
+ // - The query contains exchanges.
+ // - The query may need to add exchanges. It's an overkill to run
`EnsureRequirements` here, so
+ // we just check `SparkPlan.requiredChildDistribution` and see if it's
possible that the
+ // the query needs to add exchanges later.
+ // - The query contains sub-query.
+ private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
+ conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
+ plan.find {
+ case _: Exchange => true
+ case p if !p.requiredChildDistribution.forall(_ ==
UnspecifiedDistribution) => true
+ case p => p.expressions.exists(_.find {
+ case _: SubqueryExpression => true
+ case _ => false
+ }.isDefined)
+ }.isDefined
+ }
}
private def supportAdaptive(plan: SparkPlan): Boolean = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 78a1183..96e9772 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -780,4 +780,13 @@ class AdaptiveQueryExecSuite
)
}
}
+
+ test("force apply AQE") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ val plan = sql("SELECT * FROM testData").queryExecution.executedPlan
+ assert(plan.isInstanceOf[AdaptiveSparkPlanExec])
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]