Repository: spark
Updated Branches:
  refs/heads/master c42d208e1 -> d28d5732a


[SPARK-21619][SQL] Fail the execution of canonicalized plans explicitly

## What changes were proposed in this pull request?
Canonicalized plans are not supposed to be executed. I ran into a case in which 
there's some code that accidentally calls execute on a canonicalized plan. This 
patch throws a more explicit exception when that happens.

## How was this patch tested?
Added a test case in SparkPlanSuite.

Author: Reynold Xin <[email protected]>

Closes #18828 from rxin/SPARK-21619.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d28d5732
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d28d5732
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d28d5732

Branch: refs/heads/master
Commit: d28d5732ae205771f1f443b15b10e64dcffb5ff0
Parents: c42d208
Author: Reynold Xin <[email protected]>
Authored: Fri Oct 27 23:44:24 2017 -0700
Committer: gatorsmile <[email protected]>
Committed: Fri Oct 27 23:44:24 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  5 +--
 .../spark/sql/catalyst/plans/QueryPlan.scala    | 30 +++++++++++++---
 .../plans/logical/basicLogicalOperators.scala   |  2 +-
 .../sql/catalyst/plans/logical/hints.scala      |  2 +-
 .../sql/execution/DataSourceScanExec.scala      |  4 +--
 .../apache/spark/sql/execution/SparkPlan.scala  |  6 ++++
 .../spark/sql/execution/SparkSqlParser.scala    |  2 +-
 .../sql/execution/basicPhysicalOperators.scala  |  2 +-
 .../spark/sql/execution/command/cache.scala     |  5 ++-
 .../execution/datasources/LogicalRelation.scala |  2 +-
 .../exchange/BroadcastExchangeExec.scala        |  2 +-
 .../spark/sql/execution/exchange/Exchange.scala |  2 +-
 .../spark/sql/execution/SparkPlanSuite.scala    | 36 ++++++++++++++++++++
 .../sql/hive/execution/HiveTableScanExec.scala  |  4 +--
 14 files changed, 86 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d28d5732/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 1dbae4d..b87bbb4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -438,7 +438,7 @@ case class HiveTableRelation(
 
   def isPartitioned: Boolean = partitionCols.nonEmpty
 
-  override lazy val canonicalized: HiveTableRelation = copy(
+  override def doCanonicalize(): HiveTableRelation = copy(
     tableMeta = tableMeta.copy(
       storage = CatalogStorageFormat.empty,
       createTime = -1
@@ -448,7 +448,8 @@ case class HiveTableRelation(
     },
     partitionCols = partitionCols.zipWithIndex.map {
       case (attr, index) => attr.withExprId(ExprId(index + dataCols.length))
-    })
+    }
+  )
 
   override def computeStats(): Statistics = {
     tableMeta.stats.map(_.toPlanStats(output)).getOrElse {

http://git-wip-us.apache.org/repos/asf/spark/blob/d28d5732/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index c7952e3..d21b4af 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -181,6 +181,15 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] 
extends TreeNode[PlanT
   override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
 
   /**
+   * A private mutable variable to indicate whether this plan is the result of 
canonicalization.
+   * This is used solely for making sure we wouldn't execute a canonicalized 
plan.
+   * See [[canonicalized]] on how this is set.
+   */
+  @transient private var _isCanonicalizedPlan: Boolean = false
+
+  protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan
+
+  /**
    * Returns a plan where a best effort attempt has been made to transform 
`this` in a way
    * that preserves the result but removes cosmetic variations (case 
sensitivity, ordering for
    * commutative operations, expression id, etc.)
@@ -188,10 +197,24 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] 
extends TreeNode[PlanT
    * Plans where `this.canonicalized == other.canonicalized` will always 
evaluate to the same
    * result.
    *
-   * Some nodes should overwrite this to provide proper canonicalize logic, 
but they should remove
-   * expressions cosmetic variations themselves.
+   * Plan nodes that require special canonicalization should override 
[[doCanonicalize()]].
+   * They should remove expressions cosmetic variations themselves.
+   */
+  @transient final lazy val canonicalized: PlanType = {
+    var plan = doCanonicalize()
+    // If the plan has not been changed due to canonicalization, make a copy 
of it so we don't
+    // mutate the original plan's _isCanonicalizedPlan flag.
+    if (plan eq this) {
+      plan = plan.makeCopy(plan.mapProductIterator(x => 
x.asInstanceOf[AnyRef]))
+    }
+    plan._isCanonicalizedPlan = true
+    plan
+  }
+
+  /**
+   * Defines how the canonicalization should work for the current plan.
    */
-  lazy val canonicalized: PlanType = {
+  protected def doCanonicalize(): PlanType = {
     val canonicalizedChildren = children.map(_.canonicalized)
     var id = -1
     mapExpressions {
@@ -213,7 +236,6 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] 
extends TreeNode[PlanT
     }.withNewChildren(canonicalizedChildren)
   }
 
-
   /**
    * Returns true when the given query plan will return the same results as 
this query plan.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/d28d5732/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 80243d3..c2750c3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -760,7 +760,7 @@ case class SubqueryAlias(
     child: LogicalPlan)
   extends UnaryNode {
 
-  override lazy val canonicalized: LogicalPlan = child.canonicalized
+  override def doCanonicalize(): LogicalPlan = child.canonicalized
 
   override def output: Seq[Attribute] = 
child.output.map(_.withQualifier(Some(alias)))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d28d5732/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
index 29a4352..cbb6265 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
@@ -41,7 +41,7 @@ case class ResolvedHint(child: LogicalPlan, hints: HintInfo = 
HintInfo())
 
   override def output: Seq[Attribute] = child.output
 
-  override lazy val canonicalized: LogicalPlan = child.canonicalized
+  override def doCanonicalize(): LogicalPlan = child.canonicalized
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d28d5732/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 8d0fc32..e9f6503 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -139,7 +139,7 @@ case class RowDataSourceScanExec(
   }
 
   // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
-  override lazy val canonicalized: SparkPlan =
+  override def doCanonicalize(): SparkPlan =
     copy(
       fullOutput.map(QueryPlan.normalizeExprId(_, fullOutput)),
       rdd = null,
@@ -522,7 +522,7 @@ case class FileSourceScanExec(
     }
   }
 
-  override lazy val canonicalized: FileSourceScanExec = {
+  override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
       relation,
       output.map(QueryPlan.normalizeExprId(_, output)),

http://git-wip-us.apache.org/repos/asf/spark/blob/d28d5732/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 2ffd948..657b265 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -111,6 +111,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
    * Concrete implementations of SparkPlan should override `doExecute`.
    */
   final def execute(): RDD[InternalRow] = executeQuery {
+    if (isCanonicalizedPlan) {
+      throw new IllegalStateException("A canonicalized plan is not supposed to 
be executed.")
+    }
     doExecute()
   }
 
@@ -121,6 +124,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
    * Concrete implementations of SparkPlan should override 
`doExecuteBroadcast`.
    */
   final def executeBroadcast[T](): broadcast.Broadcast[T] = executeQuery {
+    if (isCanonicalizedPlan) {
+      throw new IllegalStateException("A canonicalized plan is not supposed to 
be executed.")
+    }
     doExecuteBroadcast()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d28d5732/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 6de9ea0..29b584b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -286,7 +286,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
    * Create a [[ClearCacheCommand]] logical plan.
    */
   override def visitClearCache(ctx: ClearCacheContext): LogicalPlan = 
withOrigin(ctx) {
-    ClearCacheCommand
+    ClearCacheCommand()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d28d5732/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index d15ece3..e58c3ce 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -350,7 +350,7 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
 
-  override lazy val canonicalized: SparkPlan = {
+  override def doCanonicalize(): SparkPlan = {
     
RangeExec(range.canonicalized.asInstanceOf[org.apache.spark.sql.catalyst.plans.logical.Range])
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d28d5732/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 140f920..687994d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -66,10 +66,13 @@ case class UncacheTableCommand(
 /**
  * Clear all cached data from the in-memory cache.
  */
-case object ClearCacheCommand extends RunnableCommand {
+case class ClearCacheCommand() extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     sparkSession.catalog.clearCache()
     Seq.empty[Row]
   }
+
+  /** [[org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy()]] does not 
support 0-arg ctor. */
+  override def makeCopy(newArgs: Array[AnyRef]): ClearCacheCommand = 
ClearCacheCommand()
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d28d5732/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 3e98cb2..2369957 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -35,7 +35,7 @@ case class LogicalRelation(
   extends LeafNode with MultiInstanceRelation {
 
   // Only care about relation when canonicalizing.
-  override lazy val canonicalized: LogicalPlan = copy(
+  override def doCanonicalize(): LogicalPlan = copy(
     output = output.map(QueryPlan.normalizeExprId(_, output)),
     catalogTable = None)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d28d5732/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 880e18c..daea6c3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -48,7 +48,7 @@ case class BroadcastExchangeExec(
 
   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
 
-  override lazy val canonicalized: SparkPlan = {
+  override def doCanonicalize(): SparkPlan = {
     BroadcastExchangeExec(mode.canonicalized, child.canonicalized)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d28d5732/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index 4b52f3e..09f79a2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -50,7 +50,7 @@ case class ReusedExchangeExec(override val output: 
Seq[Attribute], child: Exchan
   extends LeafExecNode {
 
   // Ignore this wrapper for canonicalizing.
-  override lazy val canonicalized: SparkPlan = child.canonicalized
+  override def doCanonicalize(): SparkPlan = child.canonicalized
 
   def doExecute(): RDD[InternalRow] = {
     child.execute()

http://git-wip-us.apache.org/repos/asf/spark/blob/d28d5732/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
new file mode 100644
index 0000000..750d9e4
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+class SparkPlanSuite extends QueryTest with SharedSQLContext {
+
+  test("SPARK-21619 execution of a canonicalized plan should fail") {
+    val plan = spark.range(10).queryExecution.executedPlan.canonicalized
+
+    intercept[IllegalStateException] { plan.execute() }
+    intercept[IllegalStateException] { plan.executeCollect() }
+    intercept[IllegalStateException] { plan.executeCollectPublic() }
+    intercept[IllegalStateException] { plan.executeToIterator() }
+    intercept[IllegalStateException] { plan.executeBroadcast() }
+    intercept[IllegalStateException] { plan.executeTake(1) }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d28d5732/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 4f8dab9..7dcaf17 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -203,11 +203,11 @@ case class HiveTableScanExec(
     }
   }
 
-  override lazy val canonicalized: HiveTableScanExec = {
+  override def doCanonicalize(): HiveTableScanExec = {
     val input: AttributeSeq = relation.output
     HiveTableScanExec(
       requestedAttributes.map(QueryPlan.normalizeExprId(_, input)),
-      relation.canonicalized,
+      relation.canonicalized.asInstanceOf[HiveTableRelation],
       QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to