Repository: spark
Updated Branches:
  refs/heads/master cd106b050 -> 1e35e9693


[SPARK-17817] [PYSPARK] [FOLLOWUP] PySpark RDD Repartitioning Results in Highly 
Skewed Partition Sizes

## What changes were proposed in this pull request?

This change is a followup for #15389 which calls `_to_java_object_rdd()` to 
solve this issue. Due to the concern of the possible expensive cost of the 
call, we can choose to decrease the batch size to solve this issue too.

Simple benchmark:

    import time
    num_partitions = 20000
    a = sc.parallelize(range(int(1e6)), 2)
    start = time.time()
    l = a.repartition(num_partitions).glom().map(len).collect()
    end = time.time()
    print(end - start)

Before: 419.447577953
_to_java_object_rdd(): 421.916361094
decreasing the batch size: 423.712255955

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #15445 from viirya/repartition-batch-size.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e35e969
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e35e969
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e35e969

Branch: refs/heads/master
Commit: 1e35e969305555dda02cb0788c8143e5f2e1944b
Parents: cd106b0
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Tue Oct 18 14:25:10 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Tue Oct 18 14:25:10 2016 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1e35e969/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 0e2ae19..2de2c2f 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2029,12 +2029,12 @@ class RDD(object):
         [[1, 2, 3, 4, 5]]
         """
         if shuffle:
-            # In Scala's repartition code, we will distribute elements evenly 
across output
-            # partitions. However, the RDD from Python is serialized as a 
single binary data,
-            # so the distribution fails and produces highly skewed partitions. 
We need to
-            # convert it to a RDD of java object before repartitioning.
-            data_java_rdd = self._to_java_object_rdd().coalesce(numPartitions, 
shuffle)
-            jrdd = self.ctx._jvm.SerDeUtil.javaToPython(data_java_rdd)
+            # Decrease the batch size in order to distribute evenly the 
elements across output
+            # partitions. Otherwise, repartition will possibly produce highly 
skewed partitions.
+            batchSize = min(10, self.ctx._batchSize or 1024)
+            ser = BatchedSerializer(PickleSerializer(), batchSize)
+            selfCopy = self._reserialize(ser)
+            jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle)
         else:
             jrdd = self._jrdd.coalesce(numPartitions, shuffle)
         return RDD(jrdd, self.ctx, self._jrdd_deserializer)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to