Repository: spark
Updated Branches:
  refs/heads/master 39db1bfda -> e2614038e


[SPARK-3408] Fixed Limit operator so it works with sort-based shuffle.

Author: Reynold Xin <[email protected]>

Closes #2281 from rxin/sql-limit-sort and squashes the following commits:

1ef7780 [Reynold Xin] [SPARK-3408] Fixed Limit operator so it works with 
sort-based shuffle.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2614038
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2614038
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2614038

Branch: refs/heads/master
Commit: e2614038e78f4693fafedeee15b6fdf0ea1be473
Parents: 39db1bf
Author: Reynold Xin <[email protected]>
Authored: Sun Sep 7 18:42:24 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sun Sep 7 18:42:24 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/basicOperators.scala     | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e2614038/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 4abda21..47bff0c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.execution
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.runtime.universe.TypeTag
 
+import org.apache.spark.{SparkEnv, HashPartitioner, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.{HashPartitioner, SparkConf}
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
@@ -96,6 +96,9 @@ case class Limit(limit: Int, child: SparkPlan)
   // TODO: Implement a partition local limit, and use a strategy to generate 
the proper limit plan:
   // partition local limit -> exchange into one partition -> partition local 
limit again
 
+  /** We must copy rows when sort based shuffle is on */
+  private def sortBasedShuffleOn = 
SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
+
   override def output = child.output
 
   /**
@@ -143,9 +146,15 @@ case class Limit(limit: Int, child: SparkPlan)
   }
 
   override def execute() = {
-    val rdd = child.execute().mapPartitions { iter =>
-      val mutablePair = new MutablePair[Boolean, Row]()
-      iter.take(limit).map(row => mutablePair.update(false, row))
+    val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) {
+      child.execute().mapPartitions { iter =>
+        iter.take(limit).map(row => (false, row.copy()))
+      }
+    } else {
+      child.execute().mapPartitions { iter =>
+        val mutablePair = new MutablePair[Boolean, Row]()
+        iter.take(limit).map(row => mutablePair.update(false, row))
+      }
     }
     val part = new HashPartitioner(1)
     val shuffled = new ShuffledRDD[Boolean, Row, Row](rdd, part)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to