[
https://issues.apache.org/jira/browse/SPARK-732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13954363#comment-13954363
]
ASF GitHub Bot commented on SPARK-732:
--------------------------------------
Github user mridulm commented on a diff in the pull request:
https://github.com/apache/spark/pull/228#discussion_r11094329
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -789,6 +799,25 @@ class DAGScheduler(
}
/**
+ * Detect duplicate accumulator updates and save the accumulator values
+ * @param accumValue the accumulator values received from the task
+ * @param stage the stage which the task belongs to
+ * @param task the completed task
+ */
+ private def saveAccumulatorValue(accumValue: Map[Long, Any], stage: Stage, task: Task[_]) {
+ if (accumValue != null &&
+ (!stageIdToAccumulators.contains(stage.id) ||
+ !stageIdToAccumulators(stage.id).contains(task.partitionId))) {
+ stageIdToAccumulators.getOrElseUpdate(stage.id,
+ new HashMap[Int, ListBuffer[(Long, Any)]]).
+ getOrElseUpdate(task.partitionId, new ListBuffer[(Long, Any)])
+ for ((id, value) <- accumValue) {
+ stageIdToAccumulators(stage.id)(task.partitionId) += id -> value
--- End diff --
nit: you can avoid the lookup within the loop by using the value returned
by the getOrElseUpdate calls.
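The suggestion above can be sketched as a standalone snippet. This is a hypothetical simplification of the diff (stage and task are reduced to plain `stageId`/`partitionId` ints, and the map lives in a demo object rather than the DAGScheduler), showing how keeping the `ListBuffer` returned by `getOrElseUpdate` removes the per-iteration map lookup:

```scala
import scala.collection.mutable.{HashMap, ListBuffer}

// Illustrative stand-in for the DAGScheduler state in the diff.
object AccumulatorDedup {
  // stage id -> (partition id -> recorded (accumulator id, value) pairs)
  val stageIdToAccumulators =
    new HashMap[Int, HashMap[Int, ListBuffer[(Long, Any)]]]

  def saveAccumulatorValue(accumValue: Map[Long, Any],
                           stageId: Int, partitionId: Int): Unit = {
    // Only record the first completion of each (stage, partition) pair;
    // a recomputed task for the same partition is ignored.
    if (accumValue != null &&
        (!stageIdToAccumulators.contains(stageId) ||
         !stageIdToAccumulators(stageId).contains(partitionId))) {
      // Keep the buffer returned by getOrElseUpdate instead of
      // re-resolving stageIdToAccumulators(stageId)(partitionId)
      // on every loop iteration.
      val buffer = stageIdToAccumulators
        .getOrElseUpdate(stageId, new HashMap[Int, ListBuffer[(Long, Any)]])
        .getOrElseUpdate(partitionId, new ListBuffer[(Long, Any)])
      for ((id, value) <- accumValue) {
        buffer += id -> value
      }
    }
  }
}
```

The behavior is unchanged from the diff; only the redundant nested lookup inside the loop goes away.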
> Recomputation of RDDs may result in duplicated accumulator updates
> ------------------------------------------------------------------
>
> Key: SPARK-732
> URL: https://issues.apache.org/jira/browse/SPARK-732
> Project: Apache Spark
> Issue Type: Bug
> Affects Versions: 0.7.0, 0.6.2, 0.7.1, 0.8.0, 0.7.2, 0.7.3, 0.8.1, 0.9.0, 0.8.2
> Reporter: Josh Rosen
> Assignee: Nan Zhu
> Fix For: 1.0.0
>
>
> Currently, Spark doesn't guard against duplicated updates to the same
> accumulator due to recomputations of an RDD. For example:
> {code}
> val acc = sc.accumulator(0)
> data.map { x => acc += 1; f(x) }
> data.count()
> // acc should equal data.count() here
> data.foreach{...}
> // Now, acc = 2 * data.count() because the map() was recomputed.
> {code}
> I think this behavior is incorrect, especially because it allows the
> addition or removal of a cache() call to affect the outcome of a
> computation.
> There's an old TODO to fix this duplicate update issue in the [DAGScheduler
> code|https://github.com/mesos/spark/blob/ec5e553b418be43aa3f0ccc24e0d5ca9d63504b2/core/src/main/scala/spark/scheduler/DAGScheduler.scala#L494].
> I haven't tested whether recomputation due to blocks being dropped from the
> cache can trigger duplicate accumulator updates.
> Hypothetically someone could be relying on the current behavior to implement
> performance counters that track the actual number of computations performed
> (including recomputations). To be safe, we could add an explicit warning in
> the release notes that documents the change in behavior when we fix this.
> Ignoring duplicate updates shouldn't be too hard, but there are a few
> subtleties. Currently, we allow accumulators to be used in multiple
> transformations, so we'd need to detect duplicate updates at the
> per-transformation level. I haven't dug too deeply into the scheduler
> internals, but we might also run into problems where pipelining causes what
> is logically one set of accumulator updates to show up in two different tasks
> (e.g. rdd.map { x => accum += x; ... } and rdd.map { x => accum += x; ... }.count()
> may cause what's logically the same accumulator update to be applied from two
> different contexts, complicating the detection of duplicate updates).
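One hypothetical way to make updates idempotent, per the per-transformation idea above, is to key each task's partial accumulator value by (accumulator id, stage id, partition id) so that a recomputed partition overwrites rather than adds to its earlier contribution. This is an illustrative sketch, not the actual fix; all names are invented, and note it does not address the pipelining case just described, where the same logical update arrives under two different stage ids:

```scala
import scala.collection.mutable

// Hypothetical sketch: idempotent accumulation keyed by
// (accumulator id, stage id, partition id).
class IdempotentAccumulator {
  // last reported partial value per (accumId, stageId, partitionId)
  private val partials = mutable.Map.empty[(Long, Int, Int), Long]

  def update(accumId: Long, stageId: Int,
             partitionId: Int, partial: Long): Unit =
    // Overwrite on recomputation instead of adding again.
    partials((accumId, stageId, partitionId)) = partial

  def value(accumId: Long): Long =
    partials.collect { case ((id, _, _), v) if id == accumId => v }.sum
}
```

Under this scheme, re-running a partition's task leaves the total unchanged, while distinct partitions still contribute once each.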
--
This message was sent by Atlassian JIRA
(v6.2#6252)