[ https://issues.apache.org/jira/browse/SPARK-732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15163921#comment-15163921 ]

Jim Lohse commented on SPARK-732:
---------------------------------

Affects Versions only goes up to 1.1.0; presumably this is still an issue? Is it 
correct that this is only a problem in transformations, and that updates performed 
inside actions work correctly? That reading seems to be supported by the docs under 
https://spark.apache.org/docs/latest/programming-guide.html#accumulators-a-nameaccumlinka:
 

"In Java, Spark also supports the more general Accumulable interface to 
accumulate data where the resulting type is not the same as the elements added 
(e.g. build a list by collecting together elements).

For accumulator updates performed inside actions only, Spark guarantees that 
each task’s update to the accumulator will only be applied once, i.e. restarted 
tasks will not update the value. In transformations, users should be aware of 
that each task’s update may be applied more than once if tasks or job stages 
are re-executed."
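
For what it's worth, here is a minimal sketch of the distinction the docs draw, 
assuming the pre-2.0 sc.accumulator API from the affected versions and a local 
master; the app name, object name, and data are placeholders, not anything from 
this ticket:

{code}
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("acc-demo").setMaster("local[2]"))
    val data = sc.parallelize(1 to 100)

    // Update inside an action: each task's update is applied exactly once,
    // even if a task is retried.
    val actionAcc = sc.accumulator(0)
    data.foreach(x => actionAcc += 1)
    println(actionAcc.value)   // 100

    // Update inside a transformation: the update runs every time the map
    // stage is computed, so running two actions over the same uncached
    // lineage applies it twice.
    val transAcc = sc.accumulator(0)
    val mapped = data.map { x => transAcc += 1; x }
    mapped.count()
    mapped.count()
    println(transAcc.value)    // typically 200, not 100

    sc.stop()
  }
}
{code}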

> Recomputation of RDDs may result in duplicated accumulator updates
> ------------------------------------------------------------------
>
>                 Key: SPARK-732
>                 URL: https://issues.apache.org/jira/browse/SPARK-732
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 0.6.2, 0.7.0, 0.7.1, 0.7.2, 0.7.3, 0.8.0, 0.8.1, 0.8.2, 
> 0.9.0, 1.0.1, 1.1.0
>            Reporter: Josh Rosen
>            Assignee: Nan Zhu
>            Priority: Blocker
>
> Currently, Spark doesn't guard against duplicated updates to the same 
> accumulator due to recomputations of an RDD.  For example:
> {code}
>     val acc = sc.accumulator(0)
>     val mapped = data.map { x => acc += 1; f(x) }
>     mapped.count()
>     // acc should equal data.count() here
>     mapped.foreach { ... }
>     // Now, acc = 2 * data.count() because the map() was recomputed.
> {code}
> I think that this behavior is incorrect, especially because it allows the 
> addition or removal of a cache() call to affect the outcome of a 
> computation.
> There's an old TODO to fix this duplicate update issue in the [DAGScheduler 
> code|https://github.com/mesos/spark/blob/ec5e553b418be43aa3f0ccc24e0d5ca9d63504b2/core/src/main/scala/spark/scheduler/DAGScheduler.scala#L494].
> I haven't tested whether recomputation due to blocks being dropped from the 
> cache can trigger duplicate accumulator updates.
> Hypothetically someone could be relying on the current behavior to implement 
> performance counters that track the actual number of computations performed 
> (including recomputations).  To be safe, we could add an explicit warning in 
> the release notes that documents the change in behavior when we fix this.
> Ignoring duplicate updates shouldn't be too hard, but there are a few 
> subtleties.  Currently, we allow accumulators to be used in multiple 
> transformations, so we'd need to detect duplicate updates at the 
> per-transformation level.  I haven't dug too deeply into the scheduler 
> internals, but we might also run into problems where pipelining causes what 
> is logically one set of accumulator updates to show up in two different tasks 
> (e.g. rdd.map { x => accum += x; ... } and rdd.map { x => accum += x; ... }.count() 
> may cause what's logically the same accumulator update to be applied from two 
> different contexts, complicating the detection of duplicate updates).
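
A small sketch of the cache() interaction described in the report above, under the 
same assumptions as the earlier snippet (sc.accumulator API, illustrative names, 
an existing SparkContext sc); it only demonstrates the observed behavior, not a fix:

{code}
val data = sc.parallelize(1 to 100)

// Without cache(): the map stage is recomputed for the second action,
// so the accumulator is updated twice per element.
val acc1 = sc.accumulator(0)
val uncached = data.map { x => acc1 += 1; x }
uncached.count()
uncached.count()
println(acc1.value)   // typically 200

// With cache(): the second action reads cached blocks instead of
// re-running map(), so the accumulator is updated once per element.
// Whether the caller wrote cache() changed the final value.
val acc2 = sc.accumulator(0)
val cached = data.map { x => acc2 += 1; x }.cache()
cached.count()
cached.count()
println(acc2.value)   // typically 100
{code}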


