This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 29a10a4  [SPARK-28863][SQL][FOLLOWUP] Do not reuse the physical plan
29a10a4 is described below

commit 29a10a456438be1ebd71e947e82fb13b1dec34be
Author: Wenchen Fan <[email protected]>
AuthorDate: Thu Aug 20 15:23:25 2020 +0000

    [SPARK-28863][SQL][FOLLOWUP] Do not reuse the physical plan
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/29469
    
    Instead of passing the physical plan directly to the fallback v1 source 
and skipping analysis, optimization, and planning altogether, this PR proposes 
to pass the optimized plan.
    
    ### Why are the changes needed?
    
    It's a bit risky to pass the physical plan directly. When the fallback v1 
source applies more operations to the input DataFrame, Spark will re-apply the 
post-planning physical rules like `CollapseCodegenStages`, 
`InsertAdaptiveSparkPlan`, etc. to that plan, which is very tricky.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing test suite, plus some new tests.
    
    Closes #29489 from cloud-fan/follow.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit d378dc5f6db6fe37426728bea714f44b94a94861)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/execution/AlreadyOptimized.scala     | 37 +++++++++++++
 .../spark/sql/execution/AlreadyPlanned.scala       | 61 ----------------------
 .../spark/sql/execution/SparkStrategies.scala      |  1 -
 .../datasources/v2/DataSourceV2Strategy.scala      | 10 ++--
 .../datasources/v2/V1FallbackWriters.scala         | 20 ++++---
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |  4 ++
 ...nnedSuite.scala => AlreadyOptimizedSuite.scala} | 31 ++++++-----
 7 files changed, 77 insertions(+), 87 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala
new file mode 100644
index 0000000..e40b114
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/** Query execution that skips re-analysis and re-optimization. */
+class AlreadyOptimizedExecution(
+    session: SparkSession,
+    plan: LogicalPlan) extends QueryExecution(session, plan) {
+  override lazy val analyzed: LogicalPlan = plan
+  override lazy val optimizedPlan: LogicalPlan = plan
+}
+
+object AlreadyOptimized {
+  def dataFrame(sparkSession: SparkSession, optimized: LogicalPlan): DataFrame 
= {
+    val qe = new AlreadyOptimizedExecution(sparkSession, optimized)
+    new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyPlanned.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyPlanned.scala
deleted file mode 100644
index 9dd956a..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyPlanned.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
-
-/**
- * A special node that allows skipping query planning all the way to physical 
execution. This node
- * can be used when a query was planned to the physical level, but we had to 
go back to logical plan
- * land for some reason (e.g. V1 DataSource write execution). This will allow 
the metrics, and the
- * query plan to properly appear as part of the query execution.
- */
-case class AlreadyPlanned(physicalPlan: SparkPlan) extends LogicalPlan with 
MultiInstanceRelation {
-  override def children: Seq[LogicalPlan] = Nil
-  override lazy val resolved: Boolean = true
-  override val output: Seq[Attribute] = physicalPlan.output
-  override def newInstance(): LogicalPlan = {
-    val newAttrs = output.map(a => a.exprId -> a.newInstance())
-    val attrMap = newAttrs.toMap
-    val projections = physicalPlan.output.map { o =>
-      Alias(o, o.name)(attrMap(o.exprId).exprId, o.qualifier, 
Option(o.metadata))
-    }
-    AlreadyPlanned(ProjectExec(projections, physicalPlan))
-  }
-}
-
-/** Query execution that skips re-analysis and planning. */
-class AlreadyPlannedExecution(
-    session: SparkSession,
-    plan: AlreadyPlanned) extends QueryExecution(session, plan) {
-  override lazy val analyzed: LogicalPlan = plan
-  override lazy val optimizedPlan: LogicalPlan = plan
-  override lazy val sparkPlan: SparkPlan = plan.physicalPlan
-  override lazy val executedPlan: SparkPlan = plan.physicalPlan
-}
-
-object AlreadyPlanned {
-  def dataFrame(sparkSession: SparkSession, query: SparkPlan): DataFrame = {
-    val qe = new AlreadyPlannedExecution(sparkSession, AlreadyPlanned(query))
-    new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
-  }
-}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ec6e365..f836deb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -678,7 +678,6 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   object BasicOperators extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case planned: AlreadyPlanned => planned.physicalPlan :: Nil
       case d: DataWritingCommand => DataWritingCommandExec(d, 
planLater(d.query)) :: Nil
       case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index f2ff006..cca80c0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -122,10 +122,10 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
       val writeOptions = new CaseInsensitiveStringMap(options.asJava)
       catalog match {
         case staging: StagingTableCatalog =>
-          AtomicCreateTableAsSelectExec(staging, ident, parts, 
planLater(query),
+          AtomicCreateTableAsSelectExec(staging, ident, parts, query, 
planLater(query),
             propsWithOwner, writeOptions, ifNotExists) :: Nil
         case _ =>
-          CreateTableAsSelectExec(catalog, ident, parts, planLater(query),
+          CreateTableAsSelectExec(catalog, ident, parts, query, 
planLater(query),
             propsWithOwner, writeOptions, ifNotExists) :: Nil
       }
 
@@ -152,6 +152,7 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
             staging,
             ident,
             parts,
+            query,
             planLater(query),
             propsWithOwner,
             writeOptions,
@@ -161,6 +162,7 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
             catalog,
             ident,
             parts,
+            query,
             planLater(query),
             propsWithOwner,
             writeOptions,
@@ -170,7 +172,7 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
     case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
       r.table.asWritable match {
         case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
-          AppendDataExecV1(v1, writeOptions.asOptions, planLater(query)) :: Nil
+          AppendDataExecV1(v1, writeOptions.asOptions, query) :: Nil
         case v2 =>
           AppendDataExec(v2, writeOptions.asOptions, planLater(query)) :: Nil
       }
@@ -184,7 +186,7 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
       }.toArray
       r.table.asWritable match {
         case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
-          OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, 
planLater(query)) :: Nil
+          OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, 
query) :: Nil
         case v2 =>
           OverwriteByExpressionExec(v2, filters, writeOptions.asOptions, 
planLater(query)) :: Nil
       }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
index 28c4e28..560da39 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
@@ -22,9 +22,10 @@ import java.util.UUID
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.SupportsWrite
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, 
SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
-import org.apache.spark.sql.execution.{AlreadyPlanned, SparkPlan}
+import org.apache.spark.sql.execution.{AlreadyOptimized, SparkPlan}
 import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -36,7 +37,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 case class AppendDataExecV1(
     table: SupportsWrite,
     writeOptions: CaseInsensitiveStringMap,
-    query: SparkPlan) extends V1FallbackWriters {
+    plan: LogicalPlan) extends V1FallbackWriters {
 
   override protected def run(): Seq[InternalRow] = {
     writeWithV1(newWriteBuilder().buildForV1Write())
@@ -58,7 +59,7 @@ case class OverwriteByExpressionExecV1(
     table: SupportsWrite,
     deleteWhere: Array[Filter],
     writeOptions: CaseInsensitiveStringMap,
-    query: SparkPlan) extends V1FallbackWriters {
+    plan: LogicalPlan) extends V1FallbackWriters {
 
   private def isTruncate(filters: Array[Filter]): Boolean = {
     filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
@@ -81,7 +82,7 @@ case class OverwriteByExpressionExecV1(
 /** Some helper interfaces that use V2 write semantics through the V1 writer 
interface. */
 sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {
   override def output: Seq[Attribute] = Nil
-  override final def children: Seq[SparkPlan] = Seq(query)
+  override final def children: Seq[SparkPlan] = Nil
 
   def table: SupportsWrite
   def writeOptions: CaseInsensitiveStringMap
@@ -97,9 +98,11 @@ sealed trait V1FallbackWriters extends V2CommandExec with 
SupportsV1Write {
   protected def newWriteBuilder(): V1WriteBuilder = {
     val info = LogicalWriteInfoImpl(
       queryId = UUID.randomUUID().toString,
-      schema = query.schema,
+      schema = plan.schema,
       options = writeOptions)
-    table.newWriteBuilder(info).asV1Builder
+    val writeBuilder = table.newWriteBuilder(info)
+
+    writeBuilder.asV1Builder
   }
 }
 
@@ -107,10 +110,11 @@ sealed trait V1FallbackWriters extends V2CommandExec with 
SupportsV1Write {
  * A trait that allows Tables that use V1 Writer interfaces to append data.
  */
 trait SupportsV1Write extends SparkPlan {
-  def query: SparkPlan
+  def plan: LogicalPlan
 
   protected def writeWithV1(relation: InsertableRelation): Seq[InternalRow] = {
-    relation.insert(AlreadyPlanned.dataFrame(sqlContext.sparkSession, query), 
overwrite = false)
+    // The `plan` is already optimized, we should not analyze and optimize it 
again.
+    relation.insert(AlreadyOptimized.dataFrame(sqlContext.sparkSession, plan), 
overwrite = false)
     Nil
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index bf96132..616e18e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -62,6 +62,7 @@ case class CreateTableAsSelectExec(
     catalog: TableCatalog,
     ident: Identifier,
     partitioning: Seq[Transform],
+    plan: LogicalPlan,
     query: SparkPlan,
     properties: Map[String, String],
     writeOptions: CaseInsensitiveStringMap,
@@ -118,6 +119,7 @@ case class AtomicCreateTableAsSelectExec(
     catalog: StagingTableCatalog,
     ident: Identifier,
     partitioning: Seq[Transform],
+    plan: LogicalPlan,
     query: SparkPlan,
     properties: Map[String, String],
     writeOptions: CaseInsensitiveStringMap,
@@ -151,6 +153,7 @@ case class ReplaceTableAsSelectExec(
     catalog: TableCatalog,
     ident: Identifier,
     partitioning: Seq[Transform],
+    plan: LogicalPlan,
     query: SparkPlan,
     properties: Map[String, String],
     writeOptions: CaseInsensitiveStringMap,
@@ -216,6 +219,7 @@ case class AtomicReplaceTableAsSelectExec(
     catalog: StagingTableCatalog,
     ident: Identifier,
     partitioning: Seq[Transform],
+    plan: LogicalPlan,
     query: SparkPlan,
     properties: Map[String, String],
     writeOptions: CaseInsensitiveStringMap,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyPlannedSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala
similarity index 68%
rename from 
sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyPlannedSuite.scala
rename to 
sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala
index 9152dcf..c266aa9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyPlannedSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala
@@ -17,40 +17,47 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.adaptive.EnableAdaptiveExecutionSuite
 import org.apache.spark.sql.test.SharedSparkSession
 
-class AlreadyPlannedSuite extends SparkPlanTest with SharedSparkSession {
+class AlreadyOptimizedSuite extends QueryTest with SharedSparkSession {
 
   import testImplicits._
 
   test("simple execution") {
     val df = spark.range(10)
-    val planned = AlreadyPlanned.dataFrame(spark, df.queryExecution.sparkPlan)
+    val planned = AlreadyOptimized.dataFrame(spark, 
df.queryExecution.optimizedPlan)
 
-    checkAnswer(planned, identity, df.toDF().collect())
+    checkAnswer(planned, df.toDF().collect())
   }
 
   test("planning on top works - projection") {
     val df = spark.range(10)
-    val planned = AlreadyPlanned.dataFrame(spark, df.queryExecution.sparkPlan)
+    val planned = AlreadyOptimized.dataFrame(spark, 
df.queryExecution.optimizedPlan)
 
     checkAnswer(
       planned.withColumn("data", 'id + 1),
-      identity,
       df.withColumn("data", 'id + 1).collect())
   }
 
   test("planning on top works - filter") {
     val df = spark.range(10)
-    val planned = AlreadyPlanned.dataFrame(spark, df.queryExecution.sparkPlan)
+    val planned = AlreadyOptimized.dataFrame(spark, 
df.queryExecution.optimizedPlan)
 
-    checkAnswer(planned.where('id < 5), identity, df.where('id < 
5).toDF().collect())
+    checkAnswer(planned.where('id < 5), df.where('id < 5).toDF().collect())
+  }
+
+  test("planning on top works - aggregate") {
+    val df = spark.range(10)
+    val planned = AlreadyOptimized.dataFrame(spark, 
df.queryExecution.optimizedPlan)
+
+    checkAnswer(planned.groupBy('id).count(), 
df.groupBy('id).count().collect())
   }
 
   test("planning on top works - joins") {
     val df = spark.range(10)
-    val planned = AlreadyPlanned.dataFrame(spark, df.queryExecution.sparkPlan)
+    val planned = AlreadyOptimized.dataFrame(spark, 
df.queryExecution.optimizedPlan)
 
     val plannedLeft = planned.alias("l")
     val dfLeft = df.alias("l")
@@ -59,22 +66,20 @@ class AlreadyPlannedSuite extends SparkPlanTest with 
SharedSparkSession {
 
     checkAnswer(
       plannedLeft.where('id < 3).join(plannedRight, Seq("id")),
-      identity,
       dfLeft.where('id < 3).join(dfRight, Seq("id")).collect())
 
     checkAnswer(
       plannedLeft.where('id < 3).join(plannedRight, plannedLeft("id") === 
plannedRight("id")),
-      identity,
       dfLeft.where('id < 3).join(dfRight, dfLeft("id") === 
dfRight("id")).collect())
 
     checkAnswer(
       plannedLeft.join(plannedRight, Seq("id")).where('id < 3),
-      identity,
       dfLeft.join(dfRight, Seq("id")).where('id < 3).collect())
 
     checkAnswer(
       plannedLeft.join(plannedRight, plannedLeft("id") === 
plannedRight("id")).where($"l.id" < 3),
-      identity,
       dfLeft.join(dfRight, dfLeft("id") === dfRight("id")).where($"l.id" < 
3).collect())
   }
 }
+
+class AlreadyOptimizedAQESuite extends AlreadyOptimizedSuite with 
EnableAdaptiveExecutionSuite


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to