[SPARK-14855][SQL] Add "Exec" suffix to physical operators ## What changes were proposed in this pull request? This patch adds an "Exec" suffix to all physical operators. Before this patch, Spark's physical and logical operators shared the same names (e.g. Project could refer to either logical.Project or execution.Project), which caused minor confusion during code review and bigger problems during refactoring. Note that TreeNode.nodeName strips a trailing "Exec" from the class name, so operator names in plan string output are unchanged by this rename.
## How was this patch tested? N/A Author: Reynold Xin <[email protected]> Closes #12617 from rxin/exec-node. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7d0cad0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7d0cad0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7d0cad0 Branch: refs/heads/master Commit: d7d0cad0ad7667c0e09ae01601ee0e4d0b09963c Parents: c431a76 Author: Reynold Xin <[email protected]> Authored: Fri Apr 22 17:43:56 2016 -0700 Committer: Reynold Xin <[email protected]> Committed: Fri Apr 22 17:43:56 2016 -0700 ---------------------------------------------------------------------- .../spark/sql/catalyst/trees/TreeNode.scala | 9 +- .../spark/sql/execution/ExistingRDD.scala | 24 +- .../org/apache/spark/sql/execution/Expand.scala | 202 ---- .../apache/spark/sql/execution/ExpandExec.scala | 202 ++++ .../apache/spark/sql/execution/Generate.scala | 103 -- .../spark/sql/execution/GenerateExec.scala | 103 ++ .../spark/sql/execution/LocalTableScan.scala | 58 - .../sql/execution/LocalTableScanExec.scala | 58 + .../spark/sql/execution/QueryExecution.scala | 6 +- .../org/apache/spark/sql/execution/Sort.scala | 183 ---- .../apache/spark/sql/execution/SortExec.scala | 183 ++++ .../apache/spark/sql/execution/SparkPlan.scala | 10 +- .../spark/sql/execution/SparkPlanInfo.scala | 4 +- .../spark/sql/execution/SparkPlanner.scala | 4 +- .../spark/sql/execution/SparkStrategies.scala | 110 +- .../spark/sql/execution/WholeStageCodegen.scala | 492 --------- .../sql/execution/WholeStageCodegenExec.scala | 492 +++++++++ .../org/apache/spark/sql/execution/Window.scala | 1008 ------------------ .../apache/spark/sql/execution/WindowExec.scala | 1008 ++++++++++++++++++ .../aggregate/SortBasedAggregate.scala | 111 -- .../aggregate/SortBasedAggregateExec.scala | 111 ++ .../execution/aggregate/TungstenAggregate.scala | 2 +- .../spark/sql/execution/aggregate/utils.scala | 10 +- 
.../spark/sql/execution/basicOperators.scala | 50 +- .../columnar/InMemoryColumnarTableScan.scala | 358 ------- .../columnar/InMemoryTableScanExec.scala | 358 +++++++ .../spark/sql/execution/command/commands.scala | 2 +- .../datasources/DataSourceStrategy.scala | 18 +- .../datasources/FileSourceStrategy.scala | 8 +- .../spark/sql/execution/debug/package.scala | 10 +- .../execution/exchange/BroadcastExchange.scala | 109 -- .../exchange/BroadcastExchangeExec.scala | 109 ++ .../execution/exchange/EnsureRequirements.scala | 4 +- .../spark/sql/execution/exchange/Exchange.scala | 9 +- .../sql/execution/joins/BroadcastHashJoin.scala | 401 ------- .../execution/joins/BroadcastHashJoinExec.scala | 401 +++++++ .../joins/BroadcastNestedLoopJoin.scala | 331 ------ .../joins/BroadcastNestedLoopJoinExec.scala | 331 ++++++ .../sql/execution/joins/CartesianProduct.scala | 103 -- .../execution/joins/CartesianProductExec.scala | 103 ++ .../sql/execution/joins/ShuffledHashJoin.scala | 81 -- .../execution/joins/ShuffledHashJoinExec.scala | 81 ++ .../sql/execution/joins/SortMergeJoin.scala | 964 ----------------- .../sql/execution/joins/SortMergeJoinExec.scala | 964 +++++++++++++++++ .../org/apache/spark/sql/execution/limit.scala | 19 +- .../apache/spark/sql/execution/objects.scala | 39 +- .../execution/python/BatchEvalPythonExec.scala | 149 +++ .../python/BatchPythonEvaluation.scala | 149 --- .../execution/python/ExtractPythonUDFs.scala | 4 +- .../streaming/IncrementalExecution.scala | 15 +- .../execution/streaming/StatefulAggregate.scala | 10 +- .../apache/spark/sql/execution/subquery.scala | 2 +- .../spark/sql/execution/ui/SparkPlanGraph.scala | 4 +- .../org/apache/spark/sql/CachedTableSuite.scala | 10 +- .../spark/sql/ColumnExpressionSuite.scala | 4 +- .../apache/spark/sql/DataFrameJoinSuite.scala | 6 +- .../org/apache/spark/sql/DataFrameSuite.scala | 10 +- .../scala/org/apache/spark/sql/JoinSuite.scala | 115 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 8 +- 
.../spark/sql/execution/ExchangeSuite.scala | 12 +- .../spark/sql/execution/PlannerSuite.scala | 36 +- .../spark/sql/execution/ReferenceSort.scala | 2 +- .../apache/spark/sql/execution/SortSuite.scala | 16 +- .../execution/TakeOrderedAndProjectSuite.scala | 21 +- .../sql/execution/WholeStageCodegenSuite.scala | 36 +- .../columnar/PartitionBatchPruningSuite.scala | 2 +- .../datasources/FileSourceStrategySuite.scala | 6 +- .../execution/joins/BroadcastJoinSuite.scala | 6 +- .../execution/joins/ExistenceJoinSuite.scala | 8 +- .../sql/execution/joins/InnerJoinSuite.scala | 14 +- .../sql/execution/joins/OuterJoinSuite.scala | 10 +- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 16 +- .../spark/sql/sources/FilteredScanSuite.scala | 2 +- .../spark/sql/sources/PrunedScanSuite.scala | 2 +- .../apache/spark/sql/test/SQLTestUtils.scala | 4 +- .../spark/sql/util/DataFrameCallbackSuite.scala | 4 +- .../thriftserver/HiveThriftServer2Suites.scala | 4 +- .../apache/spark/sql/hive/HiveStrategies.scala | 8 +- .../sql/hive/execution/HiveTableScan.scala | 166 --- .../sql/hive/execution/HiveTableScanExec.scala | 166 +++ .../hive/execution/InsertIntoHiveTable.scala | 4 +- .../hive/execution/ScriptTransformation.scala | 2 +- .../spark/sql/hive/CachedTableSuite.scala | 6 +- .../apache/spark/sql/hive/StatisticsSuite.scala | 12 +- .../sql/hive/execution/HiveComparisonTest.scala | 2 +- .../sql/hive/execution/HiveQuerySuite.scala | 4 +- .../hive/execution/HiveTypeCoercionSuite.scala | 4 +- .../spark/sql/hive/execution/PruningSuite.scala | 2 +- .../execution/ScriptTransformationSuite.scala | 4 +- .../apache/spark/sql/hive/parquetSuites.scala | 14 +- .../spark/sql/sources/BucketedReadSuite.scala | 10 +- .../sources/ParquetHadoopFsRelationSuite.scala | 4 +- .../sql/sources/hadoopFsRelationSuites.scala | 4 +- 93 files changed, 5241 insertions(+), 5204 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 232ca43..3d0e016 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -408,8 +408,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { } } - /** Returns the name of this type of TreeNode. Defaults to the class name. */ - def nodeName: String = getClass.getSimpleName + /** + * Returns the name of this type of TreeNode. Defaults to the class name. + * Note that we remove the "Exec" suffix for physical operators here. + */ + def nodeName: String = getClass.getSimpleName.replaceAll("Exec$", "") /** * The arguments that should be included in the arg string. Defaults to the `productIterator`. @@ -426,7 +429,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { case other => other :: Nil }.mkString(", ") - /** String representation of this node without any children */ + /** String representation of this node without any children. 
*/ def simpleString: String = s"$nodeName $argString".trim override def toString: String = treeString http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 12d03a7..b3a197c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -100,10 +100,10 @@ private[sql] case class LogicalRDD( } /** Physical plan node for scanning data from an RDD. */ -private[sql] case class PhysicalRDD( +private[sql] case class RDDScanExec( output: Seq[Attribute], rdd: RDD[InternalRow], - override val nodeName: String) extends LeafNode { + override val nodeName: String) extends LeafExecNode { private[sql] override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) @@ -124,7 +124,7 @@ private[sql] case class PhysicalRDD( } } -private[sql] trait DataSourceScan extends LeafNode { +private[sql] trait DataSourceScanExec extends LeafExecNode { val rdd: RDD[InternalRow] val relation: BaseRelation @@ -132,19 +132,19 @@ private[sql] trait DataSourceScan extends LeafNode { // Ignore rdd when checking results override def sameResult(plan: SparkPlan): Boolean = plan match { - case other: DataSourceScan => relation == other.relation && metadata == other.metadata + case other: DataSourceScanExec => relation == other.relation && metadata == other.metadata case _ => false } } /** Physical plan node for scanning data from a relation. 
*/ -private[sql] case class RowDataSourceScan( +private[sql] case class RowDataSourceScanExec( output: Seq[Attribute], rdd: RDD[InternalRow], @transient relation: BaseRelation, override val outputPartitioning: Partitioning, override val metadata: Map[String, String] = Map.empty) - extends DataSourceScan with CodegenSupport { + extends DataSourceScanExec with CodegenSupport { private[sql] override lazy val metrics = Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) @@ -207,13 +207,13 @@ private[sql] case class RowDataSourceScan( } /** Physical plan node for scanning data from a batched relation. */ -private[sql] case class BatchedDataSourceScan( +private[sql] case class BatchedDataSourceScanExec( output: Seq[Attribute], rdd: RDD[InternalRow], @transient relation: BaseRelation, override val outputPartitioning: Partitioning, override val metadata: Map[String, String] = Map.empty) - extends DataSourceScan with CodegenSupport { + extends DataSourceScanExec with CodegenSupport { private[sql] override lazy val metrics = Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"), @@ -316,7 +316,7 @@ private[sql] case class BatchedDataSourceScan( } } -private[sql] object DataSourceScan { +private[sql] object DataSourceScanExec { // Metadata keys val INPUT_PATHS = "InputPaths" val PUSHED_FILTERS = "PushedFilters" @@ -325,7 +325,7 @@ private[sql] object DataSourceScan { output: Seq[Attribute], rdd: RDD[InternalRow], relation: BaseRelation, - metadata: Map[String, String] = Map.empty): DataSourceScan = { + metadata: Map[String, String] = Map.empty): DataSourceScanExec = { val outputPartitioning = { val bucketSpec = relation match { // TODO: this should be closer to bucket planning. 
@@ -349,9 +349,9 @@ private[sql] object DataSourceScan { relation match { case r: HadoopFsRelation if r.fileFormat.supportBatch(r.sqlContext, relation.schema) => - BatchedDataSourceScan(output, rdd, relation, outputPartitioning, metadata) + BatchedDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata) case _ => - RowDataSourceScan(output, rdd, relation, outputPartitioning, metadata) + RowDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala deleted file mode 100644 index 3966af5..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} -import org.apache.spark.sql.execution.metric.SQLMetrics - -/** - * Apply the all of the GroupExpressions to every input row, hence we will get - * multiple output rows for a input row. - * @param projections The group of expressions, all of the group expressions should - * output the same schema specified bye the parameter `output` - * @param output The output Schema - * @param child Child operator - */ -case class Expand( - projections: Seq[Seq[Expression]], - output: Seq[Attribute], - child: SparkPlan) - extends UnaryNode with CodegenSupport { - - private[sql] override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) - - // The GroupExpressions can output data with arbitrary partitioning, so set it - // as UNKNOWN partitioning - override def outputPartitioning: Partitioning = UnknownPartitioning(0) - - override def references: AttributeSet = - AttributeSet(projections.flatten.flatMap(_.references)) - - private[this] val projection = - (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output) - - protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { - val numOutputRows = longMetric("numOutputRows") - - child.execute().mapPartitions { iter => - val groups = projections.map(projection).toArray - new Iterator[InternalRow] { - private[this] var result: InternalRow = _ - private[this] var idx = -1 // -1 means the initial state - private[this] var input: InternalRow = _ - - override final def hasNext: Boolean = (-1 < idx && idx < groups.length) || iter.hasNext - - 
override final def next(): InternalRow = { - if (idx <= 0) { - // in the initial (-1) or beginning(0) of a new input row, fetch the next input tuple - input = iter.next() - idx = 0 - } - - result = groups(idx)(input) - idx += 1 - - if (idx == groups.length && iter.hasNext) { - idx = 0 - } - - numOutputRows += 1 - result - } - } - } - } - - override def inputRDDs(): Seq[RDD[InternalRow]] = { - child.asInstanceOf[CodegenSupport].inputRDDs() - } - - protected override def doProduce(ctx: CodegenContext): String = { - child.asInstanceOf[CodegenSupport].produce(ctx, this) - } - - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { - /* - * When the projections list looks like: - * expr1A, exprB, expr1C - * expr2A, exprB, expr2C - * ... - * expr(N-1)A, exprB, expr(N-1)C - * - * i.e. column A and C have different values for each output row, but column B stays constant. - * - * The generated code looks something like (note that B is only computed once in declaration): - * - * // part 1: declare all the columns - * colA = ... - * colB = ... - * colC = ... - * - * // part 2: code that computes the columns - * for (row = 0; row < N; row++) { - * switch (row) { - * case 0: - * colA = ... - * colC = ... - * case 1: - * colA = ... - * colC = ... - * ... - * case N - 1: - * colA = ... - * colC = ... - * } - * // increment metrics and consume output values - * } - * - * We use a for loop here so we only includes one copy of the consume code and avoid code - * size explosion. - */ - - // Set input variables - ctx.currentVars = input - - // Tracks whether a column has the same output for all rows. - // Size of sameOutput array should equal N. - // If sameOutput(i) is true, then the i-th column has the same value for all output rows given - // an input row. 
- val sameOutput: Array[Boolean] = output.indices.map { colIndex => - projections.map(p => p(colIndex)).toSet.size == 1 - }.toArray - - // Part 1: declare variables for each column - // If a column has the same value for all output rows, then we also generate its computation - // right after declaration. Otherwise its value is computed in the part 2. - val outputColumns = output.indices.map { col => - val firstExpr = projections.head(col) - if (sameOutput(col)) { - // This column is the same across all output rows. Just generate code for it here. - BindReferences.bindReference(firstExpr, child.output).genCode(ctx) - } else { - val isNull = ctx.freshName("isNull") - val value = ctx.freshName("value") - val code = s""" - |boolean $isNull = true; - |${ctx.javaType(firstExpr.dataType)} $value = ${ctx.defaultValue(firstExpr.dataType)}; - """.stripMargin - ExprCode(code, isNull, value) - } - } - - // Part 2: switch/case statements - val cases = projections.zipWithIndex.map { case (exprs, row) => - var updateCode = "" - for (col <- exprs.indices) { - if (!sameOutput(col)) { - val ev = BindReferences.bindReference(exprs(col), child.output).genCode(ctx) - updateCode += - s""" - |${ev.code} - |${outputColumns(col).isNull} = ${ev.isNull}; - |${outputColumns(col).value} = ${ev.value}; - """.stripMargin - } - } - - s""" - |case $row: - | ${updateCode.trim} - | break; - """.stripMargin - } - - val numOutput = metricTerm(ctx, "numOutputRows") - val i = ctx.freshName("i") - // these column have to declared before the loop. 
- val evaluate = evaluateVariables(outputColumns) - ctx.copyResult = true - s""" - |$evaluate - |for (int $i = 0; $i < ${projections.length}; $i ++) { - | switch ($i) { - | ${cases.mkString("\n").trim} - | } - | $numOutput.add(1); - | ${consume(ctx, outputColumns)} - |} - """.stripMargin - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala new file mode 100644 index 0000000..7c47566 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.errors._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.execution.metric.SQLMetrics + +/** + * Apply the all of the GroupExpressions to every input row, hence we will get + * multiple output rows for a input row. + * @param projections The group of expressions, all of the group expressions should + * output the same schema specified bye the parameter `output` + * @param output The output Schema + * @param child Child operator + */ +case class ExpandExec( + projections: Seq[Seq[Expression]], + output: Seq[Attribute], + child: SparkPlan) + extends UnaryExecNode with CodegenSupport { + + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + + // The GroupExpressions can output data with arbitrary partitioning, so set it + // as UNKNOWN partitioning + override def outputPartitioning: Partitioning = UnknownPartitioning(0) + + override def references: AttributeSet = + AttributeSet(projections.flatten.flatMap(_.references)) + + private[this] val projection = + (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output) + + protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { + val numOutputRows = longMetric("numOutputRows") + + child.execute().mapPartitions { iter => + val groups = projections.map(projection).toArray + new Iterator[InternalRow] { + private[this] var result: InternalRow = _ + private[this] var idx = -1 // -1 means the initial state + private[this] var input: InternalRow = _ + + override final def hasNext: Boolean = (-1 < idx && idx < groups.length) || 
iter.hasNext + + override final def next(): InternalRow = { + if (idx <= 0) { + // in the initial (-1) or beginning(0) of a new input row, fetch the next input tuple + input = iter.next() + idx = 0 + } + + result = groups(idx)(input) + idx += 1 + + if (idx == groups.length && iter.hasNext) { + idx = 0 + } + + numOutputRows += 1 + result + } + } + } + } + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + child.asInstanceOf[CodegenSupport].inputRDDs() + } + + protected override def doProduce(ctx: CodegenContext): String = { + child.asInstanceOf[CodegenSupport].produce(ctx, this) + } + + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { + /* + * When the projections list looks like: + * expr1A, exprB, expr1C + * expr2A, exprB, expr2C + * ... + * expr(N-1)A, exprB, expr(N-1)C + * + * i.e. column A and C have different values for each output row, but column B stays constant. + * + * The generated code looks something like (note that B is only computed once in declaration): + * + * // part 1: declare all the columns + * colA = ... + * colB = ... + * colC = ... + * + * // part 2: code that computes the columns + * for (row = 0; row < N; row++) { + * switch (row) { + * case 0: + * colA = ... + * colC = ... + * case 1: + * colA = ... + * colC = ... + * ... + * case N - 1: + * colA = ... + * colC = ... + * } + * // increment metrics and consume output values + * } + * + * We use a for loop here so we only includes one copy of the consume code and avoid code + * size explosion. + */ + + // Set input variables + ctx.currentVars = input + + // Tracks whether a column has the same output for all rows. + // Size of sameOutput array should equal N. + // If sameOutput(i) is true, then the i-th column has the same value for all output rows given + // an input row. 
+ val sameOutput: Array[Boolean] = output.indices.map { colIndex => + projections.map(p => p(colIndex)).toSet.size == 1 + }.toArray + + // Part 1: declare variables for each column + // If a column has the same value for all output rows, then we also generate its computation + // right after declaration. Otherwise its value is computed in the part 2. + val outputColumns = output.indices.map { col => + val firstExpr = projections.head(col) + if (sameOutput(col)) { + // This column is the same across all output rows. Just generate code for it here. + BindReferences.bindReference(firstExpr, child.output).genCode(ctx) + } else { + val isNull = ctx.freshName("isNull") + val value = ctx.freshName("value") + val code = s""" + |boolean $isNull = true; + |${ctx.javaType(firstExpr.dataType)} $value = ${ctx.defaultValue(firstExpr.dataType)}; + """.stripMargin + ExprCode(code, isNull, value) + } + } + + // Part 2: switch/case statements + val cases = projections.zipWithIndex.map { case (exprs, row) => + var updateCode = "" + for (col <- exprs.indices) { + if (!sameOutput(col)) { + val ev = BindReferences.bindReference(exprs(col), child.output).genCode(ctx) + updateCode += + s""" + |${ev.code} + |${outputColumns(col).isNull} = ${ev.isNull}; + |${outputColumns(col).value} = ${ev.value}; + """.stripMargin + } + } + + s""" + |case $row: + | ${updateCode.trim} + | break; + """.stripMargin + } + + val numOutput = metricTerm(ctx, "numOutputRows") + val i = ctx.freshName("i") + // these column have to declared before the loop. 
+ val evaluate = evaluateVariables(outputColumns) + ctx.copyResult = true + s""" + |$evaluate + |for (int $i = 0; $i < ${projections.length}; $i ++) { + | switch ($i) { + | ${cases.mkString("\n").trim} + | } + | $numOutput.add(1); + | ${consume(ctx, outputColumns)} + |} + """.stripMargin + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala deleted file mode 100644 index 9938d21..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.metric.SQLMetrics - -/** - * For lazy computing, be sure the generator.terminate() called in the very last - * TODO reusing the CompletionIterator? 
- */ -private[execution] sealed case class LazyIterator(func: () => TraversableOnce[InternalRow]) - extends Iterator[InternalRow] { - - lazy val results = func().toIterator - override def hasNext: Boolean = results.hasNext - override def next(): InternalRow = results.next() -} - -/** - * Applies a [[Generator]] to a stream of input rows, combining the - * output of each into a new stream of rows. This operation is similar to a `flatMap` in functional - * programming with one important additional feature, which allows the input rows to be joined with - * their output. - * @param generator the generator expression - * @param join when true, each output row is implicitly joined with the input tuple that produced - * it. - * @param outer when true, each input row will be output at least once, even if the output of the - * given `generator` is empty. `outer` has no effect when `join` is false. - * @param output the output attributes of this node, which constructed in analysis phase, - * and we can not change it, as the parent node bound with it already. 
- */ -case class Generate( - generator: Generator, - join: Boolean, - outer: Boolean, - output: Seq[Attribute], - child: SparkPlan) - extends UnaryNode { - - private[sql] override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) - - override def producedAttributes: AttributeSet = AttributeSet(output) - - val boundGenerator = BindReferences.bindReference(generator, child.output) - - protected override def doExecute(): RDD[InternalRow] = { - // boundGenerator.terminate() should be triggered after all of the rows in the partition - val rows = if (join) { - child.execute().mapPartitionsInternal { iter => - val generatorNullRow = new GenericInternalRow(generator.elementTypes.size) - val joinedRow = new JoinedRow - - iter.flatMap { row => - // we should always set the left (child output) - joinedRow.withLeft(row) - val outputRows = boundGenerator.eval(row) - if (outer && outputRows.isEmpty) { - joinedRow.withRight(generatorNullRow) :: Nil - } else { - outputRows.map(joinedRow.withRight) - } - } ++ LazyIterator(boundGenerator.terminate).map { row => - // we leave the left side as the last element of its child output - // keep it the same as Hive does - joinedRow.withRight(row) - } - } - } else { - child.execute().mapPartitionsInternal { iter => - iter.flatMap(boundGenerator.eval) ++ LazyIterator(boundGenerator.terminate) - } - } - - val numOutputRows = longMetric("numOutputRows") - rows.mapPartitionsInternal { iter => - val proj = UnsafeProjection.create(output, output) - iter.map { r => - numOutputRows += 1 - proj(r) - } - } - } -} - http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala new file mode 
100644 index 0000000..10cfec3 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.metric.SQLMetrics + +/** + * For lazy computing, be sure the generator.terminate() called in the very last + * TODO reusing the CompletionIterator? + */ +private[execution] sealed case class LazyIterator(func: () => TraversableOnce[InternalRow]) + extends Iterator[InternalRow] { + + lazy val results = func().toIterator + override def hasNext: Boolean = results.hasNext + override def next(): InternalRow = results.next() +} + +/** + * Applies a [[Generator]] to a stream of input rows, combining the + * output of each into a new stream of rows. This operation is similar to a `flatMap` in functional + * programming with one important additional feature, which allows the input rows to be joined with + * their output. 
+ * @param generator the generator expression + * @param join when true, each output row is implicitly joined with the input tuple that produced + * it. + * @param outer when true, each input row will be output at least once, even if the output of the + * given `generator` is empty. `outer` has no effect when `join` is false. + * @param output the output attributes of this node, which constructed in analysis phase, + * and we can not change it, as the parent node bound with it already. + */ +case class GenerateExec( + generator: Generator, + join: Boolean, + outer: Boolean, + output: Seq[Attribute], + child: SparkPlan) + extends UnaryExecNode { + + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + + override def producedAttributes: AttributeSet = AttributeSet(output) + + val boundGenerator = BindReferences.bindReference(generator, child.output) + + protected override def doExecute(): RDD[InternalRow] = { + // boundGenerator.terminate() should be triggered after all of the rows in the partition + val rows = if (join) { + child.execute().mapPartitionsInternal { iter => + val generatorNullRow = new GenericInternalRow(generator.elementTypes.size) + val joinedRow = new JoinedRow + + iter.flatMap { row => + // we should always set the left (child output) + joinedRow.withLeft(row) + val outputRows = boundGenerator.eval(row) + if (outer && outputRows.isEmpty) { + joinedRow.withRight(generatorNullRow) :: Nil + } else { + outputRows.map(joinedRow.withRight) + } + } ++ LazyIterator(boundGenerator.terminate).map { row => + // we leave the left side as the last element of its child output + // keep it the same as Hive does + joinedRow.withRight(row) + } + } + } else { + child.execute().mapPartitionsInternal { iter => + iter.flatMap(boundGenerator.eval) ++ LazyIterator(boundGenerator.terminate) + } + } + + val numOutputRows = longMetric("numOutputRows") + rows.mapPartitionsInternal { iter => + 
val proj = UnsafeProjection.create(output, output) + iter.map { r => + numOutputRows += 1 + proj(r) + } + } + } +} + http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala deleted file mode 100644 index f8aec9e..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} -import org.apache.spark.sql.execution.metric.SQLMetrics - - -/** - * Physical plan node for scanning data from a local collection. 
- */ -private[sql] case class LocalTableScan( - output: Seq[Attribute], - rows: Seq[InternalRow]) extends LeafNode { - - private[sql] override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) - - private val unsafeRows: Array[InternalRow] = { - val proj = UnsafeProjection.create(output, output) - rows.map(r => proj(r).copy()).toArray - } - - private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows) - - protected override def doExecute(): RDD[InternalRow] = { - val numOutputRows = longMetric("numOutputRows") - rdd.map { r => - numOutputRows += 1 - r - } - } - - override def executeCollect(): Array[InternalRow] = { - unsafeRows - } - - override def executeTake(limit: Int): Array[InternalRow] = { - unsafeRows.take(limit) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala new file mode 100644 index 0000000..4ab447a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} +import org.apache.spark.sql.execution.metric.SQLMetrics + + +/** + * Physical plan node for scanning data from a local collection. + */ +private[sql] case class LocalTableScanExec( + output: Seq[Attribute], + rows: Seq[InternalRow]) extends LeafExecNode { + + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + + private val unsafeRows: Array[InternalRow] = { + val proj = UnsafeProjection.create(output, output) + rows.map(r => proj(r).copy()).toArray + } + + private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows) + + protected override def doExecute(): RDD[InternalRow] = { + val numOutputRows = longMetric("numOutputRows") + rdd.map { r => + numOutputRows += 1 + r + } + } + + override def executeCollect(): Array[InternalRow] = { + unsafeRows + } + + override def executeTake(limit: Int): Array[InternalRow] = { + unsafeRows.take(limit) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index a444a70..bb83676 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommand, HiveNativeCommand} +import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, HiveNativeCommand} import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _} @@ -107,7 +107,7 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { * execution is simply passed back to Hive. */ def hiveResultString(): Seq[String] = executedPlan match { - case ExecutedCommand(desc: DescribeTableCommand) => + case ExecutedCommandExec(desc: DescribeTableCommand) => // If it is a describe command for a Hive table, we want to have the output format // be similar with Hive. 
desc.run(sqlContext).map { @@ -117,7 +117,7 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { .map(s => String.format(s"%-20s", s)) .mkString("\t") } - case command: ExecutedCommand => + case command: ExecutedCommandExec => command.executeCollect().map(_.getString(0)) case other => http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala deleted file mode 100644 index 04a39a1..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution - -import org.apache.spark.{SparkEnv, TaskContext} -import org.apache.spark.executor.TaskMetrics -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, GenerateUnsafeProjection} -import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution} -import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.types._ -import org.apache.spark.util.collection.unsafe.sort.RadixSort; - -/** - * Performs (external) sorting. - * - * @param global when true performs a global sort of all partitions by shuffling the data first - * if necessary. - * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will - * spill every `frequency` records. - */ -case class Sort( - sortOrder: Seq[SortOrder], - global: Boolean, - child: SparkPlan, - testSpillFrequency: Int = 0) - extends UnaryNode with CodegenSupport { - - override def output: Seq[Attribute] = child.output - - override def outputOrdering: Seq[SortOrder] = sortOrder - - override def requiredChildDistribution: Seq[Distribution] = - if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil - - private val enableRadixSort = sqlContext.conf.enableRadixSort - - override private[sql] lazy val metrics = Map( - "sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "sort time"), - "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"), - "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) - - def createSorter(): UnsafeExternalRowSorter = { - val ordering = newOrdering(sortOrder, output) - - // The comparator for comparing prefix - val boundSortExpression = BindReferences.bindReference(sortOrder.head, output) - val prefixComparator = 
SortPrefixUtils.getPrefixComparator(boundSortExpression) - - val canUseRadixSort = enableRadixSort && sortOrder.length == 1 && - SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression) - - // The generator for prefix - val prefixProjection = UnsafeProjection.create(Seq(SortPrefix(boundSortExpression))) - val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer { - override def computePrefix(row: InternalRow): Long = { - prefixProjection.apply(row).getLong(0) - } - } - - val pageSize = SparkEnv.get.memoryManager.pageSizeBytes - val sorter = new UnsafeExternalRowSorter( - schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort) - - if (testSpillFrequency > 0) { - sorter.setTestSpillFrequency(testSpillFrequency) - } - sorter - } - - protected override def doExecute(): RDD[InternalRow] = { - val peakMemory = longMetric("peakMemory") - val spillSize = longMetric("spillSize") - val sortTime = longMetric("sortTime") - - child.execute().mapPartitionsInternal { iter => - val sorter = createSorter() - - val metrics = TaskContext.get().taskMetrics() - // Remember spill data size of this task before execute this operator so that we can - // figure out how many bytes we spilled for this operator. - val spillSizeBefore = metrics.memoryBytesSpilled - val beforeSort = System.nanoTime() - - val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]]) - - sortTime += (System.nanoTime() - beforeSort) / 1000000 - peakMemory += sorter.getPeakMemoryUsage - spillSize += metrics.memoryBytesSpilled - spillSizeBefore - metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage) - - sortedIterator - } - } - - override def usedInputs: AttributeSet = AttributeSet(Seq.empty) - - override def inputRDDs(): Seq[RDD[InternalRow]] = { - child.asInstanceOf[CodegenSupport].inputRDDs() - } - - // Name of sorter variable used in codegen. 
- private var sorterVariable: String = _ - - override protected def doProduce(ctx: CodegenContext): String = { - val needToSort = ctx.freshName("needToSort") - ctx.addMutableState("boolean", needToSort, s"$needToSort = true;") - - // Initialize the class member variables. This includes the instance of the Sorter and - // the iterator to return sorted rows. - val thisPlan = ctx.addReferenceObj("plan", this) - sorterVariable = ctx.freshName("sorter") - ctx.addMutableState(classOf[UnsafeExternalRowSorter].getName, sorterVariable, - s"$sorterVariable = $thisPlan.createSorter();") - val metrics = ctx.freshName("metrics") - ctx.addMutableState(classOf[TaskMetrics].getName, metrics, - s"$metrics = org.apache.spark.TaskContext.get().taskMetrics();") - val sortedIterator = ctx.freshName("sortedIter") - ctx.addMutableState("scala.collection.Iterator<UnsafeRow>", sortedIterator, "") - - val addToSorter = ctx.freshName("addToSorter") - ctx.addNewFunction(addToSorter, - s""" - | private void $addToSorter() throws java.io.IOException { - | ${child.asInstanceOf[CodegenSupport].produce(ctx, this)} - | } - """.stripMargin.trim) - - // The child could change `copyResult` to true, but we had already consumed all the rows, - // so `copyResult` should be reset to `false`. 
- ctx.copyResult = false - - val outputRow = ctx.freshName("outputRow") - val peakMemory = metricTerm(ctx, "peakMemory") - val spillSize = metricTerm(ctx, "spillSize") - val spillSizeBefore = ctx.freshName("spillSizeBefore") - val startTime = ctx.freshName("startTime") - val sortTime = metricTerm(ctx, "sortTime") - s""" - | if ($needToSort) { - | long $spillSizeBefore = $metrics.memoryBytesSpilled(); - | long $startTime = System.nanoTime(); - | $addToSorter(); - | $sortedIterator = $sorterVariable.sort(); - | $sortTime.add((System.nanoTime() - $startTime) / 1000000); - | $peakMemory.add($sorterVariable.getPeakMemoryUsage()); - | $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore); - | $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage()); - | $needToSort = false; - | } - | - | while ($sortedIterator.hasNext()) { - | UnsafeRow $outputRow = (UnsafeRow)$sortedIterator.next(); - | ${consume(ctx, null, outputRow)} - | if (shouldStop()) return; - | } - """.stripMargin.trim - } - - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { - s""" - |${row.code} - |$sorterVariable.insertRow((UnsafeRow)${row.value}); - """.stripMargin - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala new file mode 100644 index 0000000..0e4d6d7 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, GenerateUnsafeProjection} +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution} +import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.types._ +import org.apache.spark.util.collection.unsafe.sort.RadixSort; + +/** + * Performs (external) sorting. + * + * @param global when true performs a global sort of all partitions by shuffling the data first + * if necessary. + * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will + * spill every `frequency` records. 
+ */ +case class SortExec( + sortOrder: Seq[SortOrder], + global: Boolean, + child: SparkPlan, + testSpillFrequency: Int = 0) + extends UnaryExecNode with CodegenSupport { + + override def output: Seq[Attribute] = child.output + + override def outputOrdering: Seq[SortOrder] = sortOrder + + override def requiredChildDistribution: Seq[Distribution] = + if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil + + private val enableRadixSort = sqlContext.conf.enableRadixSort + + override private[sql] lazy val metrics = Map( + "sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "sort time"), + "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"), + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) + + def createSorter(): UnsafeExternalRowSorter = { + val ordering = newOrdering(sortOrder, output) + + // The comparator for comparing prefix + val boundSortExpression = BindReferences.bindReference(sortOrder.head, output) + val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression) + + val canUseRadixSort = enableRadixSort && sortOrder.length == 1 && + SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression) + + // The generator for prefix + val prefixProjection = UnsafeProjection.create(Seq(SortPrefix(boundSortExpression))) + val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer { + override def computePrefix(row: InternalRow): Long = { + prefixProjection.apply(row).getLong(0) + } + } + + val pageSize = SparkEnv.get.memoryManager.pageSizeBytes + val sorter = new UnsafeExternalRowSorter( + schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort) + + if (testSpillFrequency > 0) { + sorter.setTestSpillFrequency(testSpillFrequency) + } + sorter + } + + protected override def doExecute(): RDD[InternalRow] = { + val peakMemory = longMetric("peakMemory") + val spillSize = longMetric("spillSize") + val sortTime = longMetric("sortTime") + + 
child.execute().mapPartitionsInternal { iter => + val sorter = createSorter() + + val metrics = TaskContext.get().taskMetrics() + // Remember spill data size of this task before execute this operator so that we can + // figure out how many bytes we spilled for this operator. + val spillSizeBefore = metrics.memoryBytesSpilled + val beforeSort = System.nanoTime() + + val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]]) + + sortTime += (System.nanoTime() - beforeSort) / 1000000 + peakMemory += sorter.getPeakMemoryUsage + spillSize += metrics.memoryBytesSpilled - spillSizeBefore + metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage) + + sortedIterator + } + } + + override def usedInputs: AttributeSet = AttributeSet(Seq.empty) + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + child.asInstanceOf[CodegenSupport].inputRDDs() + } + + // Name of sorter variable used in codegen. + private var sorterVariable: String = _ + + override protected def doProduce(ctx: CodegenContext): String = { + val needToSort = ctx.freshName("needToSort") + ctx.addMutableState("boolean", needToSort, s"$needToSort = true;") + + // Initialize the class member variables. This includes the instance of the Sorter and + // the iterator to return sorted rows. 
+ val thisPlan = ctx.addReferenceObj("plan", this) + sorterVariable = ctx.freshName("sorter") + ctx.addMutableState(classOf[UnsafeExternalRowSorter].getName, sorterVariable, + s"$sorterVariable = $thisPlan.createSorter();") + val metrics = ctx.freshName("metrics") + ctx.addMutableState(classOf[TaskMetrics].getName, metrics, + s"$metrics = org.apache.spark.TaskContext.get().taskMetrics();") + val sortedIterator = ctx.freshName("sortedIter") + ctx.addMutableState("scala.collection.Iterator<UnsafeRow>", sortedIterator, "") + + val addToSorter = ctx.freshName("addToSorter") + ctx.addNewFunction(addToSorter, + s""" + | private void $addToSorter() throws java.io.IOException { + | ${child.asInstanceOf[CodegenSupport].produce(ctx, this)} + | } + """.stripMargin.trim) + + // The child could change `copyResult` to true, but we had already consumed all the rows, + // so `copyResult` should be reset to `false`. + ctx.copyResult = false + + val outputRow = ctx.freshName("outputRow") + val peakMemory = metricTerm(ctx, "peakMemory") + val spillSize = metricTerm(ctx, "spillSize") + val spillSizeBefore = ctx.freshName("spillSizeBefore") + val startTime = ctx.freshName("startTime") + val sortTime = metricTerm(ctx, "sortTime") + s""" + | if ($needToSort) { + | long $spillSizeBefore = $metrics.memoryBytesSpilled(); + | long $startTime = System.nanoTime(); + | $addToSorter(); + | $sortedIterator = $sorterVariable.sort(); + | $sortTime.add((System.nanoTime() - $startTime) / 1000000); + | $peakMemory.add($sorterVariable.getPeakMemoryUsage()); + | $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore); + | $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage()); + | $needToSort = false; + | } + | + | while ($sortedIterator.hasNext()) { + | UnsafeRow $outputRow = (UnsafeRow)$sortedIterator.next(); + | ${consume(ctx, null, outputRow)} + | if (shouldStop()) return; + | } + """.stripMargin.trim + } + + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], 
row: ExprCode): String = { + s""" + |${row.code} + |$sorterVariable.insertRow((UnsafeRow)${row.value}); + """.stripMargin + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 64d89f2..e28e456 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -41,6 +41,8 @@ import org.apache.spark.util.ThreadUtils /** * The base class for physical operators. + * + * The naming convention is that physical operators end with "Exec" suffix, e.g. [[ProjectExec]]. */ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable { @@ -392,19 +394,19 @@ object SparkPlan { ThreadUtils.newDaemonCachedThreadPool("subquery", 16)) } -private[sql] trait LeafNode extends SparkPlan { +private[sql] trait LeafExecNode extends SparkPlan { override def children: Seq[SparkPlan] = Nil override def producedAttributes: AttributeSet = outputSet } -object UnaryNode { +object UnaryExecNode { def unapply(a: Any): Option[(SparkPlan, SparkPlan)] = a match { case s: SparkPlan if s.children.size == 1 => Some((s, s.children.head)) case _ => None } } -private[sql] trait UnaryNode extends SparkPlan { +private[sql] trait UnaryExecNode extends SparkPlan { def child: SparkPlan override def children: Seq[SparkPlan] = child :: Nil @@ -412,7 +414,7 @@ private[sql] trait UnaryNode extends SparkPlan { override def outputPartitioning: Partitioning = child.outputPartitioning } -private[sql] trait BinaryNode extends SparkPlan { +private[sql] trait BinaryExecNode extends SparkPlan { def left: SparkPlan def right: SparkPlan 
http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 247f55d..cb4b1cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.execution.exchange.ReusedExchange +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.SQLMetricInfo import org.apache.spark.util.Utils @@ -51,7 +51,7 @@ private[sql] object SparkPlanInfo { def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = { val children = plan match { - case ReusedExchange(_, child) => child :: Nil + case ReusedExchangeExec(_, child) => child :: Nil case _ => plan.children ++ plan.subqueries } val metrics = plan.metrics.toSeq.map { case (key, metric) => http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 8d05ae4..0afa4c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -82,10 +82,10 @@ class SparkPlanner( // when the columns of this projection are enough to evaluate all filter conditions, // just do a scan followed by a filter, with no extra project. 
val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]]) - filterCondition.map(Filter(_, scan)).getOrElse(scan) + filterCondition.map(FilterExec(_, scan)).getOrElse(scan) } else { val scan = scanBuilder((projectSet ++ filterSet).toSeq) - Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan)) + ProjectExec(projectList, filterCondition.map(FilterExec(_, scan)).getOrElse(scan)) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index ed6b846..3ce5f28 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution -import org.apache.spark.sql.execution.columnar.{InMemoryColumnarTableScan, InMemoryRelation} +import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.exchange.ShuffleExchange @@ -44,20 +44,22 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.ReturnAnswer(rootPlan) => rootPlan match { case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) => - execution.TakeOrderedAndProject(limit, order, None, planLater(child)) :: Nil + execution.TakeOrderedAndProjectExec(limit, order, 
None, planLater(child)) :: Nil case logical.Limit( IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) => - execution.TakeOrderedAndProject(limit, order, Some(projectList), planLater(child)) :: Nil + execution.TakeOrderedAndProjectExec( + limit, order, Some(projectList), planLater(child)) :: Nil case logical.Limit(IntegerLiteral(limit), child) => - execution.CollectLimit(limit, planLater(child)) :: Nil + execution.CollectLimitExec(limit, planLater(child)) :: Nil case other => planLater(other) :: Nil } case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) => - execution.TakeOrderedAndProject(limit, order, None, planLater(child)) :: Nil + execution.TakeOrderedAndProjectExec(limit, order, None, planLater(child)) :: Nil case logical.Limit( IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) => - execution.TakeOrderedAndProject(limit, order, Some(projectList), planLater(child)) :: Nil + execution.TakeOrderedAndProjectExec( + limit, order, Some(projectList), planLater(child)) :: Nil case _ => Nil } } @@ -66,12 +68,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case ExtractEquiJoinKeys( LeftExistence(jt), leftKeys, rightKeys, condition, left, CanBroadcast(right)) => - Seq(joins.BroadcastHashJoin( + Seq(joins.BroadcastHashJoinExec( leftKeys, rightKeys, jt, BuildRight, condition, planLater(left), planLater(right))) // Find left semi joins where at least some predicates can be evaluated by matching join keys case ExtractEquiJoinKeys( LeftExistence(jt), leftKeys, rightKeys, condition, left, right) => - Seq(joins.ShuffledHashJoin( + Seq(joins.ShuffledHashJoinExec( leftKeys, rightKeys, jt, BuildRight, condition, planLater(left), planLater(right))) case _ => Nil } @@ -146,11 +148,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // --- Inner joins 
-------------------------------------------------------------------------- case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => - Seq(joins.BroadcastHashJoin( + Seq(joins.BroadcastHashJoinExec( leftKeys, rightKeys, Inner, BuildRight, condition, planLater(left), planLater(right))) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, CanBroadcast(left), right) => - Seq(joins.BroadcastHashJoin( + Seq(joins.BroadcastHashJoinExec( leftKeys, rightKeys, Inner, BuildLeft, condition, planLater(left), planLater(right))) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) @@ -162,41 +164,41 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } else { BuildLeft } - Seq(joins.ShuffledHashJoin( + Seq(joins.ShuffledHashJoinExec( leftKeys, rightKeys, Inner, buildSide, condition, planLater(left), planLater(right))) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) if RowOrdering.isOrderable(leftKeys) => - joins.SortMergeJoin( + joins.SortMergeJoinExec( leftKeys, rightKeys, Inner, condition, planLater(left), planLater(right)) :: Nil // --- Outer joins -------------------------------------------------------------------------- case ExtractEquiJoinKeys( LeftOuter, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => - Seq(joins.BroadcastHashJoin( + Seq(joins.BroadcastHashJoinExec( leftKeys, rightKeys, LeftOuter, BuildRight, condition, planLater(left), planLater(right))) case ExtractEquiJoinKeys( RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), right) => - Seq(joins.BroadcastHashJoin( + Seq(joins.BroadcastHashJoinExec( leftKeys, rightKeys, RightOuter, BuildLeft, condition, planLater(left), planLater(right))) case ExtractEquiJoinKeys(LeftOuter, leftKeys, rightKeys, condition, left, right) if !conf.preferSortMergeJoin && canBuildHashMap(right) && muchSmaller(right, left) || !RowOrdering.isOrderable(leftKeys) => - 
Seq(joins.ShuffledHashJoin( + Seq(joins.ShuffledHashJoinExec( leftKeys, rightKeys, LeftOuter, BuildRight, condition, planLater(left), planLater(right))) case ExtractEquiJoinKeys(RightOuter, leftKeys, rightKeys, condition, left, right) if !conf.preferSortMergeJoin && canBuildHashMap(left) && muchSmaller(left, right) || !RowOrdering.isOrderable(leftKeys) => - Seq(joins.ShuffledHashJoin( + Seq(joins.ShuffledHashJoinExec( leftKeys, rightKeys, RightOuter, BuildLeft, condition, planLater(left), planLater(right))) case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) if RowOrdering.isOrderable(leftKeys) => - joins.SortMergeJoin( + joins.SortMergeJoinExec( leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil // --- Cases where this strategy does not apply --------------------------------------------- @@ -278,10 +280,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object BroadcastNestedLoop extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case j @ logical.Join(CanBroadcast(left), right, Inner | RightOuter, condition) => - execution.joins.BroadcastNestedLoopJoin( + execution.joins.BroadcastNestedLoopJoinExec( planLater(left), planLater(right), joins.BuildLeft, j.joinType, condition) :: Nil case j @ logical.Join(left, CanBroadcast(right), Inner | LeftOuter | LeftSemi, condition) => - execution.joins.BroadcastNestedLoopJoin( + execution.joins.BroadcastNestedLoopJoinExec( planLater(left), planLater(right), joins.BuildRight, j.joinType, condition) :: Nil case _ => Nil } @@ -290,10 +292,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object CartesianProduct extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.Join(left, right, Inner, None) => - execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil + execution.joins.CartesianProductExec(planLater(left), 
planLater(right)) :: Nil case logical.Join(left, right, Inner, Some(condition)) => - execution.Filter(condition, - execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil + execution.FilterExec(condition, + execution.joins.CartesianProductExec(planLater(left), planLater(right))) :: Nil case _ => Nil } } @@ -308,7 +310,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { joins.BuildLeft } // This join could be very slow or even hang forever - joins.BroadcastNestedLoopJoin( + joins.BroadcastNestedLoopJoinExec( planLater(left), planLater(right), buildSide, joinType, condition) :: Nil case _ => Nil } @@ -323,7 +325,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { projectList, filters, identity[Seq[Expression]], // All filters still need to be evaluated. - InMemoryColumnarTableScan(_, filters, mem)) :: Nil + InMemoryTableScanExec(_, filters, mem)) :: Nil case _ => Nil } } @@ -333,11 +335,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def numPartitions: Int = self.numPartitions def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case r: RunnableCommand => ExecutedCommand(r) :: Nil + case r: RunnableCommand => ExecutedCommandExec(r) :: Nil case MemoryPlan(sink, output) => val encoder = RowEncoder(sink.schema) - LocalTableScan(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil + LocalTableScanExec(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil case logical.Distinct(child) => throw new IllegalStateException( @@ -349,19 +351,19 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.DeserializeToObject(deserializer, objAttr, child) => execution.DeserializeToObject(deserializer, objAttr, planLater(child)) :: Nil case logical.SerializeFromObject(serializer, child) => - execution.SerializeFromObject(serializer, planLater(child)) :: Nil + execution.SerializeFromObjectExec(serializer, 
planLater(child)) :: Nil case logical.MapPartitions(f, objAttr, child) => - execution.MapPartitions(f, objAttr, planLater(child)) :: Nil + execution.MapPartitionsExec(f, objAttr, planLater(child)) :: Nil case logical.MapElements(f, objAttr, child) => - execution.MapElements(f, objAttr, planLater(child)) :: Nil + execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil case logical.AppendColumns(f, in, out, child) => - execution.AppendColumns(f, in, out, planLater(child)) :: Nil + execution.AppendColumnsExec(f, in, out, planLater(child)) :: Nil case logical.AppendColumnsWithObject(f, childSer, newSer, child) => - execution.AppendColumnsWithObject(f, childSer, newSer, planLater(child)) :: Nil + execution.AppendColumnsWithObjectExec(f, childSer, newSer, planLater(child)) :: Nil case logical.MapGroups(f, key, value, grouping, data, objAttr, child) => - execution.MapGroups(f, key, value, grouping, data, objAttr, planLater(child)) :: Nil + execution.MapGroupsExec(f, key, value, grouping, data, objAttr, planLater(child)) :: Nil case logical.CoGroup(f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr, left, right) => - execution.CoGroup( + execution.CoGroupExec( f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr, planLater(left), planLater(right)) :: Nil @@ -369,45 +371,45 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { if (shuffle) { ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil } else { - execution.Coalesce(numPartitions, planLater(child)) :: Nil + execution.CoalesceExec(numPartitions, planLater(child)) :: Nil } case logical.SortPartitions(sortExprs, child) => // This sort only sorts tuples within a partition. Its requiredDistribution will be // an UnspecifiedDistribution. 
- execution.Sort(sortExprs, global = false, child = planLater(child)) :: Nil + execution.SortExec(sortExprs, global = false, child = planLater(child)) :: Nil case logical.Sort(sortExprs, global, child) => - execution.Sort(sortExprs, global, planLater(child)) :: Nil + execution.SortExec(sortExprs, global, planLater(child)) :: Nil case logical.Project(projectList, child) => - execution.Project(projectList, planLater(child)) :: Nil + execution.ProjectExec(projectList, planLater(child)) :: Nil case logical.Filter(condition, child) => - execution.Filter(condition, planLater(child)) :: Nil + execution.FilterExec(condition, planLater(child)) :: Nil case e @ logical.Expand(_, _, child) => - execution.Expand(e.projections, e.output, planLater(child)) :: Nil + execution.ExpandExec(e.projections, e.output, planLater(child)) :: Nil case logical.Window(windowExprs, partitionSpec, orderSpec, child) => - execution.Window(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil + execution.WindowExec(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil case logical.Sample(lb, ub, withReplacement, seed, child) => - execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil + execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil case logical.LocalRelation(output, data) => - LocalTableScan(output, data) :: Nil + LocalTableScanExec(output, data) :: Nil case logical.LocalLimit(IntegerLiteral(limit), child) => - execution.LocalLimit(limit, planLater(child)) :: Nil + execution.LocalLimitExec(limit, planLater(child)) :: Nil case logical.GlobalLimit(IntegerLiteral(limit), child) => - execution.GlobalLimit(limit, planLater(child)) :: Nil + execution.GlobalLimitExec(limit, planLater(child)) :: Nil case logical.Union(unionChildren) => - execution.Union(unionChildren.map(planLater)) :: Nil + execution.UnionExec(unionChildren.map(planLater)) :: Nil case logical.Except(left, right) => - execution.Except(planLater(left), planLater(right)) :: 
Nil + execution.ExceptExec(planLater(left), planLater(right)) :: Nil case g @ logical.Generate(generator, join, outer, _, _, child) => - execution.Generate( + execution.GenerateExec( generator, join = join, outer = outer, g.output, planLater(child)) :: Nil case logical.OneRowRelation => - execution.PhysicalRDD(Nil, singleRowRdd, "OneRowRelation") :: Nil + execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil case r @ logical.Range(start, end, step, numSlices, output) => - execution.Range(start, step, numSlices, r.numElements, output) :: Nil + execution.RangeExec(start, step, numSlices, r.numElements, output) :: Nil case logical.RepartitionByExpression(expressions, child, nPartitions) => exchange.ShuffleExchange(HashPartitioning( expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil - case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "ExistingRDD") :: Nil + case LogicalRDD(output, rdd) => RDDScanExec(output, rdd, "ExistingRDD") :: Nil case BroadcastHint(child) => planLater(child) :: Nil case _ => Nil } @@ -416,7 +418,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object DDLStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case CreateTableUsing(tableIdent, userSpecifiedSchema, provider, true, opts, false, _) => - ExecutedCommand( + ExecutedCommandExec( CreateTempTableUsing( tableIdent, userSpecifiedSchema, provider, opts)) :: Nil case c: CreateTableUsing if !c.temporary => @@ -430,15 +432,15 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case c: CreateTableUsingAsSelect if c.temporary => val cmd = CreateTempTableUsingAsSelect( c.tableIdent, c.provider, Array.empty[String], c.mode, c.options, c.child) - ExecutedCommand(cmd) :: Nil + ExecutedCommandExec(cmd) :: Nil case c: CreateTableUsingAsSelect if !c.temporary => sys.error("Tables created with SQLContext must be TEMPORARY. 
Use a HiveContext instead.") case logical.ShowFunctions(db, pattern) => - ExecutedCommand(ShowFunctions(db, pattern)) :: Nil + ExecutedCommandExec(ShowFunctions(db, pattern)) :: Nil case logical.DescribeFunction(function, extended) => - ExecutedCommand(DescribeFunction(function, extended)) :: Nil + ExecutedCommandExec(DescribeFunction(function, extended)) :: Nil case _ => Nil } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
