Repository: spark Updated Branches: refs/heads/master 7e191fe29 -> 3519b5e8e
[SPARK-2967][SQL] Follow-up: Also copy hash expressions in sort based shuffle fix.

Follow-up to #2066

Author: Michael Armbrust <[email protected]>

Closes #2072 from marmbrus/sortShuffle and squashes the following commits:

2ff8114 [Michael Armbrust] Fix bug

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3519b5e8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3519b5e8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3519b5e8

Branch: refs/heads/master
Commit: 3519b5e8e55b4530d7f7c0bcab254f863dbfa814
Parents: 7e191fe
Author: Michael Armbrust <[email protected]>
Authored: Sat Aug 23 16:21:08 2014 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Sat Aug 23 16:21:08 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/execution/Exchange.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/3519b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 09c34b7..4802e40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -46,12 +46,15 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.
         val rdd = child.execute().mapPartitions { iter =>
-          @transient val hashExpressions =
-            newMutableProjection(expressions, child.output)()
-
           if (sortBasedShuffleOn) {
+            @transient val hashExpressions =
+              newProjection(expressions, child.output)
+
             iter.map(r => (hashExpressions(r), r.copy()))
           } else {
+            @transient val hashExpressions =
+              newMutableProjection(expressions, child.output)()
+
             val mutablePair = new MutablePair[Row, Row]()
             iter.map(r => mutablePair.update(hashExpressions(r), r))
           }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
