Repository: spark
Updated Branches:
  refs/heads/master 5b335bdda -> 548643a9e


[SPARK-4423] Improve foreach() documentation to avoid confusion between local- 
and cluster-mode behavior

Hi all - I've added a writeup on how closures work within Spark to help clarify 
the general case for this problem and similar problems. I hope this addresses 
the issue and would love any feedback.

Author: Ilya Ganelin <[email protected]>

Closes #4696 from ilganeli/SPARK-4423 and squashes the following commits:

c5dc498 [Ilya Ganelin] Fixed typo
07b78e8 [Ilya Ganelin] Updated to fix capitalization
48c1983 [Ilya Ganelin] Updated to fix capitalization and clarify wording
2fd2a07 [Ilya Ganelin] Incorporated a few more minor fixes. Fixed a bug in 
python code. Added semicolons for java
4772f99 [Ilya Ganelin] Incorporated latest feedback
448bd79 [Ilya Ganelin] Updated some verbiage and added section links
5dbbda5 [Ilya Ganelin] Improved some wording
d374d3a [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into 
SPARK-4423
2600668 [Ilya Ganelin] Minor edits
c768ab2 [Ilya Ganelin] Updated documentation to add a section on closures. This 
helps understand confusing behavior of foreach and map functions when 
attempting to modify variables outside of the scope of an RDD action or 
transformation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/548643a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/548643a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/548643a9

Branch: refs/heads/master
Commit: 548643a9e4690b69e2a496cdcd0a426b6de8d8b5
Parents: 5b335bd
Author: Ilya Ganelin <[email protected]>
Authored: Wed Mar 11 13:20:15 2015 +0000
Committer: Sean Owen <[email protected]>
Committed: Wed Mar 11 13:20:15 2015 +0000

----------------------------------------------------------------------
 docs/programming-guide.md | 72 +++++++++++++++++++++++++++++++++++++++---
 1 file changed, 68 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/548643a9/docs/programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index fa0b4e3..c011a84 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -725,7 +725,7 @@ class MyClass(object):
     def __init__(self):
         self.field = "Hello"
     def doStuff(self, rdd):
-        return rdd.map(lambda s: self.field + x)
+        return rdd.map(lambda s: self.field + s)
 {% endhighlight %}
 
 To avoid this issue, the simplest way is to copy `field` into a local variable 
instead
@@ -734,13 +734,76 @@ of accessing it externally:
 {% highlight python %}
 def doStuff(self, rdd):
     field = self.field
-    return rdd.map(lambda s: field + x)
+    return rdd.map(lambda s: field + s)
 {% endhighlight %}
 
 </div>
 
 </div>
 
+### Understanding closures <a name="ClosuresLink"></a>
+One of the harder things about Spark is understanding the scope and life cycle 
of variables and methods when executing code across a cluster. RDD operations 
that modify variables outside of their scope can be a frequent source of 
confusion. In the example below we'll look at code that uses `foreach()` to 
increment a counter, but similar issues can occur for other operations as well.
+
+#### Example
+
+Consider the naive RDD element sum below, which behaves completely differently 
depending on whether execution is happening within the same JVM. A common 
example of this is when running Spark in `local` mode (`--master local[n]`) 
versus deploying a Spark application to a cluster (e.g. via spark-submit to 
YARN):
+
+<div class="codetabs">
+
+<div data-lang="scala"  markdown="1">
+{% highlight scala %}
+var counter = 0
+var rdd = sc.parallelize(data)
+
+// Wrong: Don't do this!!
+rdd.foreach(x => counter += x)
+
+println("Counter value: " + counter)
+{% endhighlight %}
+</div>
+
+<div data-lang="java"  markdown="1">
+{% highlight java %}
+int counter = 0;
+JavaRDD<Integer> rdd = sc.parallelize(data); 
+
+// Wrong: Don't do this!!
+rdd.foreach(x -> counter += x);
+
+System.out.println("Counter value: " + counter);
+{% endhighlight %}
+</div>
+
+<div data-lang="python"  markdown="1">
+{% highlight python %}
+counter = 0
+rdd = sc.parallelize(data)
+
+# Wrong: Don't do this!!
+def increment_counter(x):
+    global counter
+    counter += x
+rdd.foreach(increment_counter)
+
+print("Counter value: " + str(counter))
+
+{% endhighlight %}
+</div>
+
+</div>
+
+#### Local vs. cluster modes
+
+The primary challenge is that the behavior of the above code is undefined. In 
local mode with a single JVM, the above code will sum the values within the RDD 
and store the result in **counter**. This is because both the RDD and the 
variable **counter** are in the same memory space on the driver node.
+
+However, in `cluster` mode, what happens is more complicated, and the above 
may not work as intended. To execute jobs, Spark breaks up the processing of 
RDD operations into tasks, each of which is executed by an executor. Prior to 
execution, Spark computes the **closure**. The closure is the set of variables 
and methods that must be visible for the executor to perform its computations 
on the RDD (in this case `foreach()`). This closure is serialized and sent to 
each executor. In `local` mode, there is only one executor, so everything 
shares the same closure. In other modes, however, this is not the case, and the 
executors running on separate worker nodes each have their own copy of the 
closure.
+
+What is happening here is that the variables within the closure sent to each 
executor are now copies and thus, when **counter** is referenced within the 
`foreach` function, it's no longer the **counter** on the driver node. There is 
still a **counter** in the memory of the driver node, but it is no longer 
visible to the executors! The executors only see the copy from the serialized 
closure. Thus, the final value of **counter** will still be zero, since all 
operations on **counter** were referencing the value within the serialized 
closure.
+
+To ensure well-defined behavior in these sorts of scenarios one should use an 
[`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to 
provide a mechanism for safely updating a variable when execution is split up 
across worker nodes in a cluster. The Accumulators section of this guide 
discusses these in more detail.  
+
+In general, closures (constructs like loops or locally defined methods) 
should not be used to mutate some global state. Spark does not define or 
guarantee the behavior of mutations to objects referenced from outside of 
closures. Some code that does this may work in local mode, but that's just by 
accident, and such code will not behave as expected in distributed mode. Use an 
Accumulator instead if some global aggregation is needed.
+
+#### Printing elements of an RDD 
+Another common idiom is attempting to print out the elements of an RDD using 
`rdd.foreach(println)` or `rdd.map(println)`. On a single machine, this will 
generate the expected output and print all the RDD's elements. However, in 
`cluster` mode, the executors write to their own `stdout`, not the one on the 
driver, so `stdout` on the driver won't show these! To print all elements on 
the driver, one can use the `collect()` method to first bring the RDD to the 
driver node thus: `rdd.collect().foreach(println)`. This can cause the driver 
to run out of memory, though, because `collect()` fetches the entire RDD to a 
single machine; if you only need to print a few elements of the RDD, a safer 
approach is to use `take()`: `rdd.take(100).foreach(println)`.
+ 
 ### Working with Key-Value Pairs
 
 <div class="codetabs">
@@ -1018,7 +1081,8 @@ for details.
 </tr>
 <tr>
   <td> <b>foreach</b>(<i>func</i>) </td>
-  <td> Run a function <i>func</i> on each element of the dataset. This is 
usually done for side effects such as updating an accumulator variable (see 
below) or interacting with external storage systems. </td>
+  <td> Run a function <i>func</i> on each element of the dataset. This is 
usually done for side effects such as updating an <a 
href="#AccumLink">Accumulator</a> or interacting with external storage systems. 
+  <br /><b>Note</b>: modifying variables other than Accumulators outside of 
the <code>foreach()</code> may result in undefined behavior. See <a 
href="#ClosuresLink">Understanding closures</a> for more details.</td>
 </tr>
 </table>
 
@@ -1191,7 +1255,7 @@ run on the cluster so that `v` is not shipped to the 
nodes more than once. In ad
 `v` should not be modified after it is broadcast in order to ensure that all 
nodes get the same
 value of the broadcast variable (e.g. if the variable is shipped to a new node 
later).
 
-## Accumulators
+## Accumulators <a name="AccumLink"></a>
 
 Accumulators are variables that are only "added" to through an associative 
operation and can
 therefore be efficiently supported in parallel. They can be used to implement 
counters (as in
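

As a rough illustration of the closure-copy behavior described in the new
"Understanding closures" section, here is a plain-Python sketch (not Spark
code). It uses `pickle` as a stand-in for closure serialization; the names
`run_task`, `driver_state`, and `partitions` are hypothetical, and real
executors run in separate processes or machines rather than in a local loop.

```python
import pickle


def run_task(serialized_closure, partition):
    """Mimic one task: deserialize its own copy of the state, then mutate it."""
    # Each task works on a private copy, never on the driver's object.
    state = pickle.loads(serialized_closure)
    for x in partition:
        state["counter"] += x
    return state["counter"]


# "Driver-side" state captured by the closure (hypothetical example data).
driver_state = {"counter": 0}
partitions = [[1, 2], [3, 4]]

# Serialize once and hand a copy to each simulated task.
serialized = pickle.dumps(driver_state)
executor_results = [run_task(serialized, p) for p in partitions]

print(driver_state["counter"])  # 0: the driver's counter was never touched
print(executor_results)         # [3, 7]: each copy only saw its own partition

# Explicitly merging per-task results on the driver gives the real total,
# which is loosely the role an Accumulator plays in Spark.
merged_total = sum(executor_results)
print(merged_total)             # 10
```

The design point is only that mutations happen on deserialized copies, so the
driver sees a result only if it is sent back and merged explicitly.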

