Repository: spark
Updated Branches:
  refs/heads/master eff4aed1a -> 21c7539a5


[SPARK-18038][SQL] Move output partitioning definition from UnaryExecNode to its children

## What changes were proposed in this pull request?

Jira : https://issues.apache.org/jira/browse/SPARK-18038

This was suggested by rxin in a dev list discussion:
http://apache-spark-developers-list.1001551.n3.nabble.com/Project-not-preserving-child-partitioning-td19417.html
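
For context on why this matters: when planning, Spark compares each operator's required input distribution against the child's reported `outputPartitioning` and inserts a shuffle only when the two disagree, so an operator that under-reports its partitioning (as in the `Project` thread above) causes redundant exchanges. Below is a minimal, simplified sketch of that decision; the names are illustrative stand-ins, not Spark's actual classes (the real logic lives in the `EnsureRequirements` rule and `org.apache.spark.sql.catalyst.plans.physical`).

```scala
// Simplified stand-ins for Spark's Distribution/Partitioning lattice.
sealed trait Distribution
case object UnspecifiedDistribution extends Distribution
case class ClusteredDistribution(keys: Seq[String]) extends Distribution

sealed trait Partitioning { def satisfies(required: Distribution): Boolean }

case class HashPartitioning(keys: Seq[String]) extends Partitioning {
  override def satisfies(required: Distribution): Boolean = required match {
    case UnspecifiedDistribution   => true
    case ClusteredDistribution(ks) => keys.nonEmpty && keys.forall(ks.contains)
  }
}

case object UnknownPartitioning extends Partitioning {
  override def satisfies(required: Distribution): Boolean =
    required == UnspecifiedDistribution
}

object ShuffleDecision {
  // A shuffle is needed only when the child's reported partitioning does not
  // satisfy the parent's requirement. A child that wrongly reports
  // UnknownPartitioning (e.g. a partitioning-preserving project that loses
  // the information) therefore triggers a redundant exchange.
  def needsShuffle(childOutput: Partitioning, required: Distribution): Boolean =
    !childOutput.satisfies(required)
}
```

For example, `ShuffleDecision.needsShuffle(HashPartitioning(Seq("a")), ClusteredDistribution(Seq("a")))` is `false`, so no exchange would be added.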

His words:

>> It would be better (safer) to move the output partitioning definition into 
>> each of the operator and remove it from UnaryExecNode.
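
Concretely, the change removes the inherited default so each operator must state its own partitioning. A minimal before/after sketch, using stand-in types rather than the real `SparkPlan`/`UnaryExecNode` hierarchy:

```scala
sealed trait Partitioning
case object UnknownPartitioning extends Partitioning

trait PlanLike { def outputPartitioning: Partitioning }

// Before: the base trait supplied a default, so an operator that actually
// changes the partitioning (e.g. CoalesceExec or ExpandExec) silently
// reported its child's partitioning unless the author remembered to override.
trait OldUnaryNode extends PlanLike {
  def child: PlanLike
  override def outputPartitioning: Partitioning = child.outputPartitioning
}

// After: no default. Forgetting to define outputPartitioning in a concrete
// operator is now a compile-time error rather than a silently wrong answer.
trait NewUnaryNode extends PlanLike {
  def child: PlanLike
}

// Each operator now makes its behaviour explicit, as in the table below.
case class CoalesceLike(numPartitions: Int, child: PlanLike) extends NewUnaryNode {
  override def outputPartitioning: Partitioning = UnknownPartitioning
}
```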

With this PR, the output partitioning and ordering of every `UnaryExecNode` implementation are as follows (a note on the ordering column follows the table).

UnaryExecNode's impl | outputPartitioning | outputOrdering | comment
------------ | ------------- | ------------ | ------------
AppendColumnsExec | child's | Nil | child's ordering can be used
AppendColumnsWithObjectExec | child's | Nil | child's ordering can be used
BroadcastExchangeExec | BroadcastPartitioning | Nil | -
CoalesceExec | UnknownPartitioning | Nil | -
CollectLimitExec | SinglePartition | Nil | -
DebugExec | child's | Nil | child's ordering can be used
DeserializeToObjectExec | child's | Nil | child's ordering can be used
ExpandExec | UnknownPartitioning | Nil | -
FilterExec | child's | child's | -
FlatMapGroupsInRExec | child's | Nil | child's ordering can be used
GenerateExec | child's | Nil | need to dig more
GlobalLimitExec | child's | child's | -
HashAggregateExec | child's | Nil | -
InputAdapter | child's | child's | -
InsertIntoHiveTable | child's | Nil | terminal node, doesn't need partitioning
LocalLimitExec | child's | child's | -
MapElementsExec | child's | child's | -
MapGroupsExec | child's | Nil | child's ordering can be used
MapPartitionsExec | child's | Nil | child's ordering can be used
ProjectExec | child's | child's | -
SampleExec | child's | Nil | child's ordering can be used
ScriptTransformation | child's | Nil | child's ordering can be used
SerializeFromObjectExec | child's | Nil | child's ordering can be used
ShuffleExchange | custom | Nil | -
SortAggregateExec | child's | sort over grouped exprs | -
SortExec | child's | custom | -
StateStoreRestoreExec  | child's | Nil | child's ordering can be used
StateStoreSaveExec | child's | Nil | child's ordering can be used
SubqueryExec | child's | child's | -
TakeOrderedAndProjectExec | SinglePartition | custom | -
WholeStageCodegenExec | child's | child's | -
WindowExec | child's | child's | -
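
A note on the ordering column: many of these operators could legally forward the child's ordering ("child's ordering can be used") but currently keep the base class default of `Nil`, which is the safe, pessimistic answer. A simplified sketch of the two choices, with illustrative names rather than the actual Spark operators:

```scala
// Stand-in for org.apache.spark.sql.catalyst.expressions.SortOrder.
case class SortOrderLike(column: String, ascending: Boolean = true)

trait PlanLike {
  // Mirrors SparkPlan's conservative default: promise no ordering.
  def outputOrdering: Seq[SortOrderLike] = Nil
}

case class LeafLike(ordering: Seq[SortOrderLike]) extends PlanLike {
  override def outputOrdering: Seq[SortOrderLike] = ordering
}

// FilterExec-style: rows pass through untouched, so the child's ordering
// is forwarded and downstream sorts can be elided.
case class PassThroughLike(child: PlanLike) extends PlanLike {
  override def outputOrdering: Seq[SortOrderLike] = child.outputOrdering
}

// MapPartitionsExec-style: keeps the default Nil even where the child's
// ordering could be used, at the cost of a possible unnecessary sort later.
case class RowMapperLike(child: PlanLike) extends PlanLike
```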

## How was this patch tested?

This does NOT change any existing functionality, so the change relies on existing tests.

Author: Tejas Patil <[email protected]>

Closes #15575 from tejasapatil/SPARK-18038_UnaryNodeExec_output_partitioning.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21c7539a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21c7539a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21c7539a

Branch: refs/heads/master
Commit: 21c7539a5274a7e77686d17a6261d56592b85c2d
Parents: eff4aed
Author: Tejas Patil <[email protected]>
Authored: Sun Oct 23 13:25:47 2016 +0200
Committer: Herman van Hovell <[email protected]>
Committed: Sun Oct 23 13:25:47 2016 +0200

----------------------------------------------------------------------
 .../apache/spark/sql/execution/GenerateExec.scala  |  3 +++
 .../org/apache/spark/sql/execution/SortExec.scala  |  6 +++++-
 .../org/apache/spark/sql/execution/SparkPlan.scala |  2 --
 .../sql/execution/WholeStageCodegenExec.scala      |  4 ++++
 .../execution/aggregate/HashAggregateExec.scala    |  2 ++
 .../execution/aggregate/SortAggregateExec.scala    |  4 +++-
 .../sql/execution/basicPhysicalOperators.scala     |  8 ++++++++
 .../apache/spark/sql/execution/debug/package.scala |  4 +++-
 .../org/apache/spark/sql/execution/limit.scala     | 16 +++++++++++-----
 .../org/apache/spark/sql/execution/objects.scala   | 17 +++++++++++++++++
 .../execution/streaming/StatefulAggregate.scala    |  6 ++++++
 .../spark/sql/execution/window/WindowExec.scala    |  2 ++
 .../apache/spark/sql/execution/ReferenceSort.scala |  2 ++
 .../sql/hive/execution/InsertIntoHiveTable.scala   |  4 +++-
 .../sql/hive/execution/ScriptTransformation.scala  |  3 +++
 .../hive/execution/ScriptTransformationSuite.scala |  4 ++++
 16 files changed, 76 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index 39189a2..2663129 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
@@ -60,6 +61,8 @@ case class GenerateExec(
 
   override def producedAttributes: AttributeSet = AttributeSet(output)
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   val boundGenerator = BindReferences.bindReference(generator, child.output)
 
   protected override def doExecute(): RDD[InternalRow] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index d8e0675..cc576bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, 
OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
@@ -45,6 +45,10 @@ case class SortExec(
 
   override def outputOrdering: Seq[SortOrder] = sortOrder
 
+  // sort performed is local within a given partition so will retain
+  // child operator's partitioning
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def requiredChildDistribution: Seq[Distribution] =
     if (global) OrderedDistribution(sortOrder) :: Nil else 
UnspecifiedDistribution :: Nil
 

http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 48d6ef6..24d0cff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -395,8 +395,6 @@ trait UnaryExecNode extends SparkPlan {
   def child: SparkPlan
 
   override final def children: Seq[SparkPlan] = child :: Nil
-
-  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 
 trait BinaryExecNode extends SparkPlan {

http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 62bf6f4..6303483 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -218,7 +218,9 @@ trait CodegenSupport extends SparkPlan {
 case class InputAdapter(child: SparkPlan) extends UnaryExecNode with 
CodegenSupport {
 
   override def output: Seq[Attribute] = child.output
+
   override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
   override def doExecute(): RDD[InternalRow] = {
@@ -292,7 +294,9 @@ object WholeStageCodegenExec {
 case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with 
CodegenSupport {
 
   override def output: Seq[Attribute] = child.output
+
   override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
   override lazy val metrics = Map(

http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 06199ef..4529ed0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -63,6 +63,8 @@ case class HashAggregateExec(
 
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def producedAttributes: AttributeSet =
     AttributeSet(aggregateAttributes) ++
     
AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++

http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index 2a81a82..be3198b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.util.Utils
@@ -66,6 +66,8 @@ case class SortAggregateExec(
     groupingExpressions.map(SortOrder(_, Ascending)) :: Nil
   }
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def outputOrdering: Seq[SortOrder] = {
     groupingExpressions.map(SortOrder(_, Ascending))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index dd78a78..37d750e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -78,6 +78,8 @@ case class ProjectExec(projectList: Seq[NamedExpression], 
child: SparkPlan)
   }
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 
 
@@ -214,6 +216,8 @@ case class FilterExec(condition: Expression, child: 
SparkPlan)
   }
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 
 /**
@@ -234,6 +238,8 @@ case class SampleExec(
     child: SparkPlan) extends UnaryExecNode with CodegenSupport {
   override def output: Seq[Attribute] = child.output
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
 
@@ -517,7 +523,9 @@ case class SubqueryExec(name: String, child: SparkPlan) 
extends UnaryExecNode {
     "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect 
(ms)"))
 
   override def output: Seq[Attribute] = child.output
+
   override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
   override def sameResult(o: SparkPlan): Boolean = o match {

http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index dd9d837..0395c43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -27,8 +27,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, 
CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
 
 /**
@@ -162,6 +162,8 @@ package object debug {
       }
     }
 
+    override def outputPartitioning: Partitioning = child.outputPartitioning
+
     override def inputRDDs(): Seq[RDD[InternalRow]] = {
       child.asInstanceOf[CodegenSupport].inputRDDs()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 86a8770..9918ac3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.util.Utils
 
-
 /**
  * Take the first `limit` elements and collect them to a single partition.
  *
@@ -54,8 +53,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) 
extends UnaryExecNode
 trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   val limit: Int
   override def output: Seq[Attribute] = child.output
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   protected override def doExecute(): RDD[InternalRow] = 
child.execute().mapPartitions { iter =>
     iter.take(limit)
   }
@@ -95,14 +93,22 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
  * Take the first `limit` elements of each child partition, but do not collect 
or shuffle them.
  */
 case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 
 /**
  * Take the first `limit` elements of the child's single output partition.
  */
 case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec 
{
+
   override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 }
 
 /**
@@ -122,8 +128,6 @@ case class TakeOrderedAndProjectExec(
     projectList.map(_.toAttribute)
   }
 
-  override def outputPartitioning: Partitioning = SinglePartition
-
   override def executeCollect(): Array[InternalRow] = {
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
@@ -160,6 +164,8 @@ case class TakeOrderedAndProjectExec(
 
   override def outputOrdering: Seq[SortOrder] = sortOrder
 
+  override def outputPartitioning: Partitioning = SinglePartition
+
   override def simpleString: String = {
     val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]")
     val outputString = Utils.truncatedString(output, "[", ",", "]")

http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 2acc511..9df56bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -68,6 +68,8 @@ case class DeserializeToObjectExec(
     outputObjAttr: Attribute,
     child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with 
CodegenSupport {
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     child.asInstanceOf[CodegenSupport].inputRDDs()
   }
@@ -102,6 +104,8 @@ case class SerializeFromObjectExec(
 
   override def output: Seq[Attribute] = serializer.map(_.toAttribute)
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     child.asInstanceOf[CodegenSupport].inputRDDs()
   }
@@ -171,6 +175,8 @@ case class MapPartitionsExec(
     child: SparkPlan)
   extends ObjectConsumerExec with ObjectProducerExec {
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitionsInternal { iter =>
       val getObject = 
ObjectOperator.unwrapObjectFromRow(child.output.head.dataType)
@@ -231,6 +237,8 @@ case class MapElementsExec(
   }
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 
 /**
@@ -244,6 +252,8 @@ case class AppendColumnsExec(
 
   override def output: Seq[Attribute] = child.output ++ 
serializer.map(_.toAttribute)
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   private def newColumnSchema = serializer.map(_.toAttribute).toStructType
 
   override protected def doExecute(): RDD[InternalRow] = {
@@ -272,6 +282,8 @@ case class AppendColumnsWithObjectExec(
 
   override def output: Seq[Attribute] = (inputSerializer ++ 
newColumnsSerializer).map(_.toAttribute)
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   private def inputSchema = inputSerializer.map(_.toAttribute).toStructType
   private def newColumnSchema = 
newColumnsSerializer.map(_.toAttribute).toStructType
 
@@ -304,6 +316,8 @@ case class MapGroupsExec(
     outputObjAttr: Attribute,
     child: SparkPlan) extends UnaryExecNode with ObjectProducerExec {
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(groupingAttributes) :: Nil
 
@@ -347,6 +361,9 @@ case class FlatMapGroupsInRExec(
     child: SparkPlan) extends UnaryExecNode with ObjectProducerExec {
 
   override def output: Seq[Attribute] = outputObjAttr :: Nil
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
 
   override def requiredChildDistribution: Seq[Distribution] =

http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index 587ea7d..ad8238f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.streaming.state._
@@ -80,7 +81,10 @@ case class StateStoreRestoreExec(
         }
     }
   }
+
   override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 
 /**
@@ -116,6 +120,8 @@ case class StateStoreSaveExec(
 
   override def output: Seq[Attribute] = child.output
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   /**
    * Save all the rows to the state store, and return all the rows in the 
state store.
    * Note that this returns an iterator that pipelines the saving to store 
with downstream

http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 1dd281e..80b87d5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -103,6 +103,8 @@ case class WindowExec(
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   /**
    * Create a bound ordering object for a given frame type and offset. A bound 
ordering object is
    * used to determine which input row lies within the frame boundaries of an 
output row.

http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
index a19ea51..6abcb1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
@@ -57,4 +57,6 @@ case class ReferenceSort(
   override def output: Seq[Attribute] = child.output
 
   override def outputOrdering: Seq[SortOrder] = sortOrder
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 53bb3b9..c3c4e29 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.execution
 import java.io.IOException
 import java.net.URI
 import java.text.SimpleDateFormat
-import java.util
 import java.util.{Date, Random}
 
 import scala.collection.JavaConverters._
@@ -36,6 +35,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
@@ -291,6 +291,8 @@ case class InsertIntoHiveTable(
     Seq.empty[InternalRow]
   }
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
 
   protected override def doExecute(): RDD[InternalRow] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 1025b8f..50855e4 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -38,6 +38,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.HiveInspectors
 import org.apache.spark.sql.hive.HiveShim._
@@ -61,6 +62,8 @@ case class ScriptTransformation(
 
   override def producedAttributes: AttributeSet = outputSet -- inputSet
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   protected override def doExecute(): RDD[InternalRow] = {
     def processIterator(inputIterator: Iterator[InternalRow], hadoopConf: 
Configuration)
       : Iterator[InternalRow] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/21c7539a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index a8e81d7..0e83776 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.types.StringType
@@ -135,5 +136,8 @@ private case class ExceptionInjectingOperator(child: 
SparkPlan) extends UnaryExe
       throw new IllegalArgumentException("intentional exception")
     }
   }
+
   override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }

