Repository: spark
Updated Branches:
  refs/heads/branch-1.0 bccd13ec1 -> 9754d1b12
SPARK-1509: add zipWithIndex zipWithUniqueId methods to java api

Author: witgo <[email protected]>

Closes #423 from witgo/zipWithIndex and squashes the following commits:

039ec04 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
24d74c9 [witgo] review commit
763a5e4 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
59747d1 [witgo] review commit
7bf4d06 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
daa8f84 [witgo] review commit
4070613 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
18e6c97 [witgo] java api zipWithIndex test
11e2e7f [witgo] add zipWithIndex zipWithUniqueId methods to java api

(cherry picked from commit 7d1505841069c6ecc3fa7e4896db535f18e4ce84)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9754d1b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9754d1b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9754d1b1

Branch: refs/heads/branch-1.0
Commit: 9754d1b1270aafcc6f864871e9b37c03e3cba1d1
Parents: bccd13e
Author: witgo <[email protected]>
Authored: Tue Apr 29 11:30:47 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Tue Apr 29 11:31:00 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 22 +++++++++++++-
 .../java/org/apache/spark/JavaAPISuite.java     | 31 +++++++++++++++-----
 2 files changed, 45 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9754d1b1/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 574a986..af06d1d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -18,7 +18,7 @@ package org.apache.spark.api.java
 
 import java.util.{Comparator, List => JList, Iterator => JIterator}
-import java.lang.{Iterable => JIterable}
+import java.lang.{Iterable => JIterable, Long => JLong}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -264,6 +264,26 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
   }
 
+  /**
+   * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
+   * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
+   * won't trigger a Spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
+   */
+  def zipWithUniqueId(): JavaPairRDD[T, JLong] = {
+    JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, JLong]]
+  }
+
+  /**
+   * Zips this RDD with its element indices. The ordering is first based on the partition index
+   * and then the ordering of items within each partition. So the first item in the first
+   * partition gets index 0, and the last item in the last partition receives the largest index.
+   * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
+   * This method needs to trigger a Spark job when this RDD contains more than one partition.
+   */
+  def zipWithIndex(): JavaPairRDD[T, JLong] = {
+    JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, JLong]]
+  }
+
   // Actions (launch a job to return a value to the user program)
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9754d1b1/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 76c6f5a..c3e03ce 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -182,13 +182,30 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(2, foreachCalls);
   }
 
-  @Test
-  public void toLocalIterator() {
-    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
-    JavaRDD<Integer> rdd = sc.parallelize(correct);
-    List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
-    Assert.assertTrue(correct.equals(result));
-  }
+  @Test
+  public void toLocalIterator() {
+    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
+    JavaRDD<Integer> rdd = sc.parallelize(correct);
+    List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
+    Assert.assertTrue(correct.equals(result));
+  }
+
+  @Test
+  public void zipWithUniqueId() {
+    List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
+    JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithUniqueId();
+    JavaRDD<Long> indexes = zip.values();
+    Assert.assertTrue(new HashSet<Long>(indexes.collect()).size() == 4);
+  }
+
+  @Test
+  public void zipWithIndex() {
+    List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
+    JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithIndex();
+    JavaRDD<Long> indexes = zip.values();
+    List<Long> correctIndexes = Arrays.asList(0L, 1L, 2L, 3L);
+    Assert.assertTrue(indexes.collect().equals(correctIndexes));
+  }
 
   @SuppressWarnings("unchecked")
   @Test
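----------------------------------------------------------------------

For reference, a minimal sketch of how the two new Java API methods might be
called from a driver program. The class name ZipWithIndexExample, the master
URL, and the sample data are illustrative assumptions; only zipWithIndex()
and zipWithUniqueId() come from the patch above.

import java.util.Arrays;
import java.util.List;

import scala.Tuple2;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical driver program exercising the two new methods.
public class ZipWithIndexExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[2]", "ZipWithIndexExample");

    // Two partitions, so zipWithIndex() below triggers a Spark job.
    JavaRDD<String> words = sc.parallelize(Arrays.asList("a", "b", "c", "d"), 2);

    // Indices are assigned by partition order, then by position within each
    // partition, giving the gap-free sequence 0, 1, 2, 3.
    List<Tuple2<String, Long>> indexed = words.zipWithIndex().collect();

    // Unique ids may have gaps: items in partition k get ids k, n+k, 2*n+k, ...
    // With n = 2 partitions: ids 0, 2 in partition 0 and 1, 3 in partition 1.
    // No Spark job is triggered.
    List<Tuple2<String, Long>> uniqueIds = words.zipWithUniqueId().collect();

    System.out.println(indexed);   // [(a,0), (b,1), (c,2), (d,3)]
    System.out.println(uniqueIds); // [(a,0), (b,2), (c,1), (d,3)]

    sc.stop();
  }
}

The gap behavior is the key design trade-off: zipWithUniqueId() computes ids
from the partition index alone and so avoids a job, while zipWithIndex() must
first count the elements of every partition but one to assign contiguous indices.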