[ https://issues.apache.org/jira/browse/SPARK-732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13954384#comment-13954384 ]

ASF GitHub Bot commented on SPARK-732:
--------------------------------------

Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/228#issuecomment-39004325
  
    Can we just add the accumulator update to TaskSetManager, in the 
handleSuccessfulTask() method?  That seems much simpler, because the 
TaskSetManager already has all of the state about which tasks are running, 
which ones have been resubmitted or speculatively executed, and so on.
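
    Roughly the shape of what I have in mind, as a standalone sketch (plain 
Scala, not the actual TaskSetManager code; the names here are just 
placeholders that mirror that method):

    {code}
    // Apply a task's accumulator contribution only the first time that task
    // index completes successfully; a speculative or resubmitted copy of the
    // same task then falls through without being counted again.
    object DedupSketch {
      val numTasks = 4
      val successful = Array.fill(numTasks)(false)   // per-task-index flag
      var acc = 0L                                   // stand-in for the accumulator

      def handleSuccessfulTask(index: Int, accumUpdate: Long): Unit = {
        if (!successful(index)) {
          successful(index) = true
          acc += accumUpdate      // first successful completion: count it
        }
        // duplicate completions of the same task index are ignored here
      }

      def main(args: Array[String]): Unit = {
        handleSuccessfulTask(0, 10)   // original run of task 0
        handleSuccessfulTask(0, 10)   // speculative copy of task 0: not counted
        assert(acc == 10)
      }
    }
    {code}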
    
    Over time, a lot of functionality has leaked into the DAGScheduler, such 
that there's a lot of state that's kept in multiple places: in the DAGScheduler 
and in the TaskSetManager or the TaskSchedulerImpl.  The abstraction is 
supposed to be that the DAGScheduler handles the high level semantics of 
scheduling stages and dealing with inter-stage dependencies, and the 
TaskSetManager handles the low-level details of the tasks for each stage.  
There are some parts of this abstraction that are currently broken (where the 
DAGScheduler knows too much about task-level details) and refactoring this is 
on my to-do list, but in the meantime I think we should try not to make this 
problem any worse, because it makes the code much more complicated, more 
difficult to understand, and more bug-prone.


> Recomputation of RDDs may result in duplicated accumulator updates
> ------------------------------------------------------------------
>
>                 Key: SPARK-732
>                 URL: https://issues.apache.org/jira/browse/SPARK-732
>             Project: Apache Spark
>          Issue Type: Bug
>    Affects Versions: 0.7.0, 0.6.2, 0.7.1, 0.8.0, 0.7.2, 0.7.3, 0.8.1, 0.9.0, 
> 0.8.2
>            Reporter: Josh Rosen
>            Assignee: Nan Zhu
>             Fix For: 1.0.0
>
>
> Currently, Spark doesn't guard against duplicated updates to the same 
> accumulator due to recomputations of an RDD.  For example:
> {code}
>     val acc = sc.accumulator(0)
>     val mapped = data.map { x => acc += 1; f(x) }   // not cached
>     mapped.count()
>     // acc should equal data.count() here
>     mapped.foreach { ... }
>     // Now, acc = 2 * data.count() because the map() was recomputed.
> {code}
> I think that this behavior is incorrect, especially because it allows the 
> addition or removal of a cache() call to affect the outcome of a 
> computation.
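> For instance (purely illustrative, reusing the same data, acc, and f as in 
> the example above, and assuming cached partitions are not evicted), the 
> value of acc at the end depends on whether the single cache() call is 
> present:
> {code}
>     val acc = sc.accumulator(0)
>     val mapped = data.map { x => acc += 1; f(x) }.cache()   // the only change
>     mapped.count()
>     mapped.foreach { _ => () }
>     // With cache(): acc == data.count(), because the map ran once.
>     // Without cache(): acc == 2 * data.count(), because the map ran twice.
> {code}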
> There's an old TODO to fix this duplicate update issue in the [DAGScheduler 
> code|https://github.com/mesos/spark/blob/ec5e553b418be43aa3f0ccc24e0d5ca9d63504b2/core/src/main/scala/spark/scheduler/DAGScheduler.scala#L494].
> I haven't tested whether recomputation due to blocks being dropped from the 
> cache can trigger duplicate accumulator updates.
> Hypothetically someone could be relying on the current behavior to implement 
> performance counters that track the actual number of computations performed 
> (including recomputations).  To be safe, we could add an explicit warning in 
> the release notes that documents the change in behavior when we fix this.
> Ignoring duplicate updates shouldn't be too hard, but there are a few 
> subtleties.  Currently, we allow accumulators to be used in multiple 
> transformations, so we'd need to detect duplicate updates at the 
> per-transformation level.  I haven't dug too deeply into the scheduler 
> internals, but we might also run into problems where pipelining causes what 
> is logically one set of accumulator updates to show up in two different tasks 
> (e.g. rdd.map(x => { accum += x; ... }) and rdd.map(x => { accum += x; ... }).count() may cause 
> what's logically the same accumulator update to be applied from two different 
> contexts, complicating the detection of duplicate updates).
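> To spell out that last example a bit (illustrative only, reusing the data, 
> acc, and f from above, with nothing cached):
> {code}
>     data.map { x => acc += 1; f(x) }.collect()
>     data.map { x => acc += 1; f(x) }.count()
>     // The map work is pipelined into the tasks of two different jobs, so
>     // what is logically one set of accumulator updates arrives from two
>     // different sets of tasks; deduplicating by task alone can't say whether
>     // acc should end up at data.count() or 2 * data.count().
> {code}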



--
This message was sent by Atlassian JIRA
(v6.2#6252)
