[ https://issues.apache.org/jira/browse/SPARK-732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13954363#comment-13954363 ]

ASF GitHub Bot commented on SPARK-732:
--------------------------------------

Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/228#discussion_r11094329
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -789,6 +799,25 @@ class DAGScheduler(
       }
     
       /**
    +   * Detect duplicate accumulator updates and save the accumulator values.
    +   * @param accumValue the accumulator values received from the task
    +   * @param stage the stage which the task belongs to
    +   * @param task the completed task
    +   */
    +  private def saveAccumulatorValue(accumValue: Map[Long, Any], stage: Stage, task: Task[_]) {
    +    if (accumValue != null &&
    +      (!stageIdToAccumulators.contains(stage.id) ||
    +        !stageIdToAccumulators(stage.id).contains(task.partitionId))) {
    +      stageIdToAccumulators.getOrElseUpdate(stage.id,
    +        new HashMap[Int, ListBuffer[(Long, Any)]]).
    +        getOrElseUpdate(task.partitionId, new ListBuffer[(Long, Any)])
    +      for ((id, value) <- accumValue) {
    +        stageIdToAccumulators(stage.id)(task.partitionId) += id -> value
    --- End diff --
    
    nit: you can avoid the lookup within the loop by using the value returned in the getOrElseUpdate calls.
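The refactor the reviewer is suggesting might look like the following standalone sketch, with `stageIdToAccumulators` modeled as a local map rather than the scheduler field: hold on to the buffer that `getOrElseUpdate` returns instead of looking it up again on every loop iteration.

```scala
import scala.collection.mutable.{HashMap, ListBuffer}

// Standalone sketch of the suggested refactor; this map is a stand-in for
// the DAGScheduler's stageIdToAccumulators field in the patch.
object AccumulatorSave {
  val stageIdToAccumulators =
    new HashMap[Int, HashMap[Int, ListBuffer[(Long, Any)]]]

  def saveAccumulatorValue(accumValue: Map[Long, Any],
                           stageId: Int, partitionId: Int): Unit = {
    if (accumValue != null &&
        (!stageIdToAccumulators.contains(stageId) ||
         !stageIdToAccumulators(stageId).contains(partitionId))) {
      // Keep the buffer returned by getOrElseUpdate: no lookup in the loop.
      val buffer = stageIdToAccumulators
        .getOrElseUpdate(stageId, new HashMap[Int, ListBuffer[(Long, Any)]])
        .getOrElseUpdate(partitionId, new ListBuffer[(Long, Any)])
      for ((id, value) <- accumValue) {
        buffer += id -> value
      }
    }
  }
}
```

As in the original diff, a second report for the same (stage, partition) pair falls through the guard and is ignored.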


> Recomputation of RDDs may result in duplicated accumulator updates
> ------------------------------------------------------------------
>
>                 Key: SPARK-732
>                 URL: https://issues.apache.org/jira/browse/SPARK-732
>             Project: Apache Spark
>          Issue Type: Bug
>    Affects Versions: 0.7.0, 0.6.2, 0.7.1, 0.8.0, 0.7.2, 0.7.3, 0.8.1, 0.9.0, 0.8.2
>            Reporter: Josh Rosen
>            Assignee: Nan Zhu
>             Fix For: 1.0.0
>
>
> Currently, Spark doesn't guard against duplicated updates to the same 
> accumulator due to recomputations of an RDD.  For example:
> {code}
>     val acc = sc.accumulator(0)
>     val mapped = data.map { x => acc += 1; f(x) }
>     mapped.count()
>     // acc should equal data.count() here
>     mapped.foreach{...}
>     // Now, acc = 2 * data.count() because the map() was recomputed.
> {code}
> I think that this behavior is incorrect, especially because this behavior 
> allows the addition or removal of a cache() call to affect the outcome of a 
> computation.
> There's an old TODO to fix this duplicate update issue in the [DAGScheduler 
> code|https://github.com/mesos/spark/blob/ec5e553b418be43aa3f0ccc24e0d5ca9d63504b2/core/src/main/scala/spark/scheduler/DAGScheduler.scala#L494].
> I haven't tested whether recomputation due to blocks being dropped from the 
> cache can trigger duplicate accumulator updates.
> Hypothetically someone could be relying on the current behavior to implement 
> performance counters that track the actual number of computations performed 
> (including recomputations).  To be safe, we could add an explicit warning in 
> the release notes that documents the change in behavior when we fix this.
> Ignoring duplicate updates shouldn't be too hard, but there are a few 
> subtleties.  Currently, we allow accumulators to be used in multiple 
> transformations, so we'd need to detect duplicate updates at the 
> per-transformation level.  I haven't dug too deeply into the scheduler 
> internals, but we might also run into problems where pipelining causes what 
> is logically one set of accumulator updates to show up in two different tasks 
> (e.g. rdd.map(accum += x; ...) and rdd.map(accum += x; ...).count() may cause 
> what's logically the same accumulator update to be applied from two different 
> contexts, complicating the detection of duplicate updates).
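One way to picture the dedup the issue calls for is the following toy sketch (plain Scala, not Spark's accumulator API): an update is applied only the first time a given (stageId, partitionId) pair reports, so a recomputed task's replayed update becomes a no-op.

```scala
import scala.collection.mutable

// Toy model (not Spark's API) of ignoring duplicate accumulator updates:
// an update counts only for the first completion of each
// (stageId, partitionId) pair; a recomputation's replay is dropped.
class DedupCounter {
  private var total = 0L
  private val seen = mutable.Set.empty[(Int, Int)]

  def update(stageId: Int, partitionId: Int, delta: Long): Unit =
    if (seen.add((stageId, partitionId))) // add returns false on a repeat
      total += delta

  def value: Long = total
}
```

The pipelining subtlety described above remains: logically identical updates replayed under different (stage, partition) keys would not be caught by this keying alone.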



--
This message was sent by Atlassian JIRA
(v6.2#6252)