Updated Branches:
  refs/heads/branch-0.8 ff53f02a1 -> 7084217ff

Merge pull request #38 from AndreSchumacher/pyspark_sorting

SPARK-705: implement sortByKey() in PySpark

This PR contains the implementation of a RangePartitioner in Python and uses 
its partition IDs to obtain a global sort in PySpark.
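
A minimal usage sketch, in the doctest style of the patch below (this assumes 
a live SparkContext named sc; the values are illustrative only):

    >>> pairs = sc.parallelize([('b', 2), ('c', 3), ('a', 1)])
    >>> pairs.sortByKey().collect()
    [('a', 1), ('b', 2), ('c', 3)]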

(cherry picked from commit b4fa11f6c96ee37ecd30231c1e22630055f52115)
Signed-off-by: Reynold Xin <r...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/dc2c90da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/dc2c90da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/dc2c90da

Branch: refs/heads/branch-0.8
Commit: dc2c90da46b5aab4ebe7cd553ded23cc06bbf6f0
Parents: ff53f02
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Wed Oct 9 11:59:47 2013 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Mon Oct 14 15:03:34 2013 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 48 +++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 47 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dc2c90da/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 58e1849..d1e4933 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -263,7 +263,53 @@ class RDD(object):
             raise TypeError
         return self.union(other)
 
-    # TODO: sort
+    def sortByKey(self, ascending=True, numPartitions=None, keyfunc = lambda x: x):
+        """
+        Sorts this RDD, which is assumed to consist of (key, value) pairs.
+
+        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
+        >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
+        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
+        >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
+        >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
+        >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
+        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
+        """
+        if numPartitions is None:
+            numPartitions = self.ctx.defaultParallelism
+
+        bounds = list()
+
+        # first compute the boundary of each part via sampling: we want to partition
+        # the key-space into bins such that the bins have roughly the same
+        # number of (key, value) pairs falling into them
+        if numPartitions > 1:
+            rddSize = self.count()
+            maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner
+            fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
+
+            samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
+            samples = sorted(samples, reverse=(not ascending), key=keyfunc)
+
+            # we have numPartitions many parts but one of them has
+            # an implicit boundary
+            for i in range(0, numPartitions - 1):
+                index = (len(samples) - 1) * (i + 1) / numPartitions
+                bounds.append(samples[index])
+
+        def rangePartitionFunc(k):
+            p = 0
+            while p < len(bounds) and keyfunc(k) > bounds[p]:
+                p += 1
+            if ascending:
+                return p
+            else:
+                return numPartitions-1-p
+
+        def mapFunc(iterator):
+            yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))
+
+        return self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc).mapPartitions(mapFunc, preservesPartitioning=True).flatMap(lambda x: x, preservesPartitioning=True)
 
     def glom(self):
         """
