Repository: spark Updated Branches: refs/heads/branch-1.1 65dae63fa -> d555c2ee6
[SPARK-3408] Fixed Limit operator so it works with sort-based shuffle. Author: Reynold Xin <[email protected]> Closes #2281 from rxin/sql-limit-sort and squashes the following commits: 1ef7780 [Reynold Xin] [SPARK-3408] Fixed Limit operator so it works with sort-based shuffle. (cherry picked from commit e2614038e78f4693fafedeee15b6fdf0ea1be473) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d555c2ee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d555c2ee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d555c2ee Branch: refs/heads/branch-1.1 Commit: d555c2ee6879499d666e5905397172d39c46c1a8 Parents: 65dae63 Author: Reynold Xin <[email protected]> Authored: Sun Sep 7 18:42:24 2014 -0700 Committer: Reynold Xin <[email protected]> Committed: Sun Sep 7 18:42:52 2014 -0700 ---------------------------------------------------------------------- .../spark/sql/execution/basicOperators.scala | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d555c2ee/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 4abda21..47bff0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -20,10 +20,10 @@ package org.apache.spark.sql.execution import scala.collection.mutable.ArrayBuffer import scala.reflect.runtime.universe.TypeTag +import org.apache.spark.{SparkEnv, HashPartitioner, SparkConf} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.rdd.{RDD, ShuffledRDD} -import org.apache.spark.sql.SQLContext +import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ @@ -96,6 +96,9 @@ case class Limit(limit: Int, child: SparkPlan) // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan: // partition local limit -> exchange into one partition -> partition local limit again + /** We must copy rows when sort based shuffle is on */ + private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] + override def output = child.output /** @@ -143,9 +146,15 @@ case class Limit(limit: Int, child: SparkPlan) } override def execute() = { - val rdd = child.execute().mapPartitions { iter => - val mutablePair = new MutablePair[Boolean, Row]() - iter.take(limit).map(row => mutablePair.update(false, row)) + val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) { + child.execute().mapPartitions { iter => + iter.take(limit).map(row => (false, row.copy())) + } + } else { + child.execute().mapPartitions { iter => + val mutablePair = new MutablePair[Boolean, Row]() + iter.take(limit).map(row => mutablePair.update(false, row)) + } } val part = new HashPartitioner(1) val shuffled = new ShuffledRDD[Boolean, Row, Row](rdd, part) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
