Till Rohrmann created FLINK-4829:
------------------------------------
Summary: Accumulators are not thread safe
Key: FLINK-4829
URL: https://issues.apache.org/jira/browse/FLINK-4829
Project: Flink
Issue Type: Bug
Components: Local Runtime
Affects Versions: 1.2.0
Reporter: Till Rohrmann
Fix For: 1.2.0
Flink's {{Accumulators}} are not thread safe. The introduction of live
accumulator snapshots, which are sent to the {{JobManager}}, added concurrent
access to accumulators without properly guarding them against concurrent
modification: the snapshot is serialized from the {{TaskManager}} heartbeat
while the task keeps updating the accumulator. If a snapshot is taken of an
accumulator that is being modified at the same time, serialization can fail
with a {{ConcurrentModificationException}}, as reported by a user:
{code}
WARN org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to serialize accumulators for task.
java.util.ConcurrentModificationException
	at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
	at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
	at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
	at java.util.TreeMap.writeObject(TreeMap.java:2436)
	at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
	at java.util.HashMap.writeObject(HashMap.java:1362)
	at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
	at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
	at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
	at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
	at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
{code}
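The race can be illustrated outside Flink with a minimal sketch. It assumes a histogram-style accumulator backed by a {{TreeMap}} (the map type whose serialization fails in the trace above), updated by a task thread while a second thread takes snapshots, as the heartbeat does. The class {{GuardedHistogram}} and its methods are hypothetical and are not Flink's accumulator API; guarding updates and the snapshot copy with the same monitor is one possible way to avoid the exception, not necessarily the fix adopted for this issue.

{code}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.TreeMap;

public class GuardedAccumulatorDemo {

    /** Hypothetical histogram-style accumulator (not Flink's API). */
    static final class GuardedHistogram implements Serializable {
        private static final long serialVersionUID = 1L;
        private final TreeMap<Integer, Integer> counts = new TreeMap<>();

        // Called by the task thread for every record; holds the monitor while mutating.
        public synchronized void add(int value) {
            counts.merge(value, 1, Integer::sum);
        }

        // Called by the heartbeat thread. The copy is made under the same monitor,
        // so the task thread cannot modify the map while it is being iterated.
        public byte[] snapshot() throws IOException {
            TreeMap<Integer, Integer> copy;
            synchronized (this) {
                copy = new TreeMap<>(counts);
            }
            // Serialize the private copy outside the lock to keep the critical section short.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(copy);
            }
            return bytes.toByteArray();
        }

        public synchronized int distinctValues() {
            return counts.size();
        }
    }

    public static void main(String[] args) throws Exception {
        GuardedHistogram histogram = new GuardedHistogram();
        Thread task = new Thread(() -> {
            for (int i = 0; i < 200_000; i++) {
                histogram.add(i % 1000);
            }
        });
        task.start();
        // Keep snapshotting while the task runs; without the lock in snapshot(),
        // serializing the live TreeMap could intermittently throw ConcurrentModificationException.
        while (task.isAlive()) {
            histogram.snapshot();
        }
        task.join();
        System.out.println("distinct=" + histogram.distinctValues());
    }
}
{code}

Copying under the lock and serializing the copy afterwards keeps the time the task thread is blocked proportional to the copy rather than to the (slower) Java serialization.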