Repository: spark
Updated Branches:
  refs/heads/master c55bbb49f -> f46e02fcd


SPARK-2203: PySpark defaults to use same num reduce partitions as map side

For shuffle-based operators, such as rdd.groupBy() or rdd.sortByKey(), PySpark will always assume that the default parallelism to use for the reduce side is ctx.defaultParallelism, which is a constant typically determined by the number of cores in cluster.

In contrast, Spark's Partitioner#defaultPartitioner will use the same number of reduce partitions as map partitions unless the defaultParallelism config is explicitly set. This tends to be a better default in order to avoid OOMs, and should also be the behavior of PySpark.

JIRA: https://issues.apache.org/jira/browse/SPARK-2203

Author: Aaron Davidson <[email protected]>

Closes #1138 from aarondav/pyfix and squashes the following commits:

1bd5751 [Aaron Davidson] SPARK-2203: PySpark defaults to use same num reduce partitions as map partitions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f46e02fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f46e02fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f46e02fc

Branch: refs/heads/master
Commit: f46e02fcdbb3f86a8761c078708388d18282ee0c
Parents: c55bbb4
Author: Aaron Davidson <[email protected]>
Authored: Fri Jun 20 00:06:57 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Fri Jun 20 00:06:57 2014 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f46e02fc/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a0b2c74..62a95c8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -512,7 +512,7 @@ class RDD(object):
         [('a', 3), ('fleece', 7), 
('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
         """
         if numPartitions is None:
-            numPartitions = self.ctx.defaultParallelism
+            numPartitions = self._defaultReducePartitions()
 
         bounds = list()
 
@@ -1154,7 +1154,7 @@ class RDD(object):
         set([])
         """
         if numPartitions is None:
-            numPartitions = self.ctx.defaultParallelism
+            numPartitions = self._defaultReducePartitions()
 
         if partitionFunc is None:
             partitionFunc = lambda x: 0 if x is None else hash(x)
@@ -1212,7 +1212,7 @@ class RDD(object):
         [('a', '11'), ('b', '1')]
         """
         if numPartitions is None:
-            numPartitions = self.ctx.defaultParallelism
+            numPartitions = self._defaultReducePartitions()
         def combineLocally(iterator):
             combiners = {}
             for x in iterator:
@@ -1475,6 +1475,21 @@ class RDD(object):
                                      java_storage_level.replication())
         return storage_level
 
+    def _defaultReducePartitions(self):
+        """
+        Returns the default number of partitions to use during reduce tasks (e.g., groupBy).
+        If spark.default.parallelism is set, then we'll use the value from SparkContext
+        defaultParallelism, otherwise we'll use the number of partitions in this RDD.
+
+        This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce
+        the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will
+        be inherent.
+        """
+        if self.ctx._conf.contains("spark.default.parallelism"):
+            return self.ctx.defaultParallelism
+        else:
+            return self.getNumPartitions()
+
     # TODO: `lookup` is disabled because we can't make direct comparisons based
     # on the key; we need to compare the hash of the key to the hash of the
     # keys in the pairs. This could be an expensive operation, since those
