[ https://issues.apache.org/jira/browse/SPARK-21425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16088909#comment-16088909 ]

Ryan Williams edited comment on SPARK-21425 at 7/16/17 12:25 PM:
-----------------------------------------------------------------

[~sowen] interesting; I didn't think my example was using accumulators 
un-idiomatically, but maybe that's the catch here. 

To be clear, the 
[Main|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/Main.scala]
 application in my repro is the interesting one; 
[LongRace|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/LongRace.scala]
 and 
[LongAccumulatorRace|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/LongAccumulatorRace.scala]
 just confirm that {{Long}}s and {{LongAccumulator}}s are not 
concurrent-write-safe, respectively.
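
For reference, the kind of check {{LongAccumulatorRace}} does is roughly the 
following shape (a paraphrased sketch, not the exact code from the repo):

{code}
import org.apache.spark.util.LongAccumulator

object LongAccumulatorRaceSketch {
  def main(args: Array[String]): Unit = {
    // no SparkContext needed just to exercise add(); construct the accumulator directly
    val acc = new LongAccumulator
    val numThreads = 4
    val addsPerThread = 1000000

    val threads =
      (1 to numThreads).map { _ ⇒
        new Thread(new Runnable {
          override def run(): Unit = {
            var i = 0
            while (i < addsPerThread) {
              acc.add(1L)
              i += 1
            }
          }
        })
      }

    threads.foreach(_.start())
    threads.foreach(_.join())

    // the unsynchronized `sum += v` / `count += 1` inside add() loses updates,
    // so the observed sum typically comes out below numThreads * addsPerThread
    println(s"expected ${numThreads.toLong * addsPerThread}, got ${acc.sum}")
  }
}
{code}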

[The accumulator usage in that 
{{Main}}|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/Main.scala#L56-L70]
 is straightforward, I thought:

{code}
sc
  .parallelize(
    1 to numElems,
    numSlices = partitions
  )
  .mapPartitions {
    it ⇒
      otherAccum.add(1)
      lazyOtherAccum.add(1)
      localAccum.add(1)
      lazyLocalAccum.add(1)

      it.map(_.toString)
  }
  .collect()
{code}

It also "works" with {{RDD.map}}, incrementing the accumulators for every 
element.

I thought the way to use accumulators was:

- declare them on the driver,
- then, when they are referenced in task closures, a copy is made, either 
per-task or per-executor (the latter mirroring how {{Broadcast}}s work); see 
the sketch just below.
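
i.e., something like this (a hypothetical minimal example, not taken from the 
repro):

{code}
// declare on the driver
val acc = sc.longAccumulator("processed")

sc
  .parallelize(1 to 100, numSlices = 4)
  .foreach {
    _ ⇒
      // incremented inside the task closure, presumably against a per-task
      // (or per-executor?) copy that is merged back on the driver
      acc.add(1)
  }

// read back on the driver once the job completes
println(acc.value)  // 100
{code}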

If the copies are per-executor, and multiple tasks can end up writing to the 
same accumulator instance, then this bug would seem pretty severe.

If they are per-task, then, as you said, everything should be fine as long as a 
task doesn't spawn multiple threads that write to the same accumulator, which 
seems un-idiomatic enough from a Spark-usage perspective that this would be 
much less worrisome.

In either case, I'm wondering if the behavior I'm seeing is specific to the way 
{{local\[*\]}}-mode lays out executors/threads under the hood. [My example uses 
{{local\[4\]}}|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/Main.scala#L109], 
which I thought meant that my driver JVM spawns 4 threads to act as executors; 
in that case I'd expect the task closures sent to those executors to have 
replaced accumulator references with references to copies of the accumulators, 
as would presumably happen in "cluster mode".

So the important outstanding questions, to me, are:

- are accumulator instances shared across tasks on an executor, like 
{{Broadcast}} values, or does each task instantiate and write to a fresh copy 
of each accumulator?
- does {{local\[…\]}}-mode isolate accumulator instances the same way 
cluster-mode does? If not, this bug could just be a bug in how accumulator 
references in task closures are handled in local-mode.

There's also a nagging question of why I only see the race in the accumulators 
that are declared out in the {{Spark}} singleton; maybe the closure serializer 
just picks up direct references to them and doesn't transform them into 
per-task accumulator instances as it normally would?
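
To make that distinction concrete, the two declaration sites I'm contrasting 
look roughly like this (hypothetical names/structure, not the exact repro code):

{code}
object Spark {
  // "singleton" accumulator: task closures reach it through the object itself
  val otherAccum = new org.apache.spark.util.LongAccumulator
}

object Main {
  def run(sc: org.apache.spark.SparkContext): Unit = {
    // "local" accumulator: a val captured by the closure and serialized with it
    val localAccum = sc.longAccumulator("local")
    sc.register(Spark.otherAccum, "other")

    sc
      .parallelize(1 to 1000, numSlices = 4)
      .foreach {
        _ ⇒
          Spark.otherAccum.add(1)  // this is the one I see racing
          localAccum.add(1)        // this one behaves as expected
      }
  }
}
{code}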

Thanks for having a look!



> LongAccumulator, DoubleAccumulator not threadsafe
> -------------------------------------------------
>
>                 Key: SPARK-21425
>                 URL: https://issues.apache.org/jira/browse/SPARK-21425
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Ryan Williams
>
> [AccumulatorV2 
> docs|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L42-L43]
>  acknowledge that accumulators must be concurrent-read-safe, but afaict they 
> must also be concurrent-write-safe.
> The same docs imply that {{Int}} and {{Long}} meet either/both of these 
> criteria, when afaict they do not.
> Relatedly, the provided 
> [LongAccumulator|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L291]
>  and 
> [DoubleAccumulator|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L370]
>  are not thread-safe, and can be expected to produce undefined results when 
> multiple concurrent tasks on the same executor write to them.
> [Here is a repro repo|https://github.com/ryan-williams/spark-bugs/tree/accum] 
> with some simple applications that demonstrate incorrect results from 
> {{LongAccumulator}}s.


