Repository: spark Updated Branches: refs/heads/master eff4aed1a -> 21c7539a5
[SPARK-18038][SQL] Move output partitioning definition from UnaryNodeExec to its children ## What changes were proposed in this pull request? Jira : https://issues.apache.org/jira/browse/SPARK-18038 This was a suggestion by rxin over one of the dev list discussion : http://apache-spark-developers-list.1001551.n3.nabble.com/Project-not-preserving-child-partitioning-td19417.html His words: >> It would be better (safer) to move the output partitioning definition into >> each of the operator and remove it from UnaryExecNode. With this PR, following is the output partitioning and ordering for all the impls of `UnaryExecNode`. UnaryExecNode's impl | outputPartitioning | outputOrdering | comment ------------ | ------------- | ------------ | ------------ AppendColumnsExec | child's | Nil | child's ordering can be used AppendColumnsWithObjectExec | child's | Nil | child's ordering can be used BroadcastExchangeExec | BroadcastPartitioning | Nil | - CoalesceExec | UnknownPartitioning | Nil | - CollectLimitExec | SinglePartition | Nil | - DebugExec | child's | Nil | child's ordering can be used DeserializeToObjectExec | child's | Nil | child's ordering can be used ExpandExec | UnknownPartitioning | Nil | - FilterExec | child's | child's | - FlatMapGroupsInRExec | child's | Nil | child's ordering can be used GenerateExec | child's | Nil | need to dig more GlobalLimitExec | child's | child's | - HashAggregateExec | child's | Nil | - InputAdapter | child's | child's | - InsertIntoHiveTable | child's | Nil | terminal node, doesn't need partitioning LocalLimitExec | child's | child's | - MapElementsExec | child's | child's | - MapGroupsExec | child's | Nil | child's ordering can be used MapPartitionsExec | child's | Nil | child's ordering can be used ProjectExec | child's | child's | - SampleExec | child's | Nil | child's ordering can be used ScriptTransformation | child's | Nil | child's ordering can be used SerializeFromObjectExec | child's | Nil | child's ordering can be used ShuffleExchange | custom | Nil | - SortAggregateExec | child's | sort over grouped exprs | - SortExec | child's | custom | - StateStoreRestoreExec | child's | Nil | child's ordering can be used StateStoreSaveExec | child's | Nil | child's ordering can be used SubqueryExec | child's | child's | - TakeOrderedAndProjectExec | SinglePartition | custom | - WholeStageCodegenExec | child's | child's | - WindowExec | child's | child's | - ## How was this patch tested? This does NOT change any existing functionality so relying on existing tests Author: Tejas Patil <[email protected]> Closes #15575 from tejasapatil/SPARK-18038_UnaryNodeExec_output_partitioning. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21c7539a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21c7539a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21c7539a Branch: refs/heads/master Commit: 21c7539a5274a7e77686d17a6261d56592b85c2d Parents: eff4aed Author: Tejas Patil <[email protected]> Authored: Sun Oct 23 13:25:47 2016 +0200 Committer: Herman van Hovell <[email protected]> Committed: Sun Oct 23 13:25:47 2016 +0200 ---------------------------------------------------------------------- .../apache/spark/sql/execution/GenerateExec.scala | 3 +++ .../org/apache/spark/sql/execution/SortExec.scala | 6 +++++- .../org/apache/spark/sql/execution/SparkPlan.scala | 2 -- .../sql/execution/WholeStageCodegenExec.scala | 4 ++++ .../execution/aggregate/HashAggregateExec.scala | 2 ++ .../execution/aggregate/SortAggregateExec.scala | 4 +++- .../sql/execution/basicPhysicalOperators.scala | 8 ++++++++ .../apache/spark/sql/execution/debug/package.scala | 4 +++- .../org/apache/spark/sql/execution/limit.scala | 16 +++++++++++----- .../org/apache/spark/sql/execution/objects.scala | 17 +++++++++++++++++ .../execution/streaming/StatefulAggregate.scala | 6 ++++++ .../spark/sql/execution/window/WindowExec.scala | 2 ++ .../apache/spark/sql/execution/ReferenceSort.scala | 2 ++ .../sql/hive/execution/InsertIntoHiveTable.scala | 4 +++- .../sql/hive/execution/ScriptTransformation.scala | 3 +++ .../hive/execution/ScriptTransformationSuite.scala | 4 ++++ 16 files changed, 76 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 39189a2..2663129 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.metric.SQLMetrics /** @@ -60,6 +61,8 @@ case class GenerateExec( override def producedAttributes: AttributeSet = AttributeSet(output) + override def outputPartitioning: Partitioning = child.outputPartitioning + val boundGenerator = BindReferences.bindReference(generator, child.output) protected override def doExecute(): RDD[InternalRow] = { http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index d8e0675..cc576bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution} +import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.metric.SQLMetrics /** @@ -45,6 +45,10 @@ case class SortExec( override def outputOrdering: Seq[SortOrder] = sortOrder + // sort performed is local within a given partition so will retain + // child operator's partitioning + override def outputPartitioning: Partitioning = child.outputPartitioning + override def requiredChildDistribution: Seq[Distribution] = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 48d6ef6..24d0cff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -395,8 +395,6 @@ trait UnaryExecNode extends SparkPlan { def child: SparkPlan override final def children: Seq[SparkPlan] = child :: Nil - - override def outputPartitioning: Partitioning = child.outputPartitioning } trait BinaryExecNode extends SparkPlan { http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 62bf6f4..6303483 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -218,7 +218,9 @@ trait CodegenSupport extends SparkPlan { case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport { override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = child.outputPartitioning + override def outputOrdering: Seq[SortOrder] = child.outputOrdering override def doExecute(): RDD[InternalRow] = { @@ -292,7 +294,9 @@ object WholeStageCodegenExec { case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport { override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = child.outputPartitioning + override def outputOrdering: Seq[SortOrder] = child.outputOrdering override lazy val metrics = Map( http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 06199ef..4529ed0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -63,6 +63,8 @@ case class HashAggregateExec( override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute) + override def outputPartitioning: Partitioning = child.outputPartitioning + override def producedAttributes: AttributeSet = AttributeSet(aggregateAttributes) ++ AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++ http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index 2a81a82..be3198b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution} +import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.util.Utils @@ -66,6 +66,8 @@ case class SortAggregateExec( groupingExpressions.map(SortOrder(_, Ascending)) :: Nil } + override def outputPartitioning: Partitioning = child.outputPartitioning + override def outputOrdering: Seq[SortOrder] = { groupingExpressions.map(SortOrder(_, Ascending)) } http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index dd78a78..37d750e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -78,6 +78,8 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) } override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override def outputPartitioning: Partitioning = child.outputPartitioning } @@ -214,6 +216,8 @@ case class FilterExec(condition: Expression, child: SparkPlan) } override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override def outputPartitioning: Partitioning = child.outputPartitioning } /** @@ -234,6 +238,8 @@ case class SampleExec( child: SparkPlan) extends UnaryExecNode with CodegenSupport { override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = child.outputPartitioning + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) @@ -517,7 +523,9 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode { "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)")) override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = child.outputPartitioning + override def outputOrdering: Seq[SortOrder] = child.outputOrdering override def sameResult(o: SparkPlan): Boolean = o match { http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index dd9d837..0395c43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -27,8 +27,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.trees.TreeNodeRef -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.{AccumulatorV2, LongAccumulator} /** @@ -162,6 +162,8 @@ package object debug { } } + override def outputPartitioning: Partitioning = child.outputPartitioning + override def inputRDDs(): Seq[RDD[InternalRow]] = { child.asInstanceOf[CodegenSupport].inputRDDs() } http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 86a8770..9918ac3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.util.Utils - /** * Take the first `limit` elements and collect them to a single partition. * @@ -54,8 +53,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode trait BaseLimitExec extends UnaryExecNode with CodegenSupport { val limit: Int override def output: Seq[Attribute] = child.output - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => iter.take(limit) } @@ -95,14 +93,22 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { * Take the first `limit` elements of each child partition, but do not collect or shuffle them. */ case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override def outputPartitioning: Partitioning = child.outputPartitioning } /** * Take the first `limit` elements of the child's single output partition. */ case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def outputOrdering: Seq[SortOrder] = child.outputOrdering } /** @@ -122,8 +128,6 @@ case class TakeOrderedAndProjectExec( projectList.map(_.toAttribute) } - override def outputPartitioning: Partitioning = SinglePartition - override def executeCollect(): Array[InternalRow] = { val ord = new LazilyGeneratedOrdering(sortOrder, child.output) val data = child.execute().map(_.copy()).takeOrdered(limit)(ord) @@ -160,6 +164,8 @@ case class TakeOrderedAndProjectExec( override def outputOrdering: Seq[SortOrder] = sortOrder + override def outputPartitioning: Partitioning = SinglePartition + override def simpleString: String = { val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]") val outputString = Utils.truncatedString(output, "[", ",", "]") http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index 2acc511..9df56bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -68,6 +68,8 @@ case class DeserializeToObjectExec( outputObjAttr: Attribute, child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with CodegenSupport { + override def outputPartitioning: Partitioning = child.outputPartitioning + override def inputRDDs(): Seq[RDD[InternalRow]] = { child.asInstanceOf[CodegenSupport].inputRDDs() } @@ -102,6 +104,8 @@ case class SerializeFromObjectExec( override def output: Seq[Attribute] = serializer.map(_.toAttribute) + override def outputPartitioning: Partitioning = child.outputPartitioning + override def inputRDDs(): Seq[RDD[InternalRow]] = { child.asInstanceOf[CodegenSupport].inputRDDs() } @@ -171,6 +175,8 @@ case class MapPartitionsExec( child: SparkPlan) extends ObjectConsumerExec with ObjectProducerExec { + override def outputPartitioning: Partitioning = child.outputPartitioning + override protected def doExecute(): RDD[InternalRow] = { child.execute().mapPartitionsInternal { iter => val getObject = ObjectOperator.unwrapObjectFromRow(child.output.head.dataType) @@ -231,6 +237,8 @@ case class MapElementsExec( } override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override def outputPartitioning: Partitioning = child.outputPartitioning } /** @@ -244,6 +252,8 @@ case class AppendColumnsExec( override def output: Seq[Attribute] = child.output ++ serializer.map(_.toAttribute) + override def outputPartitioning: Partitioning = child.outputPartitioning + private def newColumnSchema = serializer.map(_.toAttribute).toStructType override protected def doExecute(): RDD[InternalRow] = { @@ -272,6 +282,8 @@ case class AppendColumnsWithObjectExec( override def output: Seq[Attribute] = (inputSerializer ++ newColumnsSerializer).map(_.toAttribute) + override def outputPartitioning: Partitioning = child.outputPartitioning + private def inputSchema = inputSerializer.map(_.toAttribute).toStructType private def newColumnSchema = newColumnsSerializer.map(_.toAttribute).toStructType @@ -304,6 +316,8 @@ case class MapGroupsExec( outputObjAttr: Attribute, child: SparkPlan) extends UnaryExecNode with ObjectProducerExec { + override def outputPartitioning: Partitioning = child.outputPartitioning + override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(groupingAttributes) :: Nil @@ -347,6 +361,9 @@ case class FlatMapGroupsInRExec( child: SparkPlan) extends UnaryExecNode with ObjectProducerExec { override def output: Seq[Attribute] = outputObjAttr :: Nil + + override def outputPartitioning: Partitioning = child.outputPartitioning + override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) override def requiredChildDistribution: Seq[Distribution] = http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala index 587ea7d..ad8238f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.execution.streaming.state._ @@ -80,7 +81,10 @@ case class StateStoreRestoreExec( } } } + override def output: Seq[Attribute] = child.output + + override def outputPartitioning: Partitioning = child.outputPartitioning } /** @@ -116,6 +120,8 @@ case class StateStoreSaveExec( override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = child.outputPartitioning + /** * Save all the rows to the state store, and return all the rows in the state store. * Note that this returns an iterator that pipelines the saving to store with downstream http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala index 1dd281e..80b87d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala @@ -103,6 +103,8 @@ case class WindowExec( override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def outputPartitioning: Partitioning = child.outputPartitioning + /** * Create a bound ordering object for a given frame type and offset. A bound ordering object is * used to determine which input row lies within the frame boundaries of an output row. http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala index a19ea51..6abcb1f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala @@ -57,4 +57,6 @@ case class ReferenceSort( override def output: Seq[Attribute] = child.output override def outputOrdering: Seq[SortOrder] = sortOrder + + override def outputPartitioning: Partitioning = child.outputPartitioning } http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 53bb3b9..c3c4e29 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.execution import java.io.IOException import java.net.URI import java.text.SimpleDateFormat -import java.util import java.util.{Date, Random} import scala.collection.JavaConverters._ @@ -36,6 +35,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} @@ -291,6 +291,8 @@ case class InsertIntoHiveTable( Seq.empty[InternalRow] } + override def outputPartitioning: Partitioning = child.outputPartitioning + override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray protected override def doExecute(): RDD[InternalRow] = { http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 1025b8f..50855e4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -38,6 +38,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive.HiveInspectors import org.apache.spark.sql.hive.HiveShim._ @@ -61,6 +62,8 @@ case class ScriptTransformation( override def producedAttributes: AttributeSet = outputSet -- inputSet + override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { def processIterator(inputIterator: Iterator[InternalRow], hadoopConf: Configuration) : Iterator[InternalRow] = { http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index a8e81d7..0e83776 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.types.StringType @@ -135,5 +136,8 @@ private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExe throw new IllegalArgumentException("intentional exception") } } + override def output: Seq[Attribute] = child.output + + override def outputPartitioning: Partitioning = child.outputPartitioning } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
