Repository: spark Updated Branches: refs/heads/master e16a8e7db -> 16a73c247
SPARK-2978. Transformation with MR shuffle semantics I didn't add this to the transformations list in the docs because it's kind of obscure, but would be happy to do so if others think it would be helpful. Author: Sandy Ryza <[email protected]> Closes #2274 from sryza/sandy-spark-2978 and squashes the following commits: 4a5332a [Sandy Ryza] Fix Java test c04b447 [Sandy Ryza] Fix Python doc and add back deleted code 433ad5b [Sandy Ryza] Add Java test 4c25a54 [Sandy Ryza] Add s at the end and a couple other fixes 9b0ba99 [Sandy Ryza] Fix compilation 36e0571 [Sandy Ryza] Fix import ordering 48c12c2 [Sandy Ryza] Add Java version and additional doc e5381cd [Sandy Ryza] Fix python style warnings f147634 [Sandy Ryza] SPARK-2978. Transformation with MR shuffle semantics Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16a73c24 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16a73c24 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16a73c24 Branch: refs/heads/master Commit: 16a73c2473181e03d88001aa3e08e6ffac92eb8b Parents: e16a8e7 Author: Sandy Ryza <[email protected]> Authored: Mon Sep 8 11:20:00 2014 -0700 Committer: Matei Zaharia <[email protected]> Committed: Mon Sep 8 11:20:00 2014 -0700 ---------------------------------------------------------------------- .../org/apache/spark/api/java/JavaPairRDD.scala | 26 +++++++++++++++++ .../apache/spark/rdd/OrderedRDDFunctions.scala | 14 ++++++++- .../java/org/apache/spark/JavaAPISuite.java | 30 ++++++++++++++++++++ .../scala/org/apache/spark/rdd/RDDSuite.scala | 14 +++++++++ python/pyspark/rdd.py | 24 ++++++++++++++++ python/pyspark/tests.py | 8 ++++++ 6 files changed, 115 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index feeb6c0..880f61c 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -759,6 +759,32 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) } /** + * Repartition the RDD according to the given partitioner and, within each resulting partition, + * sort records by their keys. + * + * This is more efficient than calling `repartition` and then sorting within each partition + * because it can push the sorting down into the shuffle machinery. + */ + def repartitionAndSortWithinPartitions(partitioner: Partitioner): JavaPairRDD[K, V] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + repartitionAndSortWithinPartitions(partitioner, comp) + } + + /** + * Repartition the RDD according to the given partitioner and, within each resulting partition, + * sort records by their keys. + * + * This is more efficient than calling `repartition` and then sorting within each partition + * because it can push the sorting down into the shuffle machinery. + */ + def repartitionAndSortWithinPartitions(partitioner: Partitioner, comp: Comparator[K]) + : JavaPairRDD[K, V] = { + implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering. + fromRDD( + new OrderedRDDFunctions[K, V, (K, V)](rdd).repartitionAndSortWithinPartitions(partitioner)) + } + + /** * Sort the RDD by key, so that each partition contains a sorted range of the elements in * ascending order. Calling `collect` or `save` on the resulting RDD will return or output an * ordered list of records (in the `save` case, they will be written to multiple `part-X` files http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala index e98bad2..d0dbfef 100644 --- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala @@ -19,7 +19,7 @@ package org.apache.spark.rdd import scala.reflect.ClassTag -import org.apache.spark.{Logging, RangePartitioner} +import org.apache.spark.{Logging, Partitioner, RangePartitioner} import org.apache.spark.annotation.DeveloperApi /** @@ -64,4 +64,16 @@ class OrderedRDDFunctions[K : Ordering : ClassTag, new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse) } + + /** + * Repartition the RDD according to the given partitioner and, within each resulting partition, + * sort records by their keys. + * + * This is more efficient than calling `repartition` and then sorting within each partition + * because it can push the sorting down into the shuffle machinery. + */ + def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = { + new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering) + } + } http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/core/src/test/java/org/apache/spark/JavaAPISuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index e1c13de..be99dc5 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -190,6 +190,36 @@ public class JavaAPISuite implements Serializable { } @Test + public void repartitionAndSortWithinPartitions() { + List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>(); + pairs.add(new Tuple2<Integer, Integer>(0, 5)); + pairs.add(new Tuple2<Integer, Integer>(3, 8)); + pairs.add(new Tuple2<Integer, Integer>(2, 6)); + pairs.add(new Tuple2<Integer, Integer>(0, 8)); + pairs.add(new Tuple2<Integer, Integer>(3, 8)); + pairs.add(new Tuple2<Integer, Integer>(1, 3)); + + JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); + + Partitioner partitioner = new Partitioner() { + public int numPartitions() { + return 2; + } + public int getPartition(Object key) { + return ((Integer)key).intValue() % 2; + } + }; + + JavaPairRDD<Integer, Integer> repartitioned = + rdd.repartitionAndSortWithinPartitions(partitioner); + List<List<Tuple2<Integer, Integer>>> partitions = repartitioned.glom().collect(); + Assert.assertEquals(partitions.get(0), Arrays.asList(new Tuple2<Integer, Integer>(0, 5), + new Tuple2<Integer, Integer>(0, 8), new Tuple2<Integer, Integer>(2, 6))); + Assert.assertEquals(partitions.get(1), Arrays.asList(new Tuple2<Integer, Integer>(1, 3), + new Tuple2<Integer, Integer>(3, 8), new Tuple2<Integer, Integer>(3, 8))); + } + + @Test public void emptyRDD() { JavaRDD<String> rdd = sc.emptyRDD(); Assert.assertEquals("Empty RDD shouldn't have any values", 0, rdd.count()); http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 499dcda..c1b501a 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -682,6 +682,20 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(data.sortBy(parse, true, 2)(NameOrdering, classTag[Person]).collect() === nameOrdered) } + test("repartitionAndSortWithinPartitions") { + val data = sc.parallelize(Seq((0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)), 2) + + val partitioner = new Partitioner { + def numPartitions: Int = 2 + def getPartition(key: Any): Int = key.asInstanceOf[Int] % 2 + } + + val repartitioned = data.repartitionAndSortWithinPartitions(partitioner) + val partitions = repartitioned.glom().collect() + assert(partitions(0) === Seq((0, 5), (0, 8), (2, 6))) + assert(partitions(1) === Seq((1, 3), (3, 8), (3, 8))) + } + test("intersection") { val all = sc.parallelize(1 to 10) val evens = sc.parallelize(2 to 10 by 2) http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/python/pyspark/rdd.py ---------------------------------------------------------------------- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 266090e..5667154 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -520,6 +520,30 @@ class RDD(object): raise TypeError return self.union(other) + def repartitionAndSortWithinPartitions(self, numPartitions=None, partitionFunc=portable_hash, + ascending=True, keyfunc=lambda x: x): + """ + Repartition the RDD according to the given partitioner and, within each resulting partition, + sort records by their keys. + + >>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)]) + >>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, 2) + >>> rdd2.glom().collect() + [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]] + """ + if numPartitions is None: + numPartitions = self._defaultReducePartitions() + + spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == "true") + memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m")) + serializer = self._jrdd_deserializer + + def sortPartition(iterator): + sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted + return iter(sort(iterator, key=lambda (k, v): keyfunc(k), reverse=(not ascending))) + + return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True) + def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x): """ Sorts this RDD, which is assumed to consist of (key, value) pairs. http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/python/pyspark/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 9fbeb36..0bd2a9e 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -545,6 +545,14 @@ class TestRDDFunctions(PySparkTestCase): self.assertEquals(([1, "b"], [5]), rdd.histogram(1)) self.assertRaises(TypeError, lambda: rdd.histogram(2)) + def test_repartitionAndSortWithinPartitions(self): + rdd = self.sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)], 2) + + repartitioned = rdd.repartitionAndSortWithinPartitions(2, lambda key: key % 2) + partitions = repartitioned.glom().collect() + self.assertEquals(partitions[0], [(0, 5), (0, 8), (2, 6)]) + self.assertEquals(partitions[1], [(1, 3), (3, 8), (3, 8)]) + class TestSQL(PySparkTestCase): --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
