[ 
https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15575439#comment-15575439
 ] 

Maximilian Michels commented on FLINK-4829:
-------------------------------------------

To fix the problem, we will have to introduce a lock in the Accumulator base 
class which has to be acquired upon adding values. That seems to be the only 
way to make this work for custom accumulators.
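
A minimal sketch of the locking idea, assuming a histogram-style accumulator backed by a TreeMap. The names LockedHistogram, LockedHistogramDemo, add, snapshot and count are hypothetical and are not Flink's Accumulator API. The sketch only illustrates that the adding path and the snapshot path have to acquire the same lock:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.TreeMap;

/**
 * Hypothetical sketch, not Flink's Accumulator API: an accumulator whose
 * mutations and snapshots are guarded by one lock (the object's monitor).
 */
class LockedHistogram implements Serializable {
    private static final long serialVersionUID = 1L;

    // Guarded by "this"; TreeMap itself is not thread safe.
    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    /** Called by the task thread; acquires the lock before mutating. */
    public synchronized void add(int value) {
        counts.merge(value, 1, Integer::sum);
    }

    /** Called by the reporting thread; serializes under the same lock. */
    public synchronized byte[] snapshot() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(counts);
        }
        return bytes.toByteArray();
    }

    /** Returns how often the given value has been added so far. */
    public synchronized int count(int value) {
        return counts.getOrDefault(value, 0);
    }
}

class LockedHistogramDemo {
    /** Adds values on one thread while taking snapshots on another. */
    static int run(int adds, int snapshots) throws Exception {
        LockedHistogram histogram = new LockedHistogram();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < adds; i++) {
                histogram.add(i % 1000);
            }
        });
        writer.start();
        int taken = 0;
        for (int i = 0; i < snapshots; i++) {
            // Without the shared lock, this serialization could race with add().
            histogram.snapshot();
            taken++;
        }
        writer.join();
        return taken;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("snapshots taken: " + run(100_000, 200));
    }
}
```

The cost of this approach is that every add has to acquire the lock, even when no snapshot is in progress.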

Note that this only concerns live accumulator snapshots. Final accumulator 
values will still be reported when the job is over. 

> Accumulators are not thread safe
> --------------------------------
>
>                 Key: FLINK-4829
>                 URL: https://issues.apache.org/jira/browse/FLINK-4829
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.2.0
>            Reporter: Till Rohrmann
>            Assignee: Maximilian Michels
>             Fix For: 1.2.0
>
>
> Flink's {{Accumulators}} are not thread safe. With the introduction of live 
> accumulator snapshots, which are sent to the {{JobManager}}, we introduced 
> concurrent access to accumulators without properly guarding them against 
> concurrent modification. If a snapshot is taken of an accumulator while it 
> is being modified, this can cause a {{ConcurrentModificationException}}, 
> as reported by a user: 
> {code}
> WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry     - Failed to serialize accumulators for task.
> java.util.ConcurrentModificationException
>         at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
>         at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
>         at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
>         at java.util.TreeMap.writeObject(TreeMap.java:2436)
>         at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
>         at java.util.HashMap.writeObject(HashMap.java:1362)
>         at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
>         at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
>         at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
>         at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
> {code}
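
The top frames of the trace above point at TreeMap's fail-fast iterator, which throws when the map is structurally modified during iteration. As a rough illustration, the same exception can be provoked deterministically in a single thread. The class and method names below are hypothetical and unrelated to Flink's code. In the reported case the modification comes from a second thread while serialization iterates the map, so it only fails intermittently:

```java
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical demo class; not part of Flink. */
class FailFastIteratorDemo {
    /** Returns true if modifying the map mid-iteration triggers the exception. */
    static boolean modifyWhileIterating() {
        TreeMap<Integer, Integer> counts = new TreeMap<>();
        counts.put(1, 1);
        counts.put(2, 1);
        try {
            for (Map.Entry<Integer, Integer> entry : counts.entrySet()) {
                // Structural modification: inserts a new key during iteration.
                counts.put(entry.getKey() + 10, 1);
            }
        } catch (ConcurrentModificationException e) {
            // Thrown by the iterator on the step after the insertion.
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("exception thrown: " + modifyWhileIterating());
    }
}
```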



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
