Repository: spark
Updated Branches:
  refs/heads/branch-1.2 76046664d -> e81c86967
SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions

takeOrdered should skip the reduce step when the mapped RDDs have no partitions. This prevents the exception below.

Reproduction (step 4 of the reported scenario): run the query

    SELECT * FROM testTable WHERE market = 'market2' ORDER BY End_Time DESC LIMIT 100;

Error trace:

    java.lang.UnsupportedOperationException: empty collection
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:863)
        at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1136)

Author: Yash Datta <[email protected]>

Closes #3830 from saucam/fix_takeorder and squashes the following commits:

5974d10 [Yash Datta] SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions

(cherry picked from commit 9bc0df6804f241aff24520d9c6ec54d9b11f5785)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e81c8696
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e81c8696
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e81c8696

Branch: refs/heads/branch-1.2
Commit: e81c869677b566dfcabedca89a40aeea7dc16fa9
Parents: 7604666
Author: Yash Datta <[email protected]>
Authored: Mon Dec 29 13:49:45 2014 -0800
Committer: Reynold Xin <[email protected]>
Committed: Mon Dec 29 13:50:34 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e81c8696/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ff6d946..c26425d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1132,15 +1132,20 @@ abstract class RDD[T: ClassTag](
     if (num == 0) {
       Array.empty
     } else {
-      mapPartitions { items =>
+      val mapRDDs = mapPartitions { items =>
         // Priority keeps the largest elements, so let's reverse the ordering.
         val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
         queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
         Iterator.single(queue)
-      }.reduce { (queue1, queue2) =>
-        queue1 ++= queue2
-        queue1
-      }.toArray.sorted(ord)
+      }
+      if (mapRDDs.partitions.size == 0) {
+        Array.empty
+      } else {
+        mapRDDs.reduce { (queue1, queue2) =>
+          queue1 ++= queue2
+          queue1
+        }.toArray.sorted(ord)
+      }
     }
   }
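For context, a minimal reproduction sketch (not part of the commit; assumes a local SparkContext). sc.emptyRDD produces an RDD with zero partitions, analogous to the fully partition-pruned table in the report: the map side of takeOrdered then also has zero partitions, so before this patch the final reduce threw "empty collection", while with the patch takeOrdered returns an empty array instead.

import org.apache.spark.{SparkConf, SparkContext}

object TakeOrderedEmptyRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SPARK-4968-repro").setMaster("local[2]"))
    try {
      // Zero-partition RDD: the map side of takeOrdered produces no
      // partitions, so there is nothing for reduce to merge.
      val empty = sc.emptyRDD[Int]

      // Before the patch: java.lang.UnsupportedOperationException: empty collection.
      // After the patch: returns Array.empty.
      val top = empty.takeOrdered(100)
      println(s"takeOrdered returned ${top.length} elements")
    } finally {
      sc.stop()
    }
  }
}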

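The patch leaves the map-side algorithm untouched: each partition condenses to a bounded priority queue of its num smallest elements, and the reduce step merges the queues. Since BoundedPriorityQueue is Spark-internal, the standalone sketch below (hypothetical names, plain Scala collections) mimics the same pattern with scala.collection.mutable.PriorityQueue to show why the merge needs at least one partition to exist.

import scala.collection.mutable.PriorityQueue

object LocalTopNSketch {
  // Map-side analogue: keep at most `num` smallest elements of one partition.
  // A max-heap on `ord` keeps its largest retained element at the head, so
  // evicting the head when the queue overflows leaves the `num` smallest.
  def partitionTopN[T](items: Iterator[T], num: Int)(implicit ord: Ordering[T]): PriorityQueue[T] = {
    val queue = PriorityQueue.empty[T](ord)
    items.foreach { item =>
      queue.enqueue(item)
      if (queue.size > num) queue.dequeue() // drop the current largest
    }
    queue
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(9, 3, 7), Seq(4, 8, 1), Seq.empty[Int])
    val num = 3

    // Map step: one bounded queue per partition.
    val queues = partitions.map(p => partitionTopN(p.iterator, num))

    // Reduce step: merge queues pairwise, re-trimming to `num` elements.
    // reduce fails on an empty collection of queues, which mirrors the
    // zero-partition case the patch guards against.
    val merged = queues.reduce { (q1, q2) =>
      q2.foreach(q1.enqueue(_))
      while (q1.size > num) q1.dequeue()
      q1
    }

    println(merged.toArray.sorted.mkString(", ")) // prints: 1, 3, 4
  }
}

Guarding on mapRDDs.partitions.size, as the patch does, is the cheapest fix for the zero-partition case; an alternative design would be to fold with an empty queue as the zero value, which never needs a non-empty input.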