Repository: spark
Updated Branches:
  refs/heads/master 5eea33230 -> a0eec8e8f


[SPARK-15208][WIP][CORE][STREAMING][DOCS] Update Spark examples with AccumulatorV2

## What changes were proposed in this pull request?

This patch updates the code and docs in the examples module as well as the related 
doc module (a minimal sketch of the API change is shown after the checklist):

- [ ] [docs] `streaming-programming-guide.md`
  - [x] scala code part
  - [ ] java code part
  - [ ] python code part
- [x] [examples] `RecoverableNetworkWordCount.scala`
- [ ] [examples] `JavaRecoverableNetworkWordCount.java`
- [ ] [examples] `recoverable_network_wordcount.py`
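
For reference, a minimal, self-contained sketch of the AccumulatorV2-style API these items migrate to: the old `sc.accumulator(0L, name)` / `+=` pattern becomes `sc.longAccumulator(name)` / `.add(n)`. The object name, app name, and sample data below are illustrative only and not part of the patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.LongAccumulator

// Illustrative sketch; everything except LongAccumulator / longAccumulator / add
// is made up for this example.
object AccumulatorV2Sketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("AccumulatorV2Sketch").setMaster("local[2]"))

    // Old API: val counter = sc.accumulator(0L, "counter"); counter += n
    // New API: a LongAccumulator registered by name, updated with .add(n)
    val counter: LongAccumulator = sc.longAccumulator("WordsInBlacklistCounter")

    sc.parallelize(Seq(1L, 2L, 3L)).foreach(n => counter.add(n))
    println(s"counter = ${counter.value}")  // prints: counter = 6

    sc.stop()
  }
}
```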

## How was this patch tested?

Ran the examples and verified results manually.

Author: Liwei Lin <lwl...@gmail.com>

Closes #12981 from lw-lin/accumulatorV2-examples.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0eec8e8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0eec8e8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0eec8e8

Branch: refs/heads/master
Commit: a0eec8e8ffd5a43cae67aa0f5dbcf7ca19a4f3aa
Parents: 5eea332
Author: Liwei Lin <lwl...@gmail.com>
Authored: Thu Jun 2 11:07:15 2016 -0500
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Jun 2 11:07:15 2016 -0500

----------------------------------------------------------------------
 docs/streaming-programming-guide.md                     | 12 ++++++------
 .../streaming/RecoverableNetworkWordCount.scala         |  3 +--
 2 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a0eec8e8/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 6550fcc..78ae6a7 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1395,13 +1395,13 @@ object WordBlacklist {
 
 object DroppedWordsCounter {
 
-  @volatile private var instance: Accumulator[Long] = null
+  @volatile private var instance: LongAccumulator = null
 
-  def getInstance(sc: SparkContext): Accumulator[Long] = {
+  def getInstance(sc: SparkContext): LongAccumulator = {
     if (instance == null) {
       synchronized {
         if (instance == null) {
-          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
+          instance = sc.longAccumulator("WordsInBlacklistCounter")
         }
       }
     }
@@ -1409,7 +1409,7 @@ object DroppedWordsCounter {
   }
 }
 
-wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
+wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
   // Get or register the blacklist Broadcast
   val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
   // Get or register the droppedWordsCounter Accumulator
@@ -1417,12 +1417,12 @@ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
   // Use blacklist to drop words and use droppedWordsCounter to count them
   val counts = rdd.filter { case (word, count) =>
     if (blacklist.value.contains(word)) {
-      droppedWordsCounter += count
+      droppedWordsCounter.add(count)
       false
     } else {
       true
     }
-  }.collect()
+  }.collect().mkString("[", ", ", "]")
   val output = "Counts at time " + time + " " + counts
 })
 
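Assembled for readability, the updated guide snippet in this hunk reads roughly as follows (pieced together from the context and `+` lines above; `WordBlacklist` and `wordCounts` are defined elsewhere in the guide, so this fragment is not standalone):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.util.LongAccumulator

object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  // Lazily create and register a single LongAccumulator so it can be
  // re-obtained after recovery from a checkpoint.
  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter.add(count)
      false
    } else {
      true
    }
  }.collect().mkString("[", ", ", "]")
  val output = "Counts at time " + time + " " + counts
}
```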

http://git-wip-us.apache.org/repos/asf/spark/blob/a0eec8e8/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index acbcb0c..49c0427 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -27,8 +27,7 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
-import org.apache.spark.util.IntParam
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{IntParam, LongAccumulator}
 
 /**
  * Use this singleton to get or register a Broadcast variable.

