Repository: spark
Updated Branches:
  refs/heads/master 6bee01dd0 -> 7d9cc9214


SPARK-1770: Load balance elements when repartitioning.

This patch adds better balancing when performing a repartition of an
RDD. Previously the elements in the RDD were hash partitioned, meaning
if the RDD was skewed certain partitions would end up being very large.

This commit adds load balancing of elements across the repartitioned
RDD splits. The load balancing is not perfect: a given output partition
can have up to N more elements than the average if there are N input
partitions. However, some randomization is used to minimize the
probabiliy that this happens.

Author: Patrick Wendell <[email protected]>

Closes #727 from pwendell/load-balance and squashes the following commits:

f9da752 [Patrick Wendell] Response to Matei's feedback
acfa46a [Patrick Wendell] SPARK-1770: Load balance elements when repartitioning.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d9cc921
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d9cc921
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d9cc921

Branch: refs/heads/master
Commit: 7d9cc9214bd06495f6838e355331dd2b5f1f7407
Parents: 6bee01d
Author: Patrick Wendell <[email protected]>
Authored: Sun May 11 17:11:55 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Sun May 11 17:11:55 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 15 +++++++--
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 33 ++++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7d9cc921/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a1ca612..aa03e92 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -328,11 +328,22 @@ abstract class RDD[T: ClassTag](
   def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: 
Ordering[T] = null)
       : RDD[T] = {
     if (shuffle) {
+      /** Distributes elements evenly across output partitions, starting from 
a random partition. */
+      def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, 
T)] = {
+        var position = (new Random(index)).nextInt(numPartitions)
+        items.map { t =>
+          // Note that the hash code of the key will just be the key itself. 
The HashPartitioner 
+          // will mod it with the number of total partitions.
+          position = position + 1
+          (position, t)
+        }
+      }
+
       // include a shuffle step so that our upstream tasks are still 
distributed
       new CoalescedRDD(
-        new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)),
+        new ShuffledRDD[Int, T, (Int, 
T)](mapPartitionsWithIndex(distributePartition),
         new HashPartitioner(numPartitions)),
-        numPartitions).keys
+        numPartitions).values
     } else {
       new CoalescedRDD(this, numPartitions)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d9cc921/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 8da9a0d..e686068 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -202,6 +202,39 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(repartitioned2.collect().toSet === (1 to 1000).toSet)
   }
 
+  test("repartitioned RDDs perform load balancing") {
+    // Coalesce partitions
+    val input = Array.fill(1000)(1)
+    val initialPartitions = 10
+    val data = sc.parallelize(input, initialPartitions)
+
+    val repartitioned1 = data.repartition(2)
+    assert(repartitioned1.partitions.size == 2)
+    val partitions1 = repartitioned1.glom().collect()
+    // some noise in balancing is allowed due to randomization
+    assert(math.abs(partitions1(0).length - 500) < initialPartitions)
+    assert(math.abs(partitions1(1).length - 500) < initialPartitions)
+    assert(repartitioned1.collect() === input)
+
+    def testSplitPartitions(input: Seq[Int], initialPartitions: Int, 
finalPartitions: Int) {
+      val data = sc.parallelize(input, initialPartitions)
+      val repartitioned = data.repartition(finalPartitions)
+      assert(repartitioned.partitions.size === finalPartitions)
+      val partitions = repartitioned.glom().collect()
+      // assert all elements are present
+      assert(repartitioned.collect().sortWith(_ > _).toSeq === 
input.toSeq.sortWith(_ > _).toSeq)
+      // assert no bucket is overloaded
+      for (partition <- partitions) {
+        val avg = input.size / finalPartitions
+        val maxPossible = avg + initialPartitions
+        assert(partition.length <=  maxPossible)
+      }
+    }
+
+    testSplitPartitions(Array.fill(100)(1), 10, 20)
+    testSplitPartitions(Array.fill(10000)(1) ++ Array.fill(10000)(2), 20, 100)
+  }
+
   test("coalesced RDDs") {
     val data = sc.parallelize(1 to 10, 10)
 

Reply via email to