[https://issues.apache.org/jira/browse/SPARK-21425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16088909#comment-16088909]
Ryan Williams commented on SPARK-21425:
---------------------------------------
[~sowen] interesting, I didn't think my example was using accumulators
un-idiomatically, but maybe that's the catch here.
To be clear, the
[Main|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/Main.scala]
application in my repro is the interesting one;
[LongRace|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/LongRace.scala]
and
[LongAccumulatorRace|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/LongAccumulatorRace.scala]
just show that {{Long}}s and {{LongAccumulator}}s, respectively, are not
thread-safe.
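For reference, the kind of race those two files demonstrate can be reproduced
without Spark at all. Here is a minimal standalone sketch (all names are
invented for illustration, not taken from the repro repo) showing that plain
{{Long}} increments lose updates under contention while an {{AtomicLong}} does
not:

```scala
import java.util.concurrent.atomic.AtomicLong

// Standalone sketch, no Spark involved; names are illustrative only.
// `plain += 1` is a non-atomic read-modify-write on a Long, so concurrent
// increments can be lost; AtomicLong.incrementAndGet never loses updates.
object RaceSketch {
  def run(threads: Int, addsPerThread: Int): (Long, Long) = {
    var plain  = 0L                 // not thread-safe
    val atomic = new AtomicLong(0L) // thread-safe

    val ts = (1 to threads).map { _ =>
      new Thread(() => {
        var i = 0
        while (i < addsPerThread) {
          plain += 1                // racy write: increments can be lost
          atomic.incrementAndGet()  // atomic write: never lost
          i += 1
        }
      })
    }
    ts.foreach(_.start())
    ts.foreach(_.join())
    (plain, atomic.get)
  }

  def main(args: Array[String]): Unit = {
    val (plain, atomic) = run(threads = 4, addsPerThread = 100000)
    // The atomic count is always exactly threads * addsPerThread;
    // the plain count is typically smaller due to lost updates.
    println(s"plain=$plain atomic=$atomic")
  }
}
```

Because lost updates are nondeterministic, only the atomic total is exact on
every run; the plain total can only lose increments, never gain them.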
[The accumulator usage in that
Main|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/Main.scala#L56-L70]
is straightforward, I thought:
{code}
sc
  .parallelize(
    1 to numElems,
    numSlices = partitions
  )
  .mapPartitions { it ⇒
    otherAccum.add(1)
    lazyOtherAccum.add(1)
    localAccum.add(1)
    lazyLocalAccum.add(1)
    it.map(_.toString)
  }
  .collect()
{code}
It also "works" with {{RDD.map}}, incrementing the accumulators for every
element.
I thought the way to use accumulators was to declare them on the driver; then,
when they are referenced in task closures, a copy is made either per-task or
per-executor (the latter mirroring how {{Broadcast}}s work).
If they are per-executor, and multiple tasks can find themselves writing to the
same accumulator instance, then this bug would seem pretty severe.
If they are per-task, then, as you said, everything should be fine as long as a
task doesn't spawn multiple threads that both write to the accumulator, which
seems un-idiomatic enough from a Spark-usage perspective that this would be
much less worrisome.
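To make the per-task-copy model above concrete, here is a hedged, Spark-free
sketch (all names invented, not Spark's actual implementation) of why
single-threaded writes to per-task copies are safe: each simulated task
increments only its own private counter, and the results are merged afterwards,
mirroring what the driver would do with per-task accumulator copies:

```scala
import java.util.concurrent.{Callable, Executors}

// Illustrative model only, not Spark internals: each "task" gets a fresh
// private counter (its copy of the accumulator), writes to it from a single
// thread, and the "driver" merges the copies at the end. No write is ever
// concurrent, so no synchronization is needed.
object PerTaskCopySketch {
  def run(tasks: Int, addsPerTask: Int): Long = {
    val pool = Executors.newFixedThreadPool(4)
    val futures = (1 to tasks).map { _ =>
      pool.submit(new Callable[Long] {
        def call(): Long = {
          var copy = 0L                          // per-task copy: never shared
          (1 to addsPerTask).foreach(_ => copy += 1)
          copy                                   // task result: its final count
        }
      })
    }
    val merged = futures.map(_.get()).sum        // "driver-side" merge step
    pool.shutdown()
    merged
  }

  def main(args: Array[String]): Unit =
    println(PerTaskCopySketch.run(tasks = 8, addsPerTask = 1000))
}
```

Under this model the merged total is always exact, which is why the per-task
interpretation would make the observed corruption surprising.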
In either case, I'm wondering if the behavior I'm seeing is specific to the way
{{local\[*\]}}-mode lays out executors/threads under the hood. [My example uses
{{local\[4\]}}|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/Main.scala#L109],
which I thought meant that my driver JVM spawns 4 threads to act as executors;
in that case I'd expect the task closures sent to those executors to have had
their accumulator references replaced with references to copies of the
accumulators, as would presumably happen in cluster mode.
So the important outstanding questions, to me, are:
- are accumulator instances shared across tasks on an executor, like
{{Broadcast}} values, or does each task instantiate and write to a fresh copy
of each accumulator?
- does {{local\[…\]}}-mode isolate accumulator instances the same way cluster
mode does? If not, this bug could just be a bug in how accumulator references
in task closures are handled in local mode.
There's also a nagging question of why I only see the race in the accumulators
that are declared in the {{Spark}} singleton; maybe the closure serializer
just picks up direct references to them and doesn't transform them into
per-task accumulator instances like it normally would?
Thanks for having a look!
> LongAccumulator, DoubleAccumulator not threadsafe
> -------------------------------------------------
>
> Key: SPARK-21425
> URL: https://issues.apache.org/jira/browse/SPARK-21425
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.0
> Reporter: Ryan Williams
>
> [AccumulatorV2
> docs|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L42-L43]
> acknowledge that accumulators must be concurrent-read-safe, but afaict they
> must also be concurrent-write-safe.
> The same docs imply that {{Int}} and {{Long}} meet either/both of these
> criteria, when afaict they do not.
> Relatedly, the provided
> [LongAccumulator|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L291]
> and
> [DoubleAccumulator|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L370]
> are not thread-safe, and should be expected to behave unpredictably when
> multiple tasks on the same executor write to them concurrently.
> [Here is a repro repo|https://github.com/ryan-williams/spark-bugs/tree/accum]
> with some simple applications that demonstrate incorrect results from
> {{LongAccumulator}}s.