[SPARK-3369][CORE][STREAMING] Java mapPartitions Iterator->Iterable is inconsistent with Scala's Iterator->Iterator
Fix Java function API methods for flatMap and mapPartitions to require producing only an Iterator, not Iterable. Also fix DStream.flatMap to require a function producing TraversableOnce only, not Traversable. CC rxin pwendell for API change; tdas since it also touches streaming. Author: Sean Owen <[email protected]> Closes #10413 from srowen/SPARK-3369. Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/881d1af6 Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/881d1af6 Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/881d1af6 Branch: refs/heads/master Commit: 881d1af6c9b6af77780cf2880f20579073ba10a4 Parents: bbb1ddc Author: Sean Owen <[email protected]> Authored: Tue Jan 26 11:55:28 2016 +0000 Committer: Sean Owen <[email protected]> Committed: Tue Jan 26 11:55:28 2016 +0000 ---------------------------------------------------------------------- .../spark/examples/streaming/akka/JavaActorWordCount.java | 5 +++-- .../streaming/twitter/JavaTwitterHashTagJoinSentiments.java | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/881d1af6/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java ---------------------------------------------------------------------- diff --git a/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java b/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java index 62e5633..cf77466 100644 --- a/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java +++ b/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java @@ -18,6 +18,7 @@ package org.apache.spark.examples.streaming; import java.util.Arrays; +import java.util.Iterator; import scala.Tuple2; @@ -120,8 +121,8 @@ public class JavaActorWordCount { // compute wordcount lines.flatMap(new FlatMapFunction<String, String>() { @Override - public Iterable<String> call(String s) { - return Arrays.asList(s.split("\\s+")); + public Iterator<String> call(String s) { + return Arrays.asList(s.split("\\s+")).iterator(); } }).mapToPair(new PairFunction<String, String, Integer>() { @Override http://git-wip-us.apache.org/repos/asf/bahir/blob/881d1af6/streaming-twitter/examples/src/main/java/org/apache/spark/examples/streaming/twitter/JavaTwitterHashTagJoinSentiments.java ---------------------------------------------------------------------- diff --git a/streaming-twitter/examples/src/main/java/org/apache/spark/examples/streaming/twitter/JavaTwitterHashTagJoinSentiments.java b/streaming-twitter/examples/src/main/java/org/apache/spark/examples/streaming/twitter/JavaTwitterHashTagJoinSentiments.java index d869768..f0ae9a9 100644 --- a/streaming-twitter/examples/src/main/java/org/apache/spark/examples/streaming/twitter/JavaTwitterHashTagJoinSentiments.java +++ b/streaming-twitter/examples/src/main/java/org/apache/spark/examples/streaming/twitter/JavaTwitterHashTagJoinSentiments.java @@ -34,6 +34,7 @@ import scala.Tuple2; import twitter4j.Status; import java.util.Arrays; +import java.util.Iterator; import java.util.List; /** @@ -70,8 +71,8 @@ public class JavaTwitterHashTagJoinSentiments { JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() { @Override - public Iterable<String> call(Status s) { - return Arrays.asList(s.getText().split(" ")); + public Iterator<String> call(Status s) { + return Arrays.asList(s.getText().split(" ")).iterator(); } });
