This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 29a10a4 [SPARK-28863][SQL][FOLLOWUP] Do not reuse the physical plan
29a10a4 is described below
commit 29a10a456438be1ebd71e947e82fb13b1dec34be
Author: Wenchen Fan <[email protected]>
AuthorDate: Thu Aug 20 15:23:25 2020 +0000
[SPARK-28863][SQL][FOLLOWUP] Do not reuse the physical plan
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/29469
Instead of passing the physical plan to the fallback v1 source directly and skipping analysis, optimization, and planning altogether, this PR proposes to pass the optimized plan.
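In short, the v1 fallback write nodes now carry the optimized `LogicalPlan` and wrap it in a DataFrame via a small helper before handing it to the v1 source, so only physical planning runs again. A condensed sketch of that helper (essentially the new `AlreadyOptimized.scala` shown in the diff below):
```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution

// A QueryExecution that treats the given plan as already analyzed and
// optimized; only physical planning runs when the DataFrame is executed.
class AlreadyOptimizedExecution(session: SparkSession, plan: LogicalPlan)
  extends QueryExecution(session, plan) {
  override lazy val analyzed: LogicalPlan = plan
  override lazy val optimizedPlan: LogicalPlan = plan
}

object AlreadyOptimized {
  def dataFrame(sparkSession: SparkSession, optimized: LogicalPlan): DataFrame = {
    val qe = new AlreadyOptimizedExecution(sparkSession, optimized)
    // Dataset's constructor is package-private, so this only compiles inside
    // the org.apache.spark.sql package, as in the actual patch.
    new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
  }
}
```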
### Why are the changes needed?
It's a bit risky to pass the physical plan directly. When the fallback v1 source applies more operations to the input DataFrame, Spark will re-apply post-planning physical rules such as `CollapseCodegenStages` and `InsertAdaptiveSparkPlan` to a plan that has already gone through them, which is very tricky.
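As a hypothetical illustration (not code from this patch; the relation and the column name are made up), a fallback v1 source is free to add operations on top of the DataFrame it receives before writing, which triggers planning on top of whatever plan the DataFrame wraps:
```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources.InsertableRelation

// Hypothetical fallback v1 relation that filters the incoming data before
// writing, i.e. it applies more operations to the input DataFrame.
class FilteringRelation extends InsertableRelation {
  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    // If `data` wrapped a physical plan, this extra operation would push an
    // already-planned tree back through planning and the post-planning rules;
    // with the optimized logical plan it is just normal query planning.
    data.where(col("value") > 0).collect()
  }
}
```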
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing test suites, plus some new tests.
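For instance, the renamed `AlreadyOptimizedSuite` (see the diff below) checks that planning on top of an already-optimized DataFrame still works; one of the new cases, condensed:
```scala
test("planning on top works - aggregate") {
  val df = spark.range(10)
  val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)

  // Further operations on the wrapped plan go through normal planning.
  checkAnswer(planned.groupBy('id).count(), df.groupBy('id).count().collect())
}
```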
Closes #29489 from cloud-fan/follow.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit d378dc5f6db6fe37426728bea714f44b94a94861)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/execution/AlreadyOptimized.scala | 37 +++++++++++++
.../spark/sql/execution/AlreadyPlanned.scala | 61 ----------------------
.../spark/sql/execution/SparkStrategies.scala | 1 -
.../datasources/v2/DataSourceV2Strategy.scala | 10 ++--
.../datasources/v2/V1FallbackWriters.scala | 20 ++++---
.../datasources/v2/WriteToDataSourceV2Exec.scala | 4 ++
...nnedSuite.scala => AlreadyOptimizedSuite.scala} | 31 ++++++-----
7 files changed, 77 insertions(+), 87 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala
new file mode 100644
index 0000000..e40b114
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/** Query execution that skips re-analysis and optimize. */
+class AlreadyOptimizedExecution(
+ session: SparkSession,
+ plan: LogicalPlan) extends QueryExecution(session, plan) {
+ override lazy val analyzed: LogicalPlan = plan
+ override lazy val optimizedPlan: LogicalPlan = plan
+}
+
+object AlreadyOptimized {
+ def dataFrame(sparkSession: SparkSession, optimized: LogicalPlan): DataFrame = {
+ val qe = new AlreadyOptimizedExecution(sparkSession, optimized)
+ new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyPlanned.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyPlanned.scala
deleted file mode 100644
index 9dd956a..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyPlanned.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-
-/**
- * A special node that allows skipping query planning all the way to physical execution. This node
- * can be used when a query was planned to the physical level, but we had to go back to logical plan
- * land for some reason (e.g. V1 DataSource write execution). This will allow the metrics, and the
- * query plan to properly appear as part of the query execution.
- */
-case class AlreadyPlanned(physicalPlan: SparkPlan) extends LogicalPlan with MultiInstanceRelation {
- override def children: Seq[LogicalPlan] = Nil
- override lazy val resolved: Boolean = true
- override val output: Seq[Attribute] = physicalPlan.output
- override def newInstance(): LogicalPlan = {
- val newAttrs = output.map(a => a.exprId -> a.newInstance())
- val attrMap = newAttrs.toMap
- val projections = physicalPlan.output.map { o =>
- Alias(o, o.name)(attrMap(o.exprId).exprId, o.qualifier, Option(o.metadata))
- }
- AlreadyPlanned(ProjectExec(projections, physicalPlan))
- }
-}
-
-/** Query execution that skips re-analysis and planning. */
-class AlreadyPlannedExecution(
- session: SparkSession,
- plan: AlreadyPlanned) extends QueryExecution(session, plan) {
- override lazy val analyzed: LogicalPlan = plan
- override lazy val optimizedPlan: LogicalPlan = plan
- override lazy val sparkPlan: SparkPlan = plan.physicalPlan
- override lazy val executedPlan: SparkPlan = plan.physicalPlan
-}
-
-object AlreadyPlanned {
- def dataFrame(sparkSession: SparkSession, query: SparkPlan): DataFrame = {
- val qe = new AlreadyPlannedExecution(sparkSession, AlreadyPlanned(query))
- new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ec6e365..f836deb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -678,7 +678,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object BasicOperators extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case planned: AlreadyPlanned => planned.physicalPlan :: Nil
case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil
case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index f2ff006..cca80c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -122,10 +122,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
catalog match {
case staging: StagingTableCatalog =>
- AtomicCreateTableAsSelectExec(staging, ident, parts, planLater(query),
+ AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query),
propsWithOwner, writeOptions, ifNotExists) :: Nil
case _ =>
- CreateTableAsSelectExec(catalog, ident, parts, planLater(query),
+ CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query),
propsWithOwner, writeOptions, ifNotExists) :: Nil
}
@@ -152,6 +152,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
staging,
ident,
parts,
+ query,
planLater(query),
propsWithOwner,
writeOptions,
@@ -161,6 +162,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
catalog,
ident,
parts,
+ query,
planLater(query),
propsWithOwner,
writeOptions,
@@ -170,7 +172,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
r.table.asWritable match {
case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
- AppendDataExecV1(v1, writeOptions.asOptions, planLater(query)) :: Nil
+ AppendDataExecV1(v1, writeOptions.asOptions, query) :: Nil
case v2 =>
AppendDataExec(v2, writeOptions.asOptions, planLater(query)) :: Nil
}
@@ -184,7 +186,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}.toArray
r.table.asWritable match {
case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
- OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, planLater(query)) :: Nil
+ OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, query) :: Nil
case v2 =>
OverwriteByExpressionExec(v2, filters, writeOptions.asOptions, planLater(query)) :: Nil
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
index 28c4e28..560da39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
@@ -22,9 +22,10 @@ import java.util.UUID
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
-import org.apache.spark.sql.execution.{AlreadyPlanned, SparkPlan}
+import org.apache.spark.sql.execution.{AlreadyOptimized, SparkPlan}
import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,7 +37,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
case class AppendDataExecV1(
table: SupportsWrite,
writeOptions: CaseInsensitiveStringMap,
- query: SparkPlan) extends V1FallbackWriters {
+ plan: LogicalPlan) extends V1FallbackWriters {
override protected def run(): Seq[InternalRow] = {
writeWithV1(newWriteBuilder().buildForV1Write())
@@ -58,7 +59,7 @@ case class OverwriteByExpressionExecV1(
table: SupportsWrite,
deleteWhere: Array[Filter],
writeOptions: CaseInsensitiveStringMap,
- query: SparkPlan) extends V1FallbackWriters {
+ plan: LogicalPlan) extends V1FallbackWriters {
private def isTruncate(filters: Array[Filter]): Boolean = {
filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
@@ -81,7 +82,7 @@ case class OverwriteByExpressionExecV1(
/** Some helper interfaces that use V2 write semantics through the V1 writer interface. */
sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {
override def output: Seq[Attribute] = Nil
- override final def children: Seq[SparkPlan] = Seq(query)
+ override final def children: Seq[SparkPlan] = Nil
def table: SupportsWrite
def writeOptions: CaseInsensitiveStringMap
@@ -97,9 +98,11 @@ sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {
protected def newWriteBuilder(): V1WriteBuilder = {
val info = LogicalWriteInfoImpl(
queryId = UUID.randomUUID().toString,
- schema = query.schema,
+ schema = plan.schema,
options = writeOptions)
- table.newWriteBuilder(info).asV1Builder
+ val writeBuilder = table.newWriteBuilder(info)
+
+ writeBuilder.asV1Builder
}
}
@@ -107,10 +110,11 @@ sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {
* A trait that allows Tables that use V1 Writer interfaces to append data.
*/
trait SupportsV1Write extends SparkPlan {
- def query: SparkPlan
+ def plan: LogicalPlan
protected def writeWithV1(relation: InsertableRelation): Seq[InternalRow] = {
- relation.insert(AlreadyPlanned.dataFrame(sqlContext.sparkSession, query), overwrite = false)
+ // The `plan` is already optimized, we should not analyze and optimize it again.
+ relation.insert(AlreadyOptimized.dataFrame(sqlContext.sparkSession, plan), overwrite = false)
Nil
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index bf96132..616e18e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -62,6 +62,7 @@ case class CreateTableAsSelectExec(
catalog: TableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
+ plan: LogicalPlan,
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
@@ -118,6 +119,7 @@ case class AtomicCreateTableAsSelectExec(
catalog: StagingTableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
+ plan: LogicalPlan,
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
@@ -151,6 +153,7 @@ case class ReplaceTableAsSelectExec(
catalog: TableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
+ plan: LogicalPlan,
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
@@ -216,6 +219,7 @@ case class AtomicReplaceTableAsSelectExec(
catalog: StagingTableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
+ plan: LogicalPlan,
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyPlannedSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala
similarity index 68%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyPlannedSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala
index 9152dcf..c266aa9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyPlannedSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala
@@ -17,40 +17,47 @@
package org.apache.spark.sql.execution
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.adaptive.EnableAdaptiveExecutionSuite
import org.apache.spark.sql.test.SharedSparkSession
-class AlreadyPlannedSuite extends SparkPlanTest with SharedSparkSession {
+class AlreadyOptimizedSuite extends QueryTest with SharedSparkSession {
import testImplicits._
test("simple execution") {
val df = spark.range(10)
- val planned = AlreadyPlanned.dataFrame(spark, df.queryExecution.sparkPlan)
+ val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
- checkAnswer(planned, identity, df.toDF().collect())
+ checkAnswer(planned, df.toDF().collect())
}
test("planning on top works - projection") {
val df = spark.range(10)
- val planned = AlreadyPlanned.dataFrame(spark, df.queryExecution.sparkPlan)
+ val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
checkAnswer(
planned.withColumn("data", 'id + 1),
- identity,
df.withColumn("data", 'id + 1).collect())
}
test("planning on top works - filter") {
val df = spark.range(10)
- val planned = AlreadyPlanned.dataFrame(spark, df.queryExecution.sparkPlan)
+ val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
- checkAnswer(planned.where('id < 5), identity, df.where('id < 5).toDF().collect())
+ checkAnswer(planned.where('id < 5), df.where('id < 5).toDF().collect())
+ }
+
+ test("planning on top works - aggregate") {
+ val df = spark.range(10)
+ val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
+
+ checkAnswer(planned.groupBy('id).count(), df.groupBy('id).count().collect())
}
test("planning on top works - joins") {
val df = spark.range(10)
- val planned = AlreadyPlanned.dataFrame(spark, df.queryExecution.sparkPlan)
+ val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
val plannedLeft = planned.alias("l")
val dfLeft = df.alias("l")
@@ -59,22 +66,20 @@ class AlreadyPlannedSuite extends SparkPlanTest with SharedSparkSession {
checkAnswer(
plannedLeft.where('id < 3).join(plannedRight, Seq("id")),
- identity,
dfLeft.where('id < 3).join(dfRight, Seq("id")).collect())
checkAnswer(
plannedLeft.where('id < 3).join(plannedRight, plannedLeft("id") === plannedRight("id")),
- identity,
dfLeft.where('id < 3).join(dfRight, dfLeft("id") === dfRight("id")).collect())
checkAnswer(
plannedLeft.join(plannedRight, Seq("id")).where('id < 3),
- identity,
dfLeft.join(dfRight, Seq("id")).where('id < 3).collect())
checkAnswer(
plannedLeft.join(plannedRight, plannedLeft("id") === plannedRight("id")).where($"l.id" < 3),
- identity,
dfLeft.join(dfRight, dfLeft("id") === dfRight("id")).where($"l.id" < 3).collect())
}
}
+
+class AlreadyOptimizedAQESuite extends AlreadyOptimizedSuite with EnableAdaptiveExecutionSuite
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]