Repository: spark
Updated Branches:
  refs/heads/branch-1.1 d01fdd3bb -> e98aa5476


[SPARK-4459] Change groupBy type parameter from K to U

Please see https://issues.apache.org/jira/browse/SPARK-4459

Author: Saldanha <salda...@phusca-l24858.wlan.na.novartis.net>

Closes #3327 from alokito/master and squashes the following commits:

54b1095 [Saldanha] [SPARK-4459] changed type parameter for keyBy from K to U
d5f73c3 [Saldanha] [SPARK-4459] added keyBy test
316ad77 [Saldanha] SPARK-4459 changed type parameter for groupBy from K to U.
62ddd4b [Saldanha] SPARK-4459 added failing unit test
(cherry picked from commit 743a889d2778f797aabc3b1e8146e7aa32b62a48)

Signed-off-by: Josh Rosen <joshro...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e98aa547
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e98aa547
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e98aa547

Branch: refs/heads/branch-1.1
Commit: e98aa5476151e9802045934890afd53b27762058
Parents: d01fdd3
Author: Saldanha <salda...@phusca-l24858.wlan.na.novartis.net>
Authored: Thu Dec 4 14:22:09 2014 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Thu Dec 4 14:59:02 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 17 ++++----
 .../java/org/apache/spark/JavaAPISuite.java     | 41 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e98aa547/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index d230678..ed61726 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -210,8 +210,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
    * Return an RDD of grouped elements. Each group consists of a key and a 
sequence of elements
    * mapping to that key.
    */
-  def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = {
-    implicit val ctagK: ClassTag[K] = fakeClassTag
+  def groupBy[U](f: JFunction[T, U]): JavaPairRDD[U, JIterable[T]] = {
+    // The type parameter is U instead of K in order to work around a compiler 
bug; see SPARK-4459
+    implicit val ctagK: ClassTag[U] = fakeClassTag
     implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
     JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
   }
@@ -220,10 +221,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] 
extends Serializable {
    * Return an RDD of grouped elements. Each group consists of a key and a 
sequence of elements
    * mapping to that key.
    */
-  def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, 
JIterable[T]] = {
-    implicit val ctagK: ClassTag[K] = fakeClassTag
+  def groupBy[U](f: JFunction[T, U], numPartitions: Int): JavaPairRDD[U, 
JIterable[T]] = {
+    // The type parameter is U instead of K in order to work around a compiler 
bug; see SPARK-4459
+    implicit val ctagK: ClassTag[U] = fakeClassTag
     implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
-    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, 
numPartitions)(fakeClassTag[K])))
+    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, 
numPartitions)(fakeClassTag[U])))
   }
 
   /**
@@ -458,8 +460,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   /**
    * Creates tuples of the elements in this RDD by applying `f`.
    */
-  def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
-    implicit val ctag: ClassTag[K] = fakeClassTag
+  def keyBy[U](f: JFunction[T, U]): JavaPairRDD[U, T] = {
+    // The type parameter is U instead of K in order to work around a compiler 
bug; see SPARK-4459
+    implicit val ctag: ClassTag[U] = fakeClassTag
     JavaPairRDD.fromRDD(rdd.keyBy(f))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e98aa547/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java 
b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index e1c13de..64b6377 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -298,6 +298,47 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); 
// Odds
   }
 
+  @Test
+  public void groupByOnPairRDD() {
+    // Regression test for SPARK-4459
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    Function<Tuple2<Integer, Integer>, Boolean> areOdd =
+      new Function<Tuple2<Integer, Integer>, Boolean>() {
+        @Override
+        public Boolean call(Tuple2<Integer, Integer> x) {
+          return (x._1() % 2 == 0) && (x._2() % 2 == 0);
+        }
+      };
+    JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
+    JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens = 
pairRDD.groupBy(areOdd);
+    Assert.assertEquals(2, oddsAndEvens.count());
+    Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  
// Evens
+    Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); 
// Odds
+
+    oddsAndEvens = pairRDD.groupBy(areOdd, 1);
+    Assert.assertEquals(2, oddsAndEvens.count());
+    Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  
// Evens
+    Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); 
// Odds
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void keyByOnPairRDD() {
+    // Regression test for SPARK-4459
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    Function<Tuple2<Integer, Integer>, String> sumToString =
+      new Function<Tuple2<Integer, Integer>, String>() {
+        @Override
+        public String call(Tuple2<Integer, Integer> x) {
+          return String.valueOf(x._1() + x._2());
+        }
+      };
+    JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
+    JavaPairRDD<String, Tuple2<Integer, Integer>> keyed = 
pairRDD.keyBy(sumToString);
+    Assert.assertEquals(7, keyed.count());
+    Assert.assertEquals(1, (long) keyed.lookup("2").get(0)._1());
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void cogroup() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to