Repository: flink Updated Branches: refs/heads/release-1.1 fe464b424 -> 210230c4a
[FLINK-4829] protect user accumulators against concurrent updates Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1d6b246 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1d6b246 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1d6b246 Branch: refs/heads/release-1.1 Commit: c1d6b24600e40700fa06caa28bc81788d8e92386 Parents: fe464b4 Author: Maximilian Michels <m...@apache.org> Authored: Fri Oct 14 15:15:50 2016 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Tue Oct 18 16:05:00 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/runtime/accumulators/AccumulatorRegistry.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c1d6b246/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java index 41af2a9..44714e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** @@ -44,7 +45,8 @@ public class AccumulatorRegistry { new HashMap<Metric, Accumulator<?, ?>>(); /* User-defined Accumulator values stored for the executing task. */ - private final Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>(); + private final Map<String, Accumulator<?, ?>> userAccumulators = + new ConcurrentHashMap<>(4); /* The reporter reference that is handed to the reporting tasks. */ private final ReadWriteReporter reporter;