Repository: spark
Updated Branches:
  refs/heads/master b2e9731ca -> 958200497


[DOCS] Reorganize explanation of Accumulators and Broadcast Variables

## What changes were proposed in this pull request?

The discussion of the interaction of Accumulators and Broadcast Variables 
should logically follow the discussion on Checkpointing. As currently written, 
this section discusses Checkpointing before it is formally introduced. To 
remedy this:

 - Rename this section to "Accumulators, Broadcast Variables, and Checkpoints", 
and
 - Move this section after "Checkpointing".

## How was this patch tested?

Testing: ran

$ SKIP_API=1 jekyll build

and verified the changes in a web browser pointed at docs/_site/index.html.

Author: José Hiram Soltren <[email protected]>

Closes #15281 from jsoltren/doc-changes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95820049
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95820049
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95820049

Branch: refs/heads/master
Commit: 958200497affb40f05e321c2b0e252d365ae02f4
Parents: b2e9731
Author: José Hiram Soltren <[email protected]>
Authored: Thu Sep 29 10:18:56 2016 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Thu Sep 29 10:18:56 2016 -0700

----------------------------------------------------------------------
 docs/streaming-programming-guide.md | 328 +++++++++++++++----------------
 1 file changed, 164 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/95820049/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md 
b/docs/streaming-programming-guide.md
index 43f1cf3..0b0315b 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1368,170 +1368,6 @@ Note that the connections in the pool should be lazily 
created on demand and tim
 
 ***
 
-## Accumulators and Broadcast Variables
-
-[Accumulators](programming-guide.html#accumulators) and [Broadcast 
variables](programming-guide.html#broadcast-variables) cannot be recovered from 
checkpoint in Spark Streaming. If you enable checkpointing and use 
[Accumulators](programming-guide.html#accumulators) or [Broadcast 
variables](programming-guide.html#broadcast-variables) as well, you'll have to 
create lazily instantiated singleton instances for 
[Accumulators](programming-guide.html#accumulators) and [Broadcast 
variables](programming-guide.html#broadcast-variables) so that they can be 
re-instantiated after the driver restarts on failure. This is shown in the 
following example.
-
-<div class="codetabs">
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-object WordBlacklist {
-
-  @volatile private var instance: Broadcast[Seq[String]] = null
-
-  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
-    if (instance == null) {
-      synchronized {
-        if (instance == null) {
-          val wordBlacklist = Seq("a", "b", "c")
-          instance = sc.broadcast(wordBlacklist)
-        }
-      }
-    }
-    instance
-  }
-}
-
-object DroppedWordsCounter {
-
-  @volatile private var instance: LongAccumulator = null
-
-  def getInstance(sc: SparkContext): LongAccumulator = {
-    if (instance == null) {
-      synchronized {
-        if (instance == null) {
-          instance = sc.longAccumulator("WordsInBlacklistCounter")
-        }
-      }
-    }
-    instance
-  }
-}
-
-wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
-  // Get or register the blacklist Broadcast
-  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
-  // Get or register the droppedWordsCounter Accumulator
-  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
-  // Use blacklist to drop words and use droppedWordsCounter to count them
-  val counts = rdd.filter { case (word, count) =>
-    if (blacklist.value.contains(word)) {
-      droppedWordsCounter.add(count)
-      false
-    } else {
-      true
-    }
-  }.collect().mkString("[", ", ", "]")
-  val output = "Counts at time " + time + " " + counts
-})
-
-{% endhighlight %}
-
-See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
-</div>
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-class JavaWordBlacklist {
-
-  private static volatile Broadcast<List<String>> instance = null;
-
-  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
-    if (instance == null) {
-      synchronized (JavaWordBlacklist.class) {
-        if (instance == null) {
-          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
-          instance = jsc.broadcast(wordBlacklist);
-        }
-      }
-    }
-    return instance;
-  }
-}
-
-class JavaDroppedWordsCounter {
-
-  private static volatile LongAccumulator instance = null;
-
-  public static LongAccumulator getInstance(JavaSparkContext jsc) {
-    if (instance == null) {
-      synchronized (JavaDroppedWordsCounter.class) {
-        if (instance == null) {
-          instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
-        }
-      }
-    }
-    return instance;
-  }
-}
-
-wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, 
Void>() {
-  @Override
-  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws 
IOException {
-    // Get or register the blacklist Broadcast
-    final Broadcast<List<String>> blacklist = 
JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
-    // Get or register the droppedWordsCounter Accumulator
-    final LongAccumulator droppedWordsCounter = 
JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
-    // Use blacklist to drop words and use droppedWordsCounter to count them
-    String counts = rdd.filter(new Function<Tuple2<String, Integer>, 
Boolean>() {
-      @Override
-      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
-        if (blacklist.value().contains(wordCount._1())) {
-          droppedWordsCounter.add(wordCount._2());
-          return false;
-        } else {
-          return true;
-        }
-      }
-    }).collect().toString();
-    String output = "Counts at time " + time + " " + counts;
-  }
-}
-
-{% endhighlight %}
-
-See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-def getWordBlacklist(sparkContext):
-    if ("wordBlacklist" not in globals()):
-        globals()["wordBlacklist"] = sparkContext.broadcast(["a", "b", "c"])
-    return globals()["wordBlacklist"]
-
-def getDroppedWordsCounter(sparkContext):
-    if ("droppedWordsCounter" not in globals()):
-        globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
-    return globals()["droppedWordsCounter"]
-
-def echo(time, rdd):
-    # Get or register the blacklist Broadcast
-    blacklist = getWordBlacklist(rdd.context)
-    # Get or register the droppedWordsCounter Accumulator
-    droppedWordsCounter = getDroppedWordsCounter(rdd.context)
-
-    # Use blacklist to drop words and use droppedWordsCounter to count them
-    def filterFunc(wordCount):
-        if wordCount[0] in blacklist.value:
-            droppedWordsCounter.add(wordCount[1])
-            False
-        else:
-            True
-
-    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
-
-wordCounts.foreachRDD(echo)
-
-{% endhighlight %}
-
-See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/recoverable_network_wordcount.py).
-
-</div>
-</div>
-
-***
-
 ## DataFrame and SQL Operations
 You can easily use [DataFrames and SQL](sql-programming-guide.html) operations 
on streaming data. You have to create a SparkSession using the SparkContext 
that the StreamingContext is using. Furthermore, this has to be done such that it 
can be restarted on driver failures. This is done by creating a lazily 
instantiated singleton instance of SparkSession. This is shown in the following 
example. It modifies the earlier [word count example](#a-quick-example) to 
generate word counts using DataFrames and SQL. Each RDD is converted to a 
DataFrame, registered as a temporary table and then queried using SQL.
 
@@ -1877,6 +1713,170 @@ batch interval that is at least 10 seconds. It can be 
set by using
 
 ***
 
+## Accumulators, Broadcast Variables, and Checkpoints
+
+[Accumulators](programming-guide.html#accumulators) and [Broadcast 
variables](programming-guide.html#broadcast-variables) cannot be recovered from 
checkpoint in Spark Streaming. If you enable checkpointing and use 
[Accumulators](programming-guide.html#accumulators) or [Broadcast 
variables](programming-guide.html#broadcast-variables) as well, you'll have to 
create lazily instantiated singleton instances for 
[Accumulators](programming-guide.html#accumulators) and [Broadcast 
variables](programming-guide.html#broadcast-variables) so that they can be 
re-instantiated after the driver restarts on failure. This is shown in the 
following example.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+object WordBlacklist {
+
+  @volatile private var instance: Broadcast[Seq[String]] = null
+
+  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          val wordBlacklist = Seq("a", "b", "c")
+          instance = sc.broadcast(wordBlacklist)
+        }
+      }
+    }
+    instance
+  }
+}
+
+object DroppedWordsCounter {
+
+  @volatile private var instance: LongAccumulator = null
+
+  def getInstance(sc: SparkContext): LongAccumulator = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          instance = sc.longAccumulator("WordsInBlacklistCounter")
+        }
+      }
+    }
+    instance
+  }
+}
+
+wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
+  // Get or register the blacklist Broadcast
+  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
+  // Get or register the droppedWordsCounter Accumulator
+  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
+  // Use blacklist to drop words and use droppedWordsCounter to count them
+  val counts = rdd.filter { case (word, count) =>
+    if (blacklist.value.contains(word)) {
+      droppedWordsCounter.add(count)
+      false
+    } else {
+      true
+    }
+  }.collect().mkString("[", ", ", "]")
+  val output = "Counts at time " + time + " " + counts
+})
+
+{% endhighlight %}
+
+See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+class JavaWordBlacklist {
+
+  private static volatile Broadcast<List<String>> instance = null;
+
+  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaWordBlacklist.class) {
+        if (instance == null) {
+          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
+          instance = jsc.broadcast(wordBlacklist);
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+class JavaDroppedWordsCounter {
+
+  private static volatile LongAccumulator instance = null;
+
+  public static LongAccumulator getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaDroppedWordsCounter.class) {
+        if (instance == null) {
+          instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, 
Void>() {
+  @Override
+  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws 
IOException {
+    // Get or register the blacklist Broadcast
+    final Broadcast<List<String>> blacklist = 
JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+    // Get or register the droppedWordsCounter Accumulator
+    final LongAccumulator droppedWordsCounter = 
JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+    // Use blacklist to drop words and use droppedWordsCounter to count them
+    String counts = rdd.filter(new Function<Tuple2<String, Integer>, 
Boolean>() {
+      @Override
+      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
+        if (blacklist.value().contains(wordCount._1())) {
+          droppedWordsCounter.add(wordCount._2());
+          return false;
+        } else {
+          return true;
+        }
+      }
+    }).collect().toString();
+    String output = "Counts at time " + time + " " + counts;
+  }
+}
+
+{% endhighlight %}
+
+See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+def getWordBlacklist(sparkContext):
+    if ("wordBlacklist" not in globals()):
+        globals()["wordBlacklist"] = sparkContext.broadcast(["a", "b", "c"])
+    return globals()["wordBlacklist"]
+
+def getDroppedWordsCounter(sparkContext):
+    if ("droppedWordsCounter" not in globals()):
+        globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
+    return globals()["droppedWordsCounter"]
+
+def echo(time, rdd):
+    # Get or register the blacklist Broadcast
+    blacklist = getWordBlacklist(rdd.context)
+    # Get or register the droppedWordsCounter Accumulator
+    droppedWordsCounter = getDroppedWordsCounter(rdd.context)
+
+    # Use blacklist to drop words and use droppedWordsCounter to count them
+    def filterFunc(wordCount):
+        if wordCount[0] in blacklist.value:
+            droppedWordsCounter.add(wordCount[1])
+            False
+        else:
+            True
+
+    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
+
+wordCounts.foreachRDD(echo)
+
+{% endhighlight %}
+
+See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/recoverable_network_wordcount.py).
+
+</div>
+</div>
+
+***
+
 ## Deploying Applications
 This section discusses the steps to deploy a Spark Streaming application.
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to