Repository: spark
Updated Branches:
  refs/heads/master e965fb55a -> 1ddfab8c4


[SPARK-19287][CORE][STREAMING] JavaPairRDD flatMapValues requires function returning Iterable, not Iterator

## What changes were proposed in this pull request?

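Fix an old oversight in the Java API: `JavaPairRDD.flatMapValues` and `JavaPairDStream.flatMapValues` now take a `FlatMapFunction` (whose `call` returns an `Iterator`) instead of a `Function` returning an `Iterable`. This is an incompatible method-signature change, so MiMa exclusions are added for both methods.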

## How was this patch tested?

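Existing tests. The Java streaming test suites (`JavaAPISuite`, `Java8APISuite`) were updated so that their `flatMapValues` functions return an `Iterator`.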

Closes #22690 from srowen/SPARK-19287.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ddfab8c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ddfab8c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ddfab8c

Branch: refs/heads/master
Commit: 1ddfab8c4f37f8a87a0203610f52fafc0debdea0
Parents: e965fb5
Author: Sean Owen <[email protected]>
Authored: Fri Oct 12 18:10:59 2018 -0500
Committer: Sean Owen <[email protected]>
Committed: Fri Oct 12 18:10:59 2018 -0500

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/api/java/JavaPairRDD.scala  | 9 +++++----
 project/MimaExcludes.scala                                  | 4 +++-
 .../apache/spark/streaming/api/java/JavaPairDStream.scala   | 8 ++++----
 .../java/test/org/apache/spark/streaming/Java8APISuite.java | 2 +-
 .../java/test/org/apache/spark/streaming/JavaAPISuite.java  | 2 +-
 5 files changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1ddfab8c/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 9544475..80a4f84 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.api.java
 
 import java.{lang => jl}
 import java.lang.{Iterable => JIterable}
-import java.util.{Comparator, List => JList}
+import java.util.{Comparator, Iterator => JIterator, List => JList}
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -34,7 +34,8 @@ import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => 
JFunction2, PairFunction}
+import org.apache.spark.api.java.function.{FlatMapFunction, Function => 
JFunction,
+  Function2 => JFunction2, PairFunction}
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
 import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
@@ -674,8 +675,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Pass each value in the key-value pair RDD through a flatMap function 
without changing the
    * keys; this also retains the original RDD's partitioning.
    */
-  def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, 
U] = {
-    def fn: (V) => Iterable[U] = (x: V) => f.call(x).asScala
+  def flatMapValues[U](f: FlatMapFunction[V, U]): JavaPairRDD[K, U] = {
+    def fn: (V) => Iterator[U] = (x: V) => f.call(x).asScala
     implicit val ctag: ClassTag[U] = fakeClassTag
     fromRDD(rdd.flatMapValues(fn))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1ddfab8c/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index bf85fe0..851fa23 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,7 +36,9 @@ object MimaExcludes {
 
   // Exclude rules for 3.0.x
   lazy val v30excludes = v24excludes ++ Seq(
-    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.io.SnappyCompressionCodec.version")
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.io.SnappyCompressionCodec.version"),
+    
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.api.java.JavaPairRDD.flatMapValues"),
+    
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.api.java.JavaPairDStream.flatMapValues")
   )
 
   // Exclude rules for 2.4.x

http://git-wip-us.apache.org/repos/asf/spark/blob/1ddfab8c/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 2ec907c..c3c13df 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -34,7 +34,8 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaUtils, 
Optional}
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => 
JFunction2}
+import org.apache.spark.api.java.function.{FlatMapFunction, Function => 
JFunction,
+  Function2 => JFunction2}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
@@ -562,9 +563,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Return a new DStream by applying a flatmap function to the value of each 
key-value pairs in
    * 'this' DStream without changing the key.
    */
-  def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): 
JavaPairDStream[K, U] = {
-    import scala.collection.JavaConverters._
-    def fn: (V) => Iterable[U] = (x: V) => f.apply(x).asScala
+  def flatMapValues[U](f: FlatMapFunction[V, U]): JavaPairDStream[K, U] = {
+    def fn: (V) => Iterator[U] = (x: V) => f.call(x).asScala
     implicit val cm: ClassTag[U] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
     dstream.flatMapValues(fn)

http://git-wip-us.apache.org/repos/asf/spark/blob/1ddfab8c/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java 
b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
index 90d1f8c..b154f0e 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
@@ -841,7 +841,7 @@ public class Java8APISuite extends 
LocalJavaStreamingContext implements Serializ
     JavaPairDStream<String, String> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
 
     JavaPairDStream<String, String> flatMapped =
-      pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2"));
+      pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + 
"2").iterator());
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 
2, 2);
     Assert.assertEquals(expected, result);

http://git-wip-us.apache.org/repos/asf/spark/blob/1ddfab8c/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java 
b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
index 6c86cac..c7cde56 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -1355,7 +1355,7 @@ public class JavaAPISuite extends 
LocalJavaStreamingContext implements Serializa
         List<String> out = new ArrayList<>();
         out.add(in + "1");
         out.add(in + "2");
-        return out;
+        return out.iterator();
       });
 
     JavaTestUtils.attachTestOutputStream(flatMapped);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to