Repository: spark
Updated Branches:
  refs/heads/branch-1.1 beb705a47 -> 311831db7


[SPARK-2967][SQL] Fix sort based shuffle for spark sql.

Add explicit row copies when sort based shuffle is on.

Author: Michael Armbrust <mich...@databricks.com>

Closes #2066 from marmbrus/sortShuffle and squashes the following commits:

fcd7bb2 [Michael Armbrust] Fix sort based shuffle for spark sql.

(cherry picked from commit a2e658dcdab614058eefcf50ae2d419ece9b1fe7)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/311831db
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/311831db
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/311831db

Branch: refs/heads/branch-1.1
Commit: 311831db71b742a0472d67a1127c818e5ba0a505
Parents: beb705a
Author: Michael Armbrust <mich...@databricks.com>
Authored: Wed Aug 20 15:51:14 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed Aug 20 15:51:25 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/Exchange.scala   | 30 +++++++++++++++-----
 1 file changed, 23 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/311831db/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 77dc2ad..09c34b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf}
+import org.apache.spark.shuffle.sort.SortShuffleManager
+import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner, SparkConf}
 import org.apache.spark.rdd.ShuffledRDD
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.errors.attachTree
@@ -37,6 +38,9 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
 
   def output = child.output
 
+  /** We must copy rows when sort based shuffle is on */
+  protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
+
   def execute() = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
@@ -45,8 +49,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
           @transient val hashExpressions =
             newMutableProjection(expressions, child.output)()
 
-          val mutablePair = new MutablePair[Row, Row]()
-          iter.map(r => mutablePair.update(hashExpressions(r), r))
+          if (sortBasedShuffleOn) {
+            iter.map(r => (hashExpressions(r), r.copy()))
+          } else {
+            val mutablePair = new MutablePair[Row, Row]()
+            iter.map(r => mutablePair.update(hashExpressions(r), r))
+          }
         }
         val part = new HashPartitioner(numPartitions)
         val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
@@ -58,8 +66,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         implicit val ordering = new RowOrdering(sortingExpressions, child.output)
 
         val rdd = child.execute().mapPartitions { iter =>
-          val mutablePair = new MutablePair[Row, Null](null, null)
-          iter.map(row => mutablePair.update(row, null))
+          if (sortBasedShuffleOn) {
+            iter.map(row => (row.copy(), null))
+          } else {
+            val mutablePair = new MutablePair[Row, Null](null, null)
+            iter.map(row => mutablePair.update(row, null))
+          }
         }
         val part = new RangePartitioner(numPartitions, rdd, ascending = true)
         val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
@@ -69,8 +81,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
 
       case SinglePartition =>
         val rdd = child.execute().mapPartitions { iter =>
-          val mutablePair = new MutablePair[Null, Row]()
-          iter.map(r => mutablePair.update(null, r))
+          if (sortBasedShuffleOn) {
+            iter.map(r => (null, r.copy()))
+          } else {
+            val mutablePair = new MutablePair[Null, Row]()
+            iter.map(r => mutablePair.update(null, r))
+          }
         }
         val partitioner = new HashPartitioner(1)
         val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to