[SPARK-14855][SQL] Add "Exec" suffix to physical operators

## What changes were proposed in this pull request?
This patch adds "Exec" suffix to all physical operators. Before this patch, 
Spark's physical operators and logical operators were named the same (e.g. 
Project could be logical.Project or execution.Project), which caused small 
issues in code review and bigger issues in code refactoring.

## How was this patch tested?
N/A

Author: Reynold Xin <[email protected]>

Closes #12617 from rxin/exec-node.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7d0cad0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7d0cad0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7d0cad0

Branch: refs/heads/master
Commit: d7d0cad0ad7667c0e09ae01601ee0e4d0b09963c
Parents: c431a76
Author: Reynold Xin <[email protected]>
Authored: Fri Apr 22 17:43:56 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Fri Apr 22 17:43:56 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/trees/TreeNode.scala     |    9 +-
 .../spark/sql/execution/ExistingRDD.scala       |   24 +-
 .../org/apache/spark/sql/execution/Expand.scala |  202 ----
 .../apache/spark/sql/execution/ExpandExec.scala |  202 ++++
 .../apache/spark/sql/execution/Generate.scala   |  103 --
 .../spark/sql/execution/GenerateExec.scala      |  103 ++
 .../spark/sql/execution/LocalTableScan.scala    |   58 -
 .../sql/execution/LocalTableScanExec.scala      |   58 +
 .../spark/sql/execution/QueryExecution.scala    |    6 +-
 .../org/apache/spark/sql/execution/Sort.scala   |  183 ----
 .../apache/spark/sql/execution/SortExec.scala   |  183 ++++
 .../apache/spark/sql/execution/SparkPlan.scala  |   10 +-
 .../spark/sql/execution/SparkPlanInfo.scala     |    4 +-
 .../spark/sql/execution/SparkPlanner.scala      |    4 +-
 .../spark/sql/execution/SparkStrategies.scala   |  110 +-
 .../spark/sql/execution/WholeStageCodegen.scala |  492 ---------
 .../sql/execution/WholeStageCodegenExec.scala   |  492 +++++++++
 .../org/apache/spark/sql/execution/Window.scala | 1008 ------------------
 .../apache/spark/sql/execution/WindowExec.scala | 1008 ++++++++++++++++++
 .../aggregate/SortBasedAggregate.scala          |  111 --
 .../aggregate/SortBasedAggregateExec.scala      |  111 ++
 .../execution/aggregate/TungstenAggregate.scala |    2 +-
 .../spark/sql/execution/aggregate/utils.scala   |   10 +-
 .../spark/sql/execution/basicOperators.scala    |   50 +-
 .../columnar/InMemoryColumnarTableScan.scala    |  358 -------
 .../columnar/InMemoryTableScanExec.scala        |  358 +++++++
 .../spark/sql/execution/command/commands.scala  |    2 +-
 .../datasources/DataSourceStrategy.scala        |   18 +-
 .../datasources/FileSourceStrategy.scala        |    8 +-
 .../spark/sql/execution/debug/package.scala     |   10 +-
 .../execution/exchange/BroadcastExchange.scala  |  109 --
 .../exchange/BroadcastExchangeExec.scala        |  109 ++
 .../execution/exchange/EnsureRequirements.scala |    4 +-
 .../spark/sql/execution/exchange/Exchange.scala |    9 +-
 .../sql/execution/joins/BroadcastHashJoin.scala |  401 -------
 .../execution/joins/BroadcastHashJoinExec.scala |  401 +++++++
 .../joins/BroadcastNestedLoopJoin.scala         |  331 ------
 .../joins/BroadcastNestedLoopJoinExec.scala     |  331 ++++++
 .../sql/execution/joins/CartesianProduct.scala  |  103 --
 .../execution/joins/CartesianProductExec.scala  |  103 ++
 .../sql/execution/joins/ShuffledHashJoin.scala  |   81 --
 .../execution/joins/ShuffledHashJoinExec.scala  |   81 ++
 .../sql/execution/joins/SortMergeJoin.scala     |  964 -----------------
 .../sql/execution/joins/SortMergeJoinExec.scala |  964 +++++++++++++++++
 .../org/apache/spark/sql/execution/limit.scala  |   19 +-
 .../apache/spark/sql/execution/objects.scala    |   39 +-
 .../execution/python/BatchEvalPythonExec.scala  |  149 +++
 .../python/BatchPythonEvaluation.scala          |  149 ---
 .../execution/python/ExtractPythonUDFs.scala    |    4 +-
 .../streaming/IncrementalExecution.scala        |   15 +-
 .../execution/streaming/StatefulAggregate.scala |   10 +-
 .../apache/spark/sql/execution/subquery.scala   |    2 +-
 .../spark/sql/execution/ui/SparkPlanGraph.scala |    4 +-
 .../org/apache/spark/sql/CachedTableSuite.scala |   10 +-
 .../spark/sql/ColumnExpressionSuite.scala       |    4 +-
 .../apache/spark/sql/DataFrameJoinSuite.scala   |    6 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   |   10 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  |  115 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    |    8 +-
 .../spark/sql/execution/ExchangeSuite.scala     |   12 +-
 .../spark/sql/execution/PlannerSuite.scala      |   36 +-
 .../spark/sql/execution/ReferenceSort.scala     |    2 +-
 .../apache/spark/sql/execution/SortSuite.scala  |   16 +-
 .../execution/TakeOrderedAndProjectSuite.scala  |   21 +-
 .../sql/execution/WholeStageCodegenSuite.scala  |   36 +-
 .../columnar/PartitionBatchPruningSuite.scala   |    2 +-
 .../datasources/FileSourceStrategySuite.scala   |    6 +-
 .../execution/joins/BroadcastJoinSuite.scala    |    6 +-
 .../execution/joins/ExistenceJoinSuite.scala    |    8 +-
 .../sql/execution/joins/InnerJoinSuite.scala    |   14 +-
 .../sql/execution/joins/OuterJoinSuite.scala    |   10 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |   16 +-
 .../spark/sql/sources/FilteredScanSuite.scala   |    2 +-
 .../spark/sql/sources/PrunedScanSuite.scala     |    2 +-
 .../apache/spark/sql/test/SQLTestUtils.scala    |    4 +-
 .../spark/sql/util/DataFrameCallbackSuite.scala |    4 +-
 .../thriftserver/HiveThriftServer2Suites.scala  |    4 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |    8 +-
 .../sql/hive/execution/HiveTableScan.scala      |  166 ---
 .../sql/hive/execution/HiveTableScanExec.scala  |  166 +++
 .../hive/execution/InsertIntoHiveTable.scala    |    4 +-
 .../hive/execution/ScriptTransformation.scala   |    2 +-
 .../spark/sql/hive/CachedTableSuite.scala       |    6 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala |   12 +-
 .../sql/hive/execution/HiveComparisonTest.scala |    2 +-
 .../sql/hive/execution/HiveQuerySuite.scala     |    4 +-
 .../hive/execution/HiveTypeCoercionSuite.scala  |    4 +-
 .../spark/sql/hive/execution/PruningSuite.scala |    2 +-
 .../execution/ScriptTransformationSuite.scala   |    4 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |   14 +-
 .../spark/sql/sources/BucketedReadSuite.scala   |   10 +-
 .../sources/ParquetHadoopFsRelationSuite.scala  |    4 +-
 .../sql/sources/hadoopFsRelationSuites.scala    |    4 +-
 93 files changed, 5241 insertions(+), 5204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 232ca43..3d0e016 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -408,8 +408,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product {
     }
   }
 
-  /** Returns the name of this type of TreeNode.  Defaults to the class name. 
*/
-  def nodeName: String = getClass.getSimpleName
+  /**
+   * Returns the name of this type of TreeNode.  Defaults to the class name.
+   * Note that we remove the "Exec" suffix for physical operators here.
+   */
+  def nodeName: String = getClass.getSimpleName.replaceAll("Exec$", "")
 
   /**
    * The arguments that should be included in the arg string.  Defaults to the 
`productIterator`.
@@ -426,7 +429,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product {
     case other => other :: Nil
   }.mkString(", ")
 
-  /** String representation of this node without any children */
+  /** String representation of this node without any children. */
   def simpleString: String = s"$nodeName $argString".trim
 
   override def toString: String = treeString

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 12d03a7..b3a197c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -100,10 +100,10 @@ private[sql] case class LogicalRDD(
 }
 
 /** Physical plan node for scanning data from an RDD. */
-private[sql] case class PhysicalRDD(
+private[sql] case class RDDScanExec(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
-    override val nodeName: String) extends LeafNode {
+    override val nodeName: String) extends LeafExecNode {
 
   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
@@ -124,7 +124,7 @@ private[sql] case class PhysicalRDD(
   }
 }
 
-private[sql] trait DataSourceScan extends LeafNode {
+private[sql] trait DataSourceScanExec extends LeafExecNode {
   val rdd: RDD[InternalRow]
   val relation: BaseRelation
 
@@ -132,19 +132,19 @@ private[sql] trait DataSourceScan extends LeafNode {
 
   // Ignore rdd when checking results
   override def sameResult(plan: SparkPlan): Boolean = plan match {
-    case other: DataSourceScan => relation == other.relation && metadata == 
other.metadata
+    case other: DataSourceScanExec => relation == other.relation && metadata 
== other.metadata
     case _ => false
   }
 }
 
 /** Physical plan node for scanning data from a relation. */
-private[sql] case class RowDataSourceScan(
+private[sql] case class RowDataSourceScanExec(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     override val outputPartitioning: Partitioning,
     override val metadata: Map[String, String] = Map.empty)
-  extends DataSourceScan with CodegenSupport {
+  extends DataSourceScanExec with CodegenSupport {
 
   private[sql] override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number 
of output rows"))
@@ -207,13 +207,13 @@ private[sql] case class RowDataSourceScan(
 }
 
 /** Physical plan node for scanning data from a batched relation. */
-private[sql] case class BatchedDataSourceScan(
+private[sql] case class BatchedDataSourceScanExec(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     override val outputPartitioning: Partitioning,
     override val metadata: Map[String, String] = Map.empty)
-  extends DataSourceScan with CodegenSupport {
+  extends DataSourceScanExec with CodegenSupport {
 
   private[sql] override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number 
of output rows"),
@@ -316,7 +316,7 @@ private[sql] case class BatchedDataSourceScan(
   }
 }
 
-private[sql] object DataSourceScan {
+private[sql] object DataSourceScanExec {
   // Metadata keys
   val INPUT_PATHS = "InputPaths"
   val PUSHED_FILTERS = "PushedFilters"
@@ -325,7 +325,7 @@ private[sql] object DataSourceScan {
       output: Seq[Attribute],
       rdd: RDD[InternalRow],
       relation: BaseRelation,
-      metadata: Map[String, String] = Map.empty): DataSourceScan = {
+      metadata: Map[String, String] = Map.empty): DataSourceScanExec = {
     val outputPartitioning = {
       val bucketSpec = relation match {
         // TODO: this should be closer to bucket planning.
@@ -349,9 +349,9 @@ private[sql] object DataSourceScan {
 
     relation match {
       case r: HadoopFsRelation if r.fileFormat.supportBatch(r.sqlContext, 
relation.schema) =>
-        BatchedDataSourceScan(output, rdd, relation, outputPartitioning, 
metadata)
+        BatchedDataSourceScanExec(output, rdd, relation, outputPartitioning, 
metadata)
       case _ =>
-        RowDataSourceScan(output, rdd, relation, outputPartitioning, metadata)
+        RowDataSourceScanExec(output, rdd, relation, outputPartitioning, 
metadata)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
deleted file mode 100644
index 3966af5..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.errors._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
-import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
-import org.apache.spark.sql.execution.metric.SQLMetrics
-
-/**
- * Apply the all of the GroupExpressions to every input row, hence we will get
- * multiple output rows for a input row.
- * @param projections The group of expressions, all of the group expressions 
should
- *                    output the same schema specified bye the parameter 
`output`
- * @param output      The output Schema
- * @param child       Child operator
- */
-case class Expand(
-    projections: Seq[Seq[Expression]],
-    output: Seq[Attribute],
-    child: SparkPlan)
-  extends UnaryNode with CodegenSupport {
-
-  private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
-
-  // The GroupExpressions can output data with arbitrary partitioning, so set 
it
-  // as UNKNOWN partitioning
-  override def outputPartitioning: Partitioning = UnknownPartitioning(0)
-
-  override def references: AttributeSet =
-    AttributeSet(projections.flatten.flatMap(_.references))
-
-  private[this] val projection =
-    (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
-
-  protected override def doExecute(): RDD[InternalRow] = attachTree(this, 
"execute") {
-    val numOutputRows = longMetric("numOutputRows")
-
-    child.execute().mapPartitions { iter =>
-      val groups = projections.map(projection).toArray
-      new Iterator[InternalRow] {
-        private[this] var result: InternalRow = _
-        private[this] var idx = -1  // -1 means the initial state
-        private[this] var input: InternalRow = _
-
-        override final def hasNext: Boolean = (-1 < idx && idx < 
groups.length) || iter.hasNext
-
-        override final def next(): InternalRow = {
-          if (idx <= 0) {
-            // in the initial (-1) or beginning(0) of a new input row, fetch 
the next input tuple
-            input = iter.next()
-            idx = 0
-          }
-
-          result = groups(idx)(input)
-          idx += 1
-
-          if (idx == groups.length && iter.hasNext) {
-            idx = 0
-          }
-
-          numOutputRows += 1
-          result
-        }
-      }
-    }
-  }
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].inputRDDs()
-  }
-
-  protected override def doProduce(ctx: CodegenContext): String = {
-    child.asInstanceOf[CodegenSupport].produce(ctx, this)
-  }
-
-  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
ExprCode): String = {
-    /*
-     * When the projections list looks like:
-     *   expr1A, exprB, expr1C
-     *   expr2A, exprB, expr2C
-     *   ...
-     *   expr(N-1)A, exprB, expr(N-1)C
-     *
-     * i.e. column A and C have different values for each output row, but 
column B stays constant.
-     *
-     * The generated code looks something like (note that B is only computed 
once in declaration):
-     *
-     * // part 1: declare all the columns
-     * colA = ...
-     * colB = ...
-     * colC = ...
-     *
-     * // part 2: code that computes the columns
-     * for (row = 0; row < N; row++) {
-     *   switch (row) {
-     *     case 0:
-     *       colA = ...
-     *       colC = ...
-     *     case 1:
-     *       colA = ...
-     *       colC = ...
-     *     ...
-     *     case N - 1:
-     *       colA = ...
-     *       colC = ...
-     *   }
-     *   // increment metrics and consume output values
-     * }
-     *
-     * We use a for loop here so we only includes one copy of the consume code 
and avoid code
-     * size explosion.
-     */
-
-    // Set input variables
-    ctx.currentVars = input
-
-    // Tracks whether a column has the same output for all rows.
-    // Size of sameOutput array should equal N.
-    // If sameOutput(i) is true, then the i-th column has the same value for 
all output rows given
-    // an input row.
-    val sameOutput: Array[Boolean] = output.indices.map { colIndex =>
-      projections.map(p => p(colIndex)).toSet.size == 1
-    }.toArray
-
-    // Part 1: declare variables for each column
-    // If a column has the same value for all output rows, then we also 
generate its computation
-    // right after declaration. Otherwise its value is computed in the part 2.
-    val outputColumns = output.indices.map { col =>
-      val firstExpr = projections.head(col)
-      if (sameOutput(col)) {
-        // This column is the same across all output rows. Just generate code 
for it here.
-        BindReferences.bindReference(firstExpr, child.output).genCode(ctx)
-      } else {
-        val isNull = ctx.freshName("isNull")
-        val value = ctx.freshName("value")
-        val code = s"""
-          |boolean $isNull = true;
-          |${ctx.javaType(firstExpr.dataType)} $value = 
${ctx.defaultValue(firstExpr.dataType)};
-         """.stripMargin
-        ExprCode(code, isNull, value)
-      }
-    }
-
-    // Part 2: switch/case statements
-    val cases = projections.zipWithIndex.map { case (exprs, row) =>
-      var updateCode = ""
-      for (col <- exprs.indices) {
-        if (!sameOutput(col)) {
-          val ev = BindReferences.bindReference(exprs(col), 
child.output).genCode(ctx)
-          updateCode +=
-            s"""
-               |${ev.code}
-               |${outputColumns(col).isNull} = ${ev.isNull};
-               |${outputColumns(col).value} = ${ev.value};
-            """.stripMargin
-        }
-      }
-
-      s"""
-         |case $row:
-         |  ${updateCode.trim}
-         |  break;
-       """.stripMargin
-    }
-
-    val numOutput = metricTerm(ctx, "numOutputRows")
-    val i = ctx.freshName("i")
-    // these column have to declared before the loop.
-    val evaluate = evaluateVariables(outputColumns)
-    ctx.copyResult = true
-    s"""
-       |$evaluate
-       |for (int $i = 0; $i < ${projections.length}; $i ++) {
-       |  switch ($i) {
-       |    ${cases.mkString("\n").trim}
-       |  }
-       |  $numOutput.add(1);
-       |  ${consume(ctx, outputColumns)}
-       |}
-     """.stripMargin
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
new file mode 100644
index 0000000..7c47566
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+
+/**
+ * Apply the all of the GroupExpressions to every input row, hence we will get
+ * multiple output rows for a input row.
+ * @param projections The group of expressions, all of the group expressions 
should
+ *                    output the same schema specified bye the parameter 
`output`
+ * @param output      The output Schema
+ * @param child       Child operator
+ */
+case class ExpandExec(
+    projections: Seq[Seq[Expression]],
+    output: Seq[Attribute],
+    child: SparkPlan)
+  extends UnaryExecNode with CodegenSupport {
+
+  private[sql] override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
+
+  // The GroupExpressions can output data with arbitrary partitioning, so set 
it
+  // as UNKNOWN partitioning
+  override def outputPartitioning: Partitioning = UnknownPartitioning(0)
+
+  override def references: AttributeSet =
+    AttributeSet(projections.flatten.flatMap(_.references))
+
+  private[this] val projection =
+    (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
+
+  protected override def doExecute(): RDD[InternalRow] = attachTree(this, 
"execute") {
+    val numOutputRows = longMetric("numOutputRows")
+
+    child.execute().mapPartitions { iter =>
+      val groups = projections.map(projection).toArray
+      new Iterator[InternalRow] {
+        private[this] var result: InternalRow = _
+        private[this] var idx = -1  // -1 means the initial state
+        private[this] var input: InternalRow = _
+
+        override final def hasNext: Boolean = (-1 < idx && idx < 
groups.length) || iter.hasNext
+
+        override final def next(): InternalRow = {
+          if (idx <= 0) {
+            // in the initial (-1) or beginning(0) of a new input row, fetch 
the next input tuple
+            input = iter.next()
+            idx = 0
+          }
+
+          result = groups(idx)(input)
+          idx += 1
+
+          if (idx == groups.length && iter.hasNext) {
+            idx = 0
+          }
+
+          numOutputRows += 1
+          result
+        }
+      }
+    }
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  protected override def doProduce(ctx: CodegenContext): String = {
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
ExprCode): String = {
+    /*
+     * When the projections list looks like:
+     *   expr1A, exprB, expr1C
+     *   expr2A, exprB, expr2C
+     *   ...
+     *   expr(N-1)A, exprB, expr(N-1)C
+     *
+     * i.e. column A and C have different values for each output row, but 
column B stays constant.
+     *
+     * The generated code looks something like (note that B is only computed 
once in declaration):
+     *
+     * // part 1: declare all the columns
+     * colA = ...
+     * colB = ...
+     * colC = ...
+     *
+     * // part 2: code that computes the columns
+     * for (row = 0; row < N; row++) {
+     *   switch (row) {
+     *     case 0:
+     *       colA = ...
+     *       colC = ...
+     *     case 1:
+     *       colA = ...
+     *       colC = ...
+     *     ...
+     *     case N - 1:
+     *       colA = ...
+     *       colC = ...
+     *   }
+     *   // increment metrics and consume output values
+     * }
+     *
+     * We use a for loop here so we only includes one copy of the consume code 
and avoid code
+     * size explosion.
+     */
+
+    // Set input variables
+    ctx.currentVars = input
+
+    // Tracks whether a column has the same output for all rows.
+    // Size of sameOutput array should equal N.
+    // If sameOutput(i) is true, then the i-th column has the same value for 
all output rows given
+    // an input row.
+    val sameOutput: Array[Boolean] = output.indices.map { colIndex =>
+      projections.map(p => p(colIndex)).toSet.size == 1
+    }.toArray
+
+    // Part 1: declare variables for each column
+    // If a column has the same value for all output rows, then we also 
generate its computation
+    // right after declaration. Otherwise its value is computed in the part 2.
+    val outputColumns = output.indices.map { col =>
+      val firstExpr = projections.head(col)
+      if (sameOutput(col)) {
+        // This column is the same across all output rows. Just generate code 
for it here.
+        BindReferences.bindReference(firstExpr, child.output).genCode(ctx)
+      } else {
+        val isNull = ctx.freshName("isNull")
+        val value = ctx.freshName("value")
+        val code = s"""
+          |boolean $isNull = true;
+          |${ctx.javaType(firstExpr.dataType)} $value = 
${ctx.defaultValue(firstExpr.dataType)};
+         """.stripMargin
+        ExprCode(code, isNull, value)
+      }
+    }
+
+    // Part 2: switch/case statements
+    val cases = projections.zipWithIndex.map { case (exprs, row) =>
+      var updateCode = ""
+      for (col <- exprs.indices) {
+        if (!sameOutput(col)) {
+          val ev = BindReferences.bindReference(exprs(col), 
child.output).genCode(ctx)
+          updateCode +=
+            s"""
+               |${ev.code}
+               |${outputColumns(col).isNull} = ${ev.isNull};
+               |${outputColumns(col).value} = ${ev.value};
+            """.stripMargin
+        }
+      }
+
+      s"""
+         |case $row:
+         |  ${updateCode.trim}
+         |  break;
+       """.stripMargin
+    }
+
+    val numOutput = metricTerm(ctx, "numOutputRows")
+    val i = ctx.freshName("i")
+    // these column have to declared before the loop.
+    val evaluate = evaluateVariables(outputColumns)
+    ctx.copyResult = true
+    s"""
+       |$evaluate
+       |for (int $i = 0; $i < ${projections.length}; $i ++) {
+       |  switch ($i) {
+       |    ${cases.mkString("\n").trim}
+       |  }
+       |  $numOutput.add(1);
+       |  ${consume(ctx, outputColumns)}
+       |}
+     """.stripMargin
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
deleted file mode 100644
index 9938d21..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.metric.SQLMetrics
-
-/**
- * For lazy computing, be sure the generator.terminate() called in the very 
last
- * TODO reusing the CompletionIterator?
- */
-private[execution] sealed case class LazyIterator(func: () => 
TraversableOnce[InternalRow])
-  extends Iterator[InternalRow] {
-
-  lazy val results = func().toIterator
-  override def hasNext: Boolean = results.hasNext
-  override def next(): InternalRow = results.next()
-}
-
-/**
- * Applies a [[Generator]] to a stream of input rows, combining the
- * output of each into a new stream of rows.  This operation is similar to a 
`flatMap` in functional
- * programming with one important additional feature, which allows the input 
rows to be joined with
- * their output.
- * @param generator the generator expression
- * @param join  when true, each output row is implicitly joined with the input 
tuple that produced
- *              it.
- * @param outer when true, each input row will be output at least once, even 
if the output of the
- *              given `generator` is empty. `outer` has no effect when `join` 
is false.
- * @param output the output attributes of this node, which constructed in 
analysis phase,
- *               and we can not change it, as the parent node bound with it 
already.
- */
-case class Generate(
-    generator: Generator,
-    join: Boolean,
-    outer: Boolean,
-    output: Seq[Attribute],
-    child: SparkPlan)
-  extends UnaryNode {
-
-  private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
-
-  override def producedAttributes: AttributeSet = AttributeSet(output)
-
-  val boundGenerator = BindReferences.bindReference(generator, child.output)
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    // boundGenerator.terminate() should be triggered after all of the rows in 
the partition
-    val rows = if (join) {
-      child.execute().mapPartitionsInternal { iter =>
-        val generatorNullRow = new 
GenericInternalRow(generator.elementTypes.size)
-        val joinedRow = new JoinedRow
-
-        iter.flatMap { row =>
-          // we should always set the left (child output)
-          joinedRow.withLeft(row)
-          val outputRows = boundGenerator.eval(row)
-          if (outer && outputRows.isEmpty) {
-            joinedRow.withRight(generatorNullRow) :: Nil
-          } else {
-            outputRows.map(joinedRow.withRight)
-          }
-        } ++ LazyIterator(boundGenerator.terminate).map { row =>
-          // we leave the left side as the last element of its child output
-          // keep it the same as Hive does
-          joinedRow.withRight(row)
-        }
-      }
-    } else {
-      child.execute().mapPartitionsInternal { iter =>
-        iter.flatMap(boundGenerator.eval) ++ 
LazyIterator(boundGenerator.terminate)
-      }
-    }
-
-    val numOutputRows = longMetric("numOutputRows")
-    rows.mapPartitionsInternal { iter =>
-      val proj = UnsafeProjection.create(output, output)
-      iter.map { r =>
-        numOutputRows += 1
-        proj(r)
-      }
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
new file mode 100644
index 0000000..10cfec3
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.metric.SQLMetrics
+
+/**
+ * For lazy computing, be sure the generator.terminate() called in the very 
last
+ * TODO reusing the CompletionIterator?
+ */
+private[execution] sealed case class LazyIterator(func: () => 
TraversableOnce[InternalRow])
+  extends Iterator[InternalRow] {
+
+  lazy val results = func().toIterator
+  override def hasNext: Boolean = results.hasNext
+  override def next(): InternalRow = results.next()
+}
+
+/**
+ * Applies a [[Generator]] to a stream of input rows, combining the
+ * output of each into a new stream of rows.  This operation is similar to a 
`flatMap` in functional
+ * programming with one important additional feature, which allows the input 
rows to be joined with
+ * their output.
+ * @param generator the generator expression
+ * @param join  when true, each output row is implicitly joined with the input 
tuple that produced
+ *              it.
+ * @param outer when true, each input row will be output at least once, even 
if the output of the
+ *              given `generator` is empty. `outer` has no effect when `join` 
is false.
+ * @param output the output attributes of this node, which constructed in 
analysis phase,
+ *               and we can not change it, as the parent node bound with it 
already.
+ */
+case class GenerateExec(
+    generator: Generator,
+    join: Boolean,
+    outer: Boolean,
+    output: Seq[Attribute],
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  private[sql] override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
+
+  override def producedAttributes: AttributeSet = AttributeSet(output)
+
+  val boundGenerator = BindReferences.bindReference(generator, child.output)
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    // boundGenerator.terminate() should be triggered after all of the rows in 
the partition
+    val rows = if (join) {
+      child.execute().mapPartitionsInternal { iter =>
+        val generatorNullRow = new 
GenericInternalRow(generator.elementTypes.size)
+        val joinedRow = new JoinedRow
+
+        iter.flatMap { row =>
+          // we should always set the left (child output)
+          joinedRow.withLeft(row)
+          val outputRows = boundGenerator.eval(row)
+          if (outer && outputRows.isEmpty) {
+            joinedRow.withRight(generatorNullRow) :: Nil
+          } else {
+            outputRows.map(joinedRow.withRight)
+          }
+        } ++ LazyIterator(boundGenerator.terminate).map { row =>
+          // we leave the left side as the last element of its child output
+          // keep it the same as Hive does
+          joinedRow.withRight(row)
+        }
+      }
+    } else {
+      child.execute().mapPartitionsInternal { iter =>
+        iter.flatMap(boundGenerator.eval) ++ 
LazyIterator(boundGenerator.terminate)
+      }
+    }
+
+    val numOutputRows = longMetric("numOutputRows")
+    rows.mapPartitionsInternal { iter =>
+      val proj = UnsafeProjection.create(output, output)
+      iter.map { r =>
+        numOutputRows += 1
+        proj(r)
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
deleted file mode 100644
index f8aec9e..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
-import org.apache.spark.sql.execution.metric.SQLMetrics
-
-
-/**
- * Physical plan node for scanning data from a local collection.
- */
-private[sql] case class LocalTableScan(
-    output: Seq[Attribute],
-    rows: Seq[InternalRow]) extends LeafNode {
-
-  private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
-
-  private val unsafeRows: Array[InternalRow] = {
-    val proj = UnsafeProjection.create(output, output)
-    rows.map(r => proj(r).copy()).toArray
-  }
-
-  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
-    rdd.map { r =>
-      numOutputRows += 1
-      r
-    }
-  }
-
-  override def executeCollect(): Array[InternalRow] = {
-    unsafeRows
-  }
-
-  override def executeTake(limit: Int): Array[InternalRow] = {
-    unsafeRows.take(limit)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
new file mode 100644
index 0000000..4ab447a
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+
+
+/**
+ * Physical plan node for scanning data from a local collection.
+ */
+private[sql] case class LocalTableScanExec(
+    output: Seq[Attribute],
+    rows: Seq[InternalRow]) extends LeafExecNode {
+
+  private[sql] override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
+
+  private val unsafeRows: Array[InternalRow] = {
+    val proj = UnsafeProjection.create(output, output)
+    rows.map(r => proj(r).copy()).toArray
+  }
+
+  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    rdd.map { r =>
+      numOutputRows += 1
+      r
+    }
+  }
+
+  override def executeCollect(): Array[InternalRow] = {
+    unsafeRows
+  }
+
+  override def executeTake(limit: Int): Array[InternalRow] = {
+    unsafeRows.take(limit)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index a444a70..bb83676 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -27,7 +27,7 @@ import 
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.command.{DescribeTableCommand, 
ExecutedCommand, HiveNativeCommand}
+import org.apache.spark.sql.execution.command.{DescribeTableCommand, 
ExecutedCommandExec, HiveNativeCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ReuseExchange}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, 
TimestampType, _}
@@ -107,7 +107,7 @@ class QueryExecution(val sqlContext: SQLContext, val 
logical: LogicalPlan) {
    * execution is simply passed back to Hive.
    */
   def hiveResultString(): Seq[String] = executedPlan match {
-    case ExecutedCommand(desc: DescribeTableCommand) =>
+    case ExecutedCommandExec(desc: DescribeTableCommand) =>
       // If it is a describe command for a Hive table, we want to have the 
output format
       // be similar with Hive.
       desc.run(sqlContext).map {
@@ -117,7 +117,7 @@ class QueryExecution(val sqlContext: SQLContext, val 
logical: LogicalPlan) {
             .map(s => String.format(s"%-20s", s))
             .mkString("\t")
       }
-    case command: ExecutedCommand =>
+    case command: ExecutedCommandExec =>
       command.executeCollect().map(_.getString(0))
 
     case other =>

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
deleted file mode 100644
index 04a39a1..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.{SparkEnv, TaskContext}
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode, GenerateUnsafeProjection}
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, 
OrderedDistribution, UnspecifiedDistribution}
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types._
-import org.apache.spark.util.collection.unsafe.sort.RadixSort;
-
-/**
- * Performs (external) sorting.
- *
- * @param global when true performs a global sort of all partitions by 
shuffling the data first
- *               if necessary.
- * @param testSpillFrequency Method for configuring periodic spilling in unit 
tests. If set, will
- *                           spill every `frequency` records.
- */
-case class Sort(
-    sortOrder: Seq[SortOrder],
-    global: Boolean,
-    child: SparkPlan,
-    testSpillFrequency: Int = 0)
-  extends UnaryNode with CodegenSupport {
-
-  override def output: Seq[Attribute] = child.output
-
-  override def outputOrdering: Seq[SortOrder] = sortOrder
-
-  override def requiredChildDistribution: Seq[Distribution] =
-    if (global) OrderedDistribution(sortOrder) :: Nil else 
UnspecifiedDistribution :: Nil
-
-  private val enableRadixSort = sqlContext.conf.enableRadixSort
-
-  override private[sql] lazy val metrics = Map(
-    "sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "sort time"),
-    "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
-    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
-
-  def createSorter(): UnsafeExternalRowSorter = {
-    val ordering = newOrdering(sortOrder, output)
-
-    // The comparator for comparing prefix
-    val boundSortExpression = BindReferences.bindReference(sortOrder.head, 
output)
-    val prefixComparator = 
SortPrefixUtils.getPrefixComparator(boundSortExpression)
-
-    val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
-      SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
-
-    // The generator for prefix
-    val prefixProjection = 
UnsafeProjection.create(Seq(SortPrefix(boundSortExpression)))
-    val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
-      override def computePrefix(row: InternalRow): Long = {
-        prefixProjection.apply(row).getLong(0)
-      }
-    }
-
-    val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
-    val sorter = new UnsafeExternalRowSorter(
-      schema, ordering, prefixComparator, prefixComputer, pageSize, 
canUseRadixSort)
-
-    if (testSpillFrequency > 0) {
-      sorter.setTestSpillFrequency(testSpillFrequency)
-    }
-    sorter
-  }
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    val peakMemory = longMetric("peakMemory")
-    val spillSize = longMetric("spillSize")
-    val sortTime = longMetric("sortTime")
-
-    child.execute().mapPartitionsInternal { iter =>
-      val sorter = createSorter()
-
-      val metrics = TaskContext.get().taskMetrics()
-      // Remember spill data size of this task before execute this operator so 
that we can
-      // figure out how many bytes we spilled for this operator.
-      val spillSizeBefore = metrics.memoryBytesSpilled
-      val beforeSort = System.nanoTime()
-
-      val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
-
-      sortTime += (System.nanoTime() - beforeSort) / 1000000
-      peakMemory += sorter.getPeakMemoryUsage
-      spillSize += metrics.memoryBytesSpilled - spillSizeBefore
-      metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
-
-      sortedIterator
-    }
-  }
-
-  override def usedInputs: AttributeSet = AttributeSet(Seq.empty)
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].inputRDDs()
-  }
-
-  // Name of sorter variable used in codegen.
-  private var sorterVariable: String = _
-
-  override protected def doProduce(ctx: CodegenContext): String = {
-    val needToSort = ctx.freshName("needToSort")
-    ctx.addMutableState("boolean", needToSort, s"$needToSort = true;")
-
-    // Initialize the class member variables. This includes the instance of 
the Sorter and
-    // the iterator to return sorted rows.
-    val thisPlan = ctx.addReferenceObj("plan", this)
-    sorterVariable = ctx.freshName("sorter")
-    ctx.addMutableState(classOf[UnsafeExternalRowSorter].getName, 
sorterVariable,
-      s"$sorterVariable = $thisPlan.createSorter();")
-    val metrics = ctx.freshName("metrics")
-    ctx.addMutableState(classOf[TaskMetrics].getName, metrics,
-      s"$metrics = org.apache.spark.TaskContext.get().taskMetrics();")
-    val sortedIterator = ctx.freshName("sortedIter")
-    ctx.addMutableState("scala.collection.Iterator<UnsafeRow>", 
sortedIterator, "")
-
-    val addToSorter = ctx.freshName("addToSorter")
-    ctx.addNewFunction(addToSorter,
-      s"""
-        | private void $addToSorter() throws java.io.IOException {
-        |   ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
-        | }
-      """.stripMargin.trim)
-
-    // The child could change `copyResult` to true, but we had already 
consumed all the rows,
-    // so `copyResult` should be reset to `false`.
-    ctx.copyResult = false
-
-    val outputRow = ctx.freshName("outputRow")
-    val peakMemory = metricTerm(ctx, "peakMemory")
-    val spillSize = metricTerm(ctx, "spillSize")
-    val spillSizeBefore = ctx.freshName("spillSizeBefore")
-    val startTime = ctx.freshName("startTime")
-    val sortTime = metricTerm(ctx, "sortTime")
-    s"""
-       | if ($needToSort) {
-       |   long $spillSizeBefore = $metrics.memoryBytesSpilled();
-       |   long $startTime = System.nanoTime();
-       |   $addToSorter();
-       |   $sortedIterator = $sorterVariable.sort();
-       |   $sortTime.add((System.nanoTime() - $startTime) / 1000000);
-       |   $peakMemory.add($sorterVariable.getPeakMemoryUsage());
-       |   $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
-       |   
$metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());
-       |   $needToSort = false;
-       | }
-       |
-       | while ($sortedIterator.hasNext()) {
-       |   UnsafeRow $outputRow = (UnsafeRow)$sortedIterator.next();
-       |   ${consume(ctx, null, outputRow)}
-       |   if (shouldStop()) return;
-       | }
-     """.stripMargin.trim
-  }
-
-  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
ExprCode): String = {
-    s"""
-       |${row.code}
-       |$sorterVariable.insertRow((UnsafeRow)${row.value});
-     """.stripMargin
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
new file mode 100644
index 0000000..0e4d6d7
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode, GenerateUnsafeProjection}
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, 
OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.types._
+import org.apache.spark.util.collection.unsafe.sort.RadixSort;
+
+/**
+ * Performs (external) sorting.
+ *
+ * @param global when true performs a global sort of all partitions by 
shuffling the data first
+ *               if necessary.
+ * @param testSpillFrequency Method for configuring periodic spilling in unit 
tests. If set, will
+ *                           spill every `frequency` records.
+ */
+case class SortExec(
+    sortOrder: Seq[SortOrder],
+    global: Boolean,
+    child: SparkPlan,
+    testSpillFrequency: Int = 0)
+  extends UnaryExecNode with CodegenSupport {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputOrdering: Seq[SortOrder] = sortOrder
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    if (global) OrderedDistribution(sortOrder) :: Nil else 
UnspecifiedDistribution :: Nil
+
+  private val enableRadixSort = sqlContext.conf.enableRadixSort
+
+  override private[sql] lazy val metrics = Map(
+    "sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "sort time"),
+    "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
+    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
+
+  def createSorter(): UnsafeExternalRowSorter = {
+    val ordering = newOrdering(sortOrder, output)
+
+    // The comparator for comparing prefix
+    val boundSortExpression = BindReferences.bindReference(sortOrder.head, 
output)
+    val prefixComparator = 
SortPrefixUtils.getPrefixComparator(boundSortExpression)
+
+    val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
+      SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
+
+    // The generator for prefix
+    val prefixProjection = 
UnsafeProjection.create(Seq(SortPrefix(boundSortExpression)))
+    val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
+      override def computePrefix(row: InternalRow): Long = {
+        prefixProjection.apply(row).getLong(0)
+      }
+    }
+
+    val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
+    val sorter = new UnsafeExternalRowSorter(
+      schema, ordering, prefixComparator, prefixComputer, pageSize, 
canUseRadixSort)
+
+    if (testSpillFrequency > 0) {
+      sorter.setTestSpillFrequency(testSpillFrequency)
+    }
+    sorter
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val peakMemory = longMetric("peakMemory")
+    val spillSize = longMetric("spillSize")
+    val sortTime = longMetric("sortTime")
+
+    child.execute().mapPartitionsInternal { iter =>
+      val sorter = createSorter()
+
+      val metrics = TaskContext.get().taskMetrics()
+      // Remember spill data size of this task before execute this operator so 
that we can
+      // figure out how many bytes we spilled for this operator.
+      val spillSizeBefore = metrics.memoryBytesSpilled
+      val beforeSort = System.nanoTime()
+
+      val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
+
+      sortTime += (System.nanoTime() - beforeSort) / 1000000
+      peakMemory += sorter.getPeakMemoryUsage
+      spillSize += metrics.memoryBytesSpilled - spillSizeBefore
+      metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
+
+      sortedIterator
+    }
+  }
+
+  override def usedInputs: AttributeSet = AttributeSet(Seq.empty)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  // Name of sorter variable used in codegen.
+  private var sorterVariable: String = _
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    val needToSort = ctx.freshName("needToSort")
+    ctx.addMutableState("boolean", needToSort, s"$needToSort = true;")
+
+    // Initialize the class member variables. This includes the instance of 
the Sorter and
+    // the iterator to return sorted rows.
+    val thisPlan = ctx.addReferenceObj("plan", this)
+    sorterVariable = ctx.freshName("sorter")
+    ctx.addMutableState(classOf[UnsafeExternalRowSorter].getName, 
sorterVariable,
+      s"$sorterVariable = $thisPlan.createSorter();")
+    val metrics = ctx.freshName("metrics")
+    ctx.addMutableState(classOf[TaskMetrics].getName, metrics,
+      s"$metrics = org.apache.spark.TaskContext.get().taskMetrics();")
+    val sortedIterator = ctx.freshName("sortedIter")
+    ctx.addMutableState("scala.collection.Iterator<UnsafeRow>", 
sortedIterator, "")
+
+    val addToSorter = ctx.freshName("addToSorter")
+    ctx.addNewFunction(addToSorter,
+      s"""
+        | private void $addToSorter() throws java.io.IOException {
+        |   ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
+        | }
+      """.stripMargin.trim)
+
+    // The child could change `copyResult` to true, but we had already 
consumed all the rows,
+    // so `copyResult` should be reset to `false`.
+    ctx.copyResult = false
+
+    val outputRow = ctx.freshName("outputRow")
+    val peakMemory = metricTerm(ctx, "peakMemory")
+    val spillSize = metricTerm(ctx, "spillSize")
+    val spillSizeBefore = ctx.freshName("spillSizeBefore")
+    val startTime = ctx.freshName("startTime")
+    val sortTime = metricTerm(ctx, "sortTime")
+    s"""
+       | if ($needToSort) {
+       |   long $spillSizeBefore = $metrics.memoryBytesSpilled();
+       |   long $startTime = System.nanoTime();
+       |   $addToSorter();
+       |   $sortedIterator = $sorterVariable.sort();
+       |   $sortTime.add((System.nanoTime() - $startTime) / 1000000);
+       |   $peakMemory.add($sorterVariable.getPeakMemoryUsage());
+       |   $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
+       |   
$metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());
+       |   $needToSort = false;
+       | }
+       |
+       | while ($sortedIterator.hasNext()) {
+       |   UnsafeRow $outputRow = (UnsafeRow)$sortedIterator.next();
+       |   ${consume(ctx, null, outputRow)}
+       |   if (shouldStop()) return;
+       | }
+     """.stripMargin.trim
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
ExprCode): String = {
+    s"""
+       |${row.code}
+       |$sorterVariable.insertRow((UnsafeRow)${row.value});
+     """.stripMargin
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 64d89f2..e28e456 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -41,6 +41,8 @@ import org.apache.spark.util.ThreadUtils
 
 /**
  * The base class for physical operators.
+ *
+ * The naming convention is that physical operators end with "Exec" suffix, 
e.g. [[ProjectExec]].
  */
 abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with 
Serializable {
 
@@ -392,19 +394,19 @@ object SparkPlan {
     ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
 }
 
-private[sql] trait LeafNode extends SparkPlan {
+private[sql] trait LeafExecNode extends SparkPlan {
   override def children: Seq[SparkPlan] = Nil
   override def producedAttributes: AttributeSet = outputSet
 }
 
-object UnaryNode {
+object UnaryExecNode {
   def unapply(a: Any): Option[(SparkPlan, SparkPlan)] = a match {
     case s: SparkPlan if s.children.size == 1 => Some((s, s.children.head))
     case _ => None
   }
 }
 
-private[sql] trait UnaryNode extends SparkPlan {
+private[sql] trait UnaryExecNode extends SparkPlan {
   def child: SparkPlan
 
   override def children: Seq[SparkPlan] = child :: Nil
@@ -412,7 +414,7 @@ private[sql] trait UnaryNode extends SparkPlan {
   override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 
-private[sql] trait BinaryNode extends SparkPlan {
+private[sql] trait BinaryExecNode extends SparkPlan {
   def left: SparkPlan
   def right: SparkPlan
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 247f55d..cb4b1cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.execution.exchange.ReusedExchange
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
 import org.apache.spark.util.Utils
 
@@ -51,7 +51,7 @@ private[sql] object SparkPlanInfo {
 
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
     val children = plan match {
-      case ReusedExchange(_, child) => child :: Nil
+      case ReusedExchangeExec(_, child) => child :: Nil
       case _ => plan.children ++ plan.subqueries
     }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index 8d05ae4..0afa4c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -82,10 +82,10 @@ class SparkPlanner(
       // when the columns of this projection are enough to evaluate all filter 
conditions,
       // just do a scan followed by a filter, with no extra project.
       val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]])
-      filterCondition.map(Filter(_, scan)).getOrElse(scan)
+      filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
     } else {
       val scan = scanBuilder((projectSet ++ filterSet).toSeq)
-      Project(projectList, filterCondition.map(Filter(_, 
scan)).getOrElse(scan))
+      ProjectExec(projectList, filterCondition.map(FilterExec(_, 
scan)).getOrElse(scan))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ed6b846..3ce5f28 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution
-import org.apache.spark.sql.execution.columnar.{InMemoryColumnarTableScan, 
InMemoryRelation}
+import org.apache.spark.sql.execution.columnar.{InMemoryRelation, 
InMemoryTableScanExec}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
@@ -44,20 +44,22 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.ReturnAnswer(rootPlan) => rootPlan match {
         case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, 
child)) =>
-          execution.TakeOrderedAndProject(limit, order, None, 
planLater(child)) :: Nil
+          execution.TakeOrderedAndProjectExec(limit, order, None, 
planLater(child)) :: Nil
         case logical.Limit(
             IntegerLiteral(limit),
             logical.Project(projectList, logical.Sort(order, true, child))) =>
-          execution.TakeOrderedAndProject(limit, order, Some(projectList), 
planLater(child)) :: Nil
+          execution.TakeOrderedAndProjectExec(
+            limit, order, Some(projectList), planLater(child)) :: Nil
         case logical.Limit(IntegerLiteral(limit), child) =>
-          execution.CollectLimit(limit, planLater(child)) :: Nil
+          execution.CollectLimitExec(limit, planLater(child)) :: Nil
         case other => planLater(other) :: Nil
       }
       case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, 
child)) =>
-        execution.TakeOrderedAndProject(limit, order, None, planLater(child)) 
:: Nil
+        execution.TakeOrderedAndProjectExec(limit, order, None, 
planLater(child)) :: Nil
       case logical.Limit(
           IntegerLiteral(limit), logical.Project(projectList, 
logical.Sort(order, true, child))) =>
-        execution.TakeOrderedAndProject(limit, order, Some(projectList), 
planLater(child)) :: Nil
+        execution.TakeOrderedAndProjectExec(
+          limit, order, Some(projectList), planLater(child)) :: Nil
       case _ => Nil
     }
   }
@@ -66,12 +68,12 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ExtractEquiJoinKeys(
              LeftExistence(jt), leftKeys, rightKeys, condition, left, 
CanBroadcast(right)) =>
-        Seq(joins.BroadcastHashJoin(
+        Seq(joins.BroadcastHashJoinExec(
           leftKeys, rightKeys, jt, BuildRight, condition, planLater(left), 
planLater(right)))
       // Find left semi joins where at least some predicates can be evaluated 
by matching join keys
       case ExtractEquiJoinKeys(
              LeftExistence(jt), leftKeys, rightKeys, condition, left, right) =>
-        Seq(joins.ShuffledHashJoin(
+        Seq(joins.ShuffledHashJoinExec(
           leftKeys, rightKeys, jt, BuildRight, condition, planLater(left), 
planLater(right)))
       case _ => Nil
     }
@@ -146,11 +148,11 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       // --- Inner joins 
--------------------------------------------------------------------------
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, 
CanBroadcast(right)) =>
-        Seq(joins.BroadcastHashJoin(
+        Seq(joins.BroadcastHashJoinExec(
           leftKeys, rightKeys, Inner, BuildRight, condition, planLater(left), 
planLater(right)))
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, 
CanBroadcast(left), right) =>
-        Seq(joins.BroadcastHashJoin(
+        Seq(joins.BroadcastHashJoinExec(
           leftKeys, rightKeys, Inner, BuildLeft, condition, planLater(left), 
planLater(right)))
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, 
right)
@@ -162,41 +164,41 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
           } else {
             BuildLeft
           }
-        Seq(joins.ShuffledHashJoin(
+        Seq(joins.ShuffledHashJoinExec(
           leftKeys, rightKeys, Inner, buildSide, condition, planLater(left), 
planLater(right)))
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, 
right)
         if RowOrdering.isOrderable(leftKeys) =>
-        joins.SortMergeJoin(
+        joins.SortMergeJoinExec(
           leftKeys, rightKeys, Inner, condition, planLater(left), 
planLater(right)) :: Nil
 
       // --- Outer joins 
--------------------------------------------------------------------------
 
       case ExtractEquiJoinKeys(
           LeftOuter, leftKeys, rightKeys, condition, left, 
CanBroadcast(right)) =>
-        Seq(joins.BroadcastHashJoin(
+        Seq(joins.BroadcastHashJoinExec(
           leftKeys, rightKeys, LeftOuter, BuildRight, condition, 
planLater(left), planLater(right)))
 
       case ExtractEquiJoinKeys(
           RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), 
right) =>
-        Seq(joins.BroadcastHashJoin(
+        Seq(joins.BroadcastHashJoinExec(
           leftKeys, rightKeys, RightOuter, BuildLeft, condition, 
planLater(left), planLater(right)))
 
       case ExtractEquiJoinKeys(LeftOuter, leftKeys, rightKeys, condition, 
left, right)
          if !conf.preferSortMergeJoin && canBuildHashMap(right) && 
muchSmaller(right, left) ||
            !RowOrdering.isOrderable(leftKeys) =>
-        Seq(joins.ShuffledHashJoin(
+        Seq(joins.ShuffledHashJoinExec(
           leftKeys, rightKeys, LeftOuter, BuildRight, condition, 
planLater(left), planLater(right)))
 
       case ExtractEquiJoinKeys(RightOuter, leftKeys, rightKeys, condition, 
left, right)
          if !conf.preferSortMergeJoin && canBuildHashMap(left) && 
muchSmaller(left, right) ||
            !RowOrdering.isOrderable(leftKeys) =>
-        Seq(joins.ShuffledHashJoin(
+        Seq(joins.ShuffledHashJoinExec(
           leftKeys, rightKeys, RightOuter, BuildLeft, condition, 
planLater(left), planLater(right)))
 
       case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, 
right)
         if RowOrdering.isOrderable(leftKeys) =>
-        joins.SortMergeJoin(
+        joins.SortMergeJoinExec(
           leftKeys, rightKeys, joinType, condition, planLater(left), 
planLater(right)) :: Nil
 
       // --- Cases where this strategy does not apply 
---------------------------------------------
@@ -278,10 +280,10 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   object BroadcastNestedLoop extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case j @ logical.Join(CanBroadcast(left), right, Inner | RightOuter, 
condition) =>
-        execution.joins.BroadcastNestedLoopJoin(
+        execution.joins.BroadcastNestedLoopJoinExec(
           planLater(left), planLater(right), joins.BuildLeft, j.joinType, 
condition) :: Nil
       case j @ logical.Join(left, CanBroadcast(right), Inner | LeftOuter | 
LeftSemi, condition) =>
-        execution.joins.BroadcastNestedLoopJoin(
+        execution.joins.BroadcastNestedLoopJoinExec(
           planLater(left), planLater(right), joins.BuildRight, j.joinType, 
condition) :: Nil
       case _ => Nil
     }
@@ -290,10 +292,10 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   object CartesianProduct extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.Join(left, right, Inner, None) =>
-        execution.joins.CartesianProduct(planLater(left), planLater(right)) :: 
Nil
+        execution.joins.CartesianProductExec(planLater(left), 
planLater(right)) :: Nil
       case logical.Join(left, right, Inner, Some(condition)) =>
-        execution.Filter(condition,
-          execution.joins.CartesianProduct(planLater(left), planLater(right))) 
:: Nil
+        execution.FilterExec(condition,
+          execution.joins.CartesianProductExec(planLater(left), 
planLater(right))) :: Nil
       case _ => Nil
     }
   }
@@ -308,7 +310,7 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
             joins.BuildLeft
           }
         // This join could be very slow or even hang forever
-        joins.BroadcastNestedLoopJoin(
+        joins.BroadcastNestedLoopJoinExec(
           planLater(left), planLater(right), buildSide, joinType, condition) 
:: Nil
       case _ => Nil
     }
@@ -323,7 +325,7 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
           projectList,
           filters,
           identity[Seq[Expression]], // All filters still need to be evaluated.
-          InMemoryColumnarTableScan(_, filters, mem)) :: Nil
+          InMemoryTableScanExec(_, filters, mem)) :: Nil
       case _ => Nil
     }
   }
@@ -333,11 +335,11 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
     def numPartitions: Int = self.numPartitions
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case r: RunnableCommand => ExecutedCommand(r) :: Nil
+      case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
 
       case MemoryPlan(sink, output) =>
         val encoder = RowEncoder(sink.schema)
-        LocalTableScan(output, sink.allData.map(r => encoder.toRow(r).copy())) 
:: Nil
+        LocalTableScanExec(output, sink.allData.map(r => 
encoder.toRow(r).copy())) :: Nil
 
       case logical.Distinct(child) =>
         throw new IllegalStateException(
@@ -349,19 +351,19 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case logical.DeserializeToObject(deserializer, objAttr, child) =>
         execution.DeserializeToObject(deserializer, objAttr, planLater(child)) 
:: Nil
       case logical.SerializeFromObject(serializer, child) =>
-        execution.SerializeFromObject(serializer, planLater(child)) :: Nil
+        execution.SerializeFromObjectExec(serializer, planLater(child)) :: Nil
       case logical.MapPartitions(f, objAttr, child) =>
-        execution.MapPartitions(f, objAttr, planLater(child)) :: Nil
+        execution.MapPartitionsExec(f, objAttr, planLater(child)) :: Nil
       case logical.MapElements(f, objAttr, child) =>
-        execution.MapElements(f, objAttr, planLater(child)) :: Nil
+        execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil
       case logical.AppendColumns(f, in, out, child) =>
-        execution.AppendColumns(f, in, out, planLater(child)) :: Nil
+        execution.AppendColumnsExec(f, in, out, planLater(child)) :: Nil
       case logical.AppendColumnsWithObject(f, childSer, newSer, child) =>
-        execution.AppendColumnsWithObject(f, childSer, newSer, 
planLater(child)) :: Nil
+        execution.AppendColumnsWithObjectExec(f, childSer, newSer, 
planLater(child)) :: Nil
       case logical.MapGroups(f, key, value, grouping, data, objAttr, child) =>
-        execution.MapGroups(f, key, value, grouping, data, objAttr, 
planLater(child)) :: Nil
+        execution.MapGroupsExec(f, key, value, grouping, data, objAttr, 
planLater(child)) :: Nil
       case logical.CoGroup(f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, 
oAttr, left, right) =>
-        execution.CoGroup(
+        execution.CoGroupExec(
           f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr,
           planLater(left), planLater(right)) :: Nil
 
@@ -369,45 +371,45 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
         if (shuffle) {
           ShuffleExchange(RoundRobinPartitioning(numPartitions), 
planLater(child)) :: Nil
         } else {
-          execution.Coalesce(numPartitions, planLater(child)) :: Nil
+          execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
         }
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its 
requiredDistribution will be
         // an UnspecifiedDistribution.
-        execution.Sort(sortExprs, global = false, child = planLater(child)) :: 
Nil
+        execution.SortExec(sortExprs, global = false, child = 
planLater(child)) :: Nil
       case logical.Sort(sortExprs, global, child) =>
-        execution.Sort(sortExprs, global, planLater(child)) :: Nil
+        execution.SortExec(sortExprs, global, planLater(child)) :: Nil
       case logical.Project(projectList, child) =>
-        execution.Project(projectList, planLater(child)) :: Nil
+        execution.ProjectExec(projectList, planLater(child)) :: Nil
       case logical.Filter(condition, child) =>
-        execution.Filter(condition, planLater(child)) :: Nil
+        execution.FilterExec(condition, planLater(child)) :: Nil
       case e @ logical.Expand(_, _, child) =>
-        execution.Expand(e.projections, e.output, planLater(child)) :: Nil
+        execution.ExpandExec(e.projections, e.output, planLater(child)) :: Nil
       case logical.Window(windowExprs, partitionSpec, orderSpec, child) =>
-        execution.Window(windowExprs, partitionSpec, orderSpec, 
planLater(child)) :: Nil
+        execution.WindowExec(windowExprs, partitionSpec, orderSpec, 
planLater(child)) :: Nil
       case logical.Sample(lb, ub, withReplacement, seed, child) =>
-        execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: 
Nil
+        execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) 
:: Nil
       case logical.LocalRelation(output, data) =>
-        LocalTableScan(output, data) :: Nil
+        LocalTableScanExec(output, data) :: Nil
       case logical.LocalLimit(IntegerLiteral(limit), child) =>
-        execution.LocalLimit(limit, planLater(child)) :: Nil
+        execution.LocalLimitExec(limit, planLater(child)) :: Nil
       case logical.GlobalLimit(IntegerLiteral(limit), child) =>
-        execution.GlobalLimit(limit, planLater(child)) :: Nil
+        execution.GlobalLimitExec(limit, planLater(child)) :: Nil
       case logical.Union(unionChildren) =>
-        execution.Union(unionChildren.map(planLater)) :: Nil
+        execution.UnionExec(unionChildren.map(planLater)) :: Nil
       case logical.Except(left, right) =>
-        execution.Except(planLater(left), planLater(right)) :: Nil
+        execution.ExceptExec(planLater(left), planLater(right)) :: Nil
       case g @ logical.Generate(generator, join, outer, _, _, child) =>
-        execution.Generate(
+        execution.GenerateExec(
           generator, join = join, outer = outer, g.output, planLater(child)) 
:: Nil
       case logical.OneRowRelation =>
-        execution.PhysicalRDD(Nil, singleRowRdd, "OneRowRelation") :: Nil
+        execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
       case r @ logical.Range(start, end, step, numSlices, output) =>
-        execution.Range(start, step, numSlices, r.numElements, output) :: Nil
+        execution.RangeExec(start, step, numSlices, r.numElements, output) :: 
Nil
       case logical.RepartitionByExpression(expressions, child, nPartitions) =>
         exchange.ShuffleExchange(HashPartitioning(
           expressions, nPartitions.getOrElse(numPartitions)), 
planLater(child)) :: Nil
-      case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "ExistingRDD") 
:: Nil
+      case LogicalRDD(output, rdd) => RDDScanExec(output, rdd, "ExistingRDD") 
:: Nil
       case BroadcastHint(child) => planLater(child) :: Nil
       case _ => Nil
     }
@@ -416,7 +418,7 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   object DDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case CreateTableUsing(tableIdent, userSpecifiedSchema, provider, true, 
opts, false, _) =>
-        ExecutedCommand(
+        ExecutedCommandExec(
           CreateTempTableUsing(
             tableIdent, userSpecifiedSchema, provider, opts)) :: Nil
       case c: CreateTableUsing if !c.temporary =>
@@ -430,15 +432,15 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case c: CreateTableUsingAsSelect if c.temporary =>
         val cmd = CreateTempTableUsingAsSelect(
           c.tableIdent, c.provider, Array.empty[String], c.mode, c.options, 
c.child)
-        ExecutedCommand(cmd) :: Nil
+        ExecutedCommandExec(cmd) :: Nil
       case c: CreateTableUsingAsSelect if !c.temporary =>
         sys.error("Tables created with SQLContext must be TEMPORARY. Use a 
HiveContext instead.")
 
       case logical.ShowFunctions(db, pattern) =>
-        ExecutedCommand(ShowFunctions(db, pattern)) :: Nil
+        ExecutedCommandExec(ShowFunctions(db, pattern)) :: Nil
 
       case logical.DescribeFunction(function, extended) =>
-        ExecutedCommand(DescribeFunction(function, extended)) :: Nil
+        ExecutedCommandExec(DescribeFunction(function, extended)) :: Nil
 
       case _ => Nil
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to