This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ca1adf7 [SPARK-30953][SQL] InsertAdaptiveSparkPlan should apply AQE
on child plan of write commands
ca1adf7 is described below
commit ca1adf76b1d1fc15fa8d03450080072062d6f672
Author: yi.wu <[email protected]>
AuthorDate: Tue Apr 14 05:18:58 2020 +0000
[SPARK-30953][SQL] InsertAdaptiveSparkPlan should apply AQE on child plan
of write commands
This PR changes `InsertAdaptiveSparkPlan` to apply AQE on the child plan of
V1/V2 write commands rather than the command itself.
Apply AQE on write commands with child plan will expose `LogicalQueryStage`
to `Analyzer` while it should hider under `AdaptiveSparkPlanExec` only to avoid
unexpected broken.
No.
Pass Jenkins.
Closes #27701 from Ngone51/skip_v2_commands.
Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 5d4f5d36a26f07327dcc0928d437dd23582ad756)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala | 5 ++++-
.../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 10 ++++++++++
2 files changed, 14 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 621c063..ea586f0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.command.{DataWritingCommandExec,
ExecutedCommandExec}
+import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.internal.SQLConf
@@ -45,6 +46,8 @@ case class InsertAdaptiveSparkPlan(
private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan =
plan match {
case _ if !conf.adaptiveExecutionEnabled => plan
case _: ExecutedCommandExec => plan
+ case c: DataWritingCommandExec => c.copy(child = apply(c.child))
+ case c: V2CommandExec => c.withNewChildren(c.children.map(apply))
case _ if shouldApplyAQE(plan, isSubquery) =>
if (supportAdaptive(plan)) {
try {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index b8ac4ddc..64dd9aa 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.scheduler.{SparkListener,
SparkListenerEvent, SparkListe
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD,
SparkPlan}
import
org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
Exchange, ReusedExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec,
BuildRight, SortMergeJoinExec}
import
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
@@ -795,4 +796,13 @@ class AdaptiveQueryExecSuite
}
}
}
+
+ test("SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of
write commands") {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ val plan = sql("CREATE TABLE t1 AS SELECT 1
col").queryExecution.executedPlan
+ assert(plan.isInstanceOf[DataWritingCommandExec])
+
assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec])
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]