Repository: spark Updated Branches: refs/heads/master e965a798d -> cc57d705e
[SPARK-9050] [SQL] Remove unused newOrdering argument from Exchange (cleanup after SPARK-8317)

SPARK-8317 changed the SQL Exchange operator so that it no longer pushed sorting into Spark's shuffle layer, a change which allowed more efficient SQL-specific sorters to be used. This patch performs some leftover cleanup based on those changes:

- Exchange's constructor should no longer accept a `newOrdering` since it's no longer used and no longer works as expected.
- `addOperatorsIfNecessary` looked at shuffle input's output ordering to decide whether to sort, but this is the wrong node to be examining: it needs to look at whether the post-shuffle node has the right ordering, since shuffling will not preserve row orderings. Thanks to davies for spotting this.

Author: Josh Rosen <[email protected]>

Closes #7407 from JoshRosen/SPARK-9050 and squashes the following commits:

e70be50 [Josh Rosen] No need to wrap line
e866494 [Josh Rosen] Refactor addOperatorsIfNecessary to make code clearer
2e467da [Josh Rosen] Remove `newOrdering` from Exchange.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc57d705 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc57d705 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc57d705 Branch: refs/heads/master Commit: cc57d705e732aefc2f3d3f438e84d71705b2eb65 Parents: e965a79 Author: Josh Rosen <[email protected]> Authored: Tue Jul 14 18:55:34 2015 -0700 Committer: Josh Rosen <[email protected]> Committed: Tue Jul 14 18:55:34 2015 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/execution/Exchange.scala | 37 ++++++++------------ .../spark/sql/execution/SparkStrategies.scala | 3 +- 2 files changed, 16 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/cc57d705/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 4b783e3..feea4f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -35,21 +35,13 @@ import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEn /** * :: DeveloperApi :: - * Performs a shuffle that will result in the desired `newPartitioning`. Optionally sorts each - * resulting partition based on expressions from the partition key. It is invalid to construct an - * exchange operator with a `newOrdering` that cannot be calculated using the partitioning key. + * Performs a shuffle that will result in the desired `newPartitioning`. 
*/ @DeveloperApi -case class Exchange( - newPartitioning: Partitioning, - newOrdering: Seq[SortOrder], - child: SparkPlan) - extends UnaryNode { +case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode { override def outputPartitioning: Partitioning = newPartitioning - override def outputOrdering: Seq[SortOrder] = newOrdering - override def output: Seq[Attribute] = child.output /** @@ -279,23 +271,24 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ partitioning: Partitioning, rowOrdering: Seq[SortOrder], child: SparkPlan): SparkPlan = { - val needSort = rowOrdering.nonEmpty && child.outputOrdering != rowOrdering - val needsShuffle = child.outputPartitioning != partitioning - val withShuffle = if (needsShuffle) { - Exchange(partitioning, Nil, child) - } else { - child + def addShuffleIfNecessary(child: SparkPlan): SparkPlan = { + if (child.outputPartitioning != partitioning) { + Exchange(partitioning, child) + } else { + child + } } - val withSort = if (needSort) { - sqlContext.planner.BasicOperators.getSortOperator( - rowOrdering, global = false, withShuffle) - } else { - withShuffle + def addSortIfNecessary(child: SparkPlan): SparkPlan = { + if (rowOrdering.nonEmpty && child.outputOrdering != rowOrdering) { + sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child) + } else { + child + } } - withSort + addSortIfNecessary(addShuffleIfNecessary(child)) } if (meetsRequirements && compatible && !needsAnySort) { http://git-wip-us.apache.org/repos/asf/spark/blob/cc57d705/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index ce25af5..73b4634 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -360,8 +360,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.OneRowRelation => execution.PhysicalRDD(Nil, singleRowRdd) :: Nil case logical.RepartitionByExpression(expressions, child) => - execution.Exchange( - HashPartitioning(expressions, numPartitions), Nil, planLater(child)) :: Nil + execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil case e @ EvaluatePython(udf, child, _) => BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
