Repository: spark
Updated Branches:
  refs/heads/master 02b55de3d -> 9bc0df680


SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions

takeOrdered should skip the reduce step when the mapped RDD has no partitions.
This prevents the exception shown below:

Reproduced by running the query:
SELECT * FROM testTable WHERE market = 'market2' ORDER BY End_Time DESC LIMIT 100;
Error trace:
java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:863)
at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1136)
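
For context, RDD.reduce throws this exception whenever no partition contributes
a value. Inside takeOrdered, mapPartitions emits exactly one (possibly empty)
queue per partition, so the reduce can only see an empty collection when the
RDD has zero partitions, as happens when a filter prunes away every partition
of the scan. A minimal sketch of the failure in spark-shell (sc is provided by
the shell; emptyRDD stands in here for the original fully-pruned table scan):

// sc.emptyRDD produces an RDD with zero partitions.
val empty = sc.emptyRDD[Int]
assert(empty.partitions.size == 0)

// reduce has no partition results to fold and throws:
//   java.lang.UnsupportedOperationException: empty collection
empty.reduce(_ + _)

// Before this patch, takeOrdered hit the same reduce internally and
// failed with the same exception.
empty.takeOrdered(100)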

Author: Yash Datta <[email protected]>

Closes #3830 from saucam/fix_takeorder and squashes the following commits:

5974d10 [Yash Datta] SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bc0df68
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bc0df68
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bc0df68

Branch: refs/heads/master
Commit: 9bc0df6804f241aff24520d9c6ec54d9b11f5785
Parents: 02b55de
Author: Yash Datta <[email protected]>
Authored: Mon Dec 29 13:49:45 2014 -0800
Committer: Reynold Xin <[email protected]>
Committed: Mon Dec 29 13:49:45 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9bc0df68/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f47c2d1..5118e2b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1146,15 +1146,20 @@ abstract class RDD[T: ClassTag](
     if (num == 0) {
       Array.empty
     } else {
-      mapPartitions { items =>
+      val mapRDDs = mapPartitions { items =>
         // Priority keeps the largest elements, so let's reverse the ordering.
         val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
         queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
         Iterator.single(queue)
-      }.reduce { (queue1, queue2) =>
-        queue1 ++= queue2
-        queue1
-      }.toArray.sorted(ord)
+      }
+      if (mapRDDs.partitions.size == 0) {
+        Array.empty
+      } else {
+        mapRDDs.reduce { (queue1, queue2) =>
+          queue1 ++= queue2
+          queue1
+        }.toArray.sorted(ord)
+      }
     }
   }
 


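With the guard in place, takeOrdered short-circuits on a zero-partition RDD and
returns an empty array instead of calling reduce. Testing mapRDDs.partitions.size
is a driver-side metadata check, so no job is launched just to discover there is
nothing to sort. A quick check in spark-shell, again using emptyRDD as the
stand-in:

// After the fix, the zero-partition case returns an empty result.
val result = sc.emptyRDD[Int].takeOrdered(100)
assert(result.isEmpty)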