Repository: spark
Updated Branches:
  refs/heads/master 3d0a02dff -> 11d549417

SPARK-1663. Corrections for several compile errors in streaming code examples, 
and updates to follow API changes

I gave the Streaming code examples, both Scala and Java, a test run today. I 
turned up a number of small errors, mostly compile errors in the Java examples. 
There were a few typos in the Scala too.

I also took the liberty of adding things like imports, since in several cases 
they are not obvious. Feel free to push back on some changes.

There's one thing I haven't quite addressed in the changes. `JavaPairDStream` 
uses the Java API version of `Function2` in almost all cases, as `JFunction2`. 
However it uses `scala.Function2` in:

  def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: 
  :JavaPairDStream[K, V] = {
    dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)

Is that a typo?

Also, in Scala, I could not get this to compile:
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), 
error: missing parameter type for expanded function ((x$1, x$2) => 

You can see my fix below but am I missing something?

Otherwise I can say these all worked for me!

Author: Sean Owen <>

Closes #589 from srowen/SPARK-1663 and squashes the following commits:

65a906b [Sean Owen] Corrections for several compile errors in streaming code 
examples, and updates to follow API changes


Branch: refs/heads/master
Commit: 11d54941760f86706e28f7ace8ece664c9164ba6
Parents: 3d0a02d
Author: Sean Owen <>
Authored: Sat May 3 12:31:31 2014 -0700
Committer: Patrick Wendell <>
Committed: Sat May 3 12:31:31 2014 -0700

 docs/ | 62 ++++++++++++++++++--------------
 1 file changed, 36 insertions(+), 26 deletions(-)
diff --git a/docs/ 
index 7ad0642..b22bb45 100644
--- a/docs/
+++ b/docs/
@@ -76,16 +76,19 @@ Besides Spark's configuration, we specify that any DStream 
will be processed
 in 1 second batches.
 {% highlight scala %}
-// Create a StreamingContext with a SparkConf configuration
-val ssc = new StreamingContext(sparkConf, Seconds(1))
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.api._
+// Create a StreamingContext with a local master
+val ssc = new StreamingContext("local", "NetworkWordCount", Seconds(1))
 {% endhighlight %}
 Using this context, we then create a new DStream
 by specifying the IP address and port of the data server.
 {% highlight scala %}
-// Create a DStream that will connect to serverIP:serverPort
-val lines = ssc.socketTextStream(serverIP, serverPort)
+// Create a DStream that will connect to serverIP:serverPort, like 
+val lines = ssc.socketTextStream("localhost", 9999)
 {% endhighlight %}
 This `lines` DStream represents the stream of data that will be received from 
the data
@@ -103,6 +106,7 @@ each line will be split into multiple words and the stream 
of words is represent
 `words` DStream.  Next, we want to count these words.
 {% highlight scala %}
+import org.apache.spark.streaming.StreamingContext._
 // Count each word in each batch
 val pairs = => (word, 1))
 val wordCounts = pairs.reduceByKey(_ + _)
@@ -138,16 +142,20 @@ functionality. Besides Spark's configuration, we specify 
that any DStream would
 in 1 second batches.
 {% highlight java %}
-// Create a StreamingContext with a SparkConf configuration
-JavaStreamingContext jssc = StreamingContext(sparkConf, new Duration(1000))
+import org.apache.spark.streaming.*;
+import scala.Tuple2;
+// Create a StreamingContext with a local master
+JavaStreamingContext jssc = new JavaStreamingContext("local", 
"JavaNetworkWordCount", new Duration(1000))
 {% endhighlight %}
 Using this context, we then create a new DStream
 by specifying the IP address and port of the data server.
 {% highlight java %}
-// Create a DStream that will connect to serverIP:serverPort
-JavaDStream<String> lines = jssc.socketTextStream(serverIP, serverPort);
+// Create a DStream that will connect to serverIP:serverPort, like 
+JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
 {% endhighlight %}
 This `lines` DStream represents the stream of data that will be received from 
the data
@@ -159,7 +167,7 @@ space into words.
 JavaDStream<String> words = lines.flatMap(
   new FlatMapFunction<String, String>() {
     @Override public Iterable<String> call(String x) {
-      return Lists.newArrayList(x.split(" "));
+      return Arrays.asList(x.split(" "));
 {% endhighlight %}
@@ -359,7 +367,7 @@ as explained earlier. Finally, the last two parameters are 
needed to deploy your
  if running in distributed mode, as described in the
  [Spark programming 
  Additionally, the underlying SparkContext can be accessed as
 The batch interval must be set based on the latency requirements of your 
 and available cluster resources. See the [Performance 
@@ -399,7 +407,7 @@ These operations are discussed in detail in later sections.
 ## Input Sources
-We have already taken a look at the `streamingContext.socketTextStream(...)` 
in the [quick
+We have already taken a look at the `ssc.socketTextStream(...)` in the [quick
 example](#a-quick-example) which creates a DStream from text
 data received over a TCP socket connection. Besides sockets, the core Spark 
Streaming API provides
 methods for creating DStreams from files and Akka actors as input sources.
@@ -409,12 +417,12 @@ Specifically, for files, the DStream can be created as
 <div class="codetabs">
 <div data-lang="scala">
 {% highlight scala %}
 {% endhighlight %}
 <div data-lang="java">
 {% highlight java %}
 {% endhighlight %}
@@ -443,13 +451,13 @@ project dependencies, you can create a DStream from Kafka 
 <div data-lang="scala">
 {% highlight scala %}
 import org.apache.spark.streaming.kafka._
-KafkaUtils.createStream(streamingContext, kafkaParams, ...)
+KafkaUtils.createStream(ssc, kafkaParams, ...)
 {% endhighlight %}
 <div data-lang="java">
 {% highlight java %}
-import org.apache.spark.streaming.kafka.*
-KafkaUtils.createStream(javaStreamingContext, kafkaParams, ...);
+import org.apache.spark.streaming.kafka.*;
+KafkaUtils.createStream(jssc, kafkaParams, ...);
 {% endhighlight %}
@@ -578,13 +586,14 @@ val runningCounts = 
pairs.updateStateByKey[Int](updateFunction _)
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
   new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
     @Override public Optional<Integer> call(List<Integer> values, 
Optional<Integer> state) {
       Integer newSum = ...  // add the new values with the previous running 
count to get the new count
-      return Optional.of(newSum)
+      return Optional.of(newSum);
-  }
+  };
 {% endhighlight %}
 This is applied on a DStream containing words (say, the `pairs` DStream 
containing `(word,
@@ -617,9 +626,9 @@ spam information (maybe generated with Spark as well) and 
then filtering based o
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam 
+val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam 
-val cleanedDStream = inputDStream.transform(rdd => {
+val cleanedDStream = wordCounts.transform(rdd => {
   rdd.join(spamInfoRDD).filter(...) // join data stream with spam information 
to do data cleaning
@@ -629,13 +638,14 @@ val cleanedDStream = inputDStream.transform(rdd => {
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // RDD containing spam information
-JavaPairRDD<String, Double> spamInfoRDD = javaSparkContext.hadoopFile(...);
+final JavaPairRDD<String, Double> spamInfoRDD = 
-JavaPairDStream<String, Integer> cleanedDStream = inputDStream.transform(
+JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
   new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
     @Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, 
Integer> rdd) throws Exception {
-      rdd.join(spamInfoRDD).filter(...) // join data stream with spam 
information to do data cleaning
+      rdd.join(spamInfoRDD).filter(...); // join data stream with spam 
information to do data cleaning
@@ -684,7 +694,7 @@ operation `reduceByKeyAndWindow`.
 {% highlight scala %}
 // Reduce last 30 seconds of data, every 10 seconds
-val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), 
+val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), 
Seconds(30), Seconds(10))
 {% endhighlight %}
@@ -699,7 +709,7 @@ Function2<Integer, Integer, Integer> reduceFunc = new 
Function2<Integer, Integer
 // Reduce last 30 seconds of data, every 10 seconds
-JavaPairDStream<String, Integer> windowedWordCounts = 
pair.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
+JavaPairDStream<String, Integer> windowedWordCounts = 
pairs.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new 
 {% endhighlight %}
@@ -1087,7 +1097,7 @@ This behavior is made simple by using 
`JavaStreamingContext.getOrCreate`. This i
 {% highlight java %}
 // Create a factory object that can create a and setup a new 
 JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() 
-  JavaStreamingContextFactory create() {
+  @Override public JavaStreamingContext create() {
     JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
     JavaDStream<String> lines = jssc.socketTextStream(...);     // create 

Reply via email to