Repository: flink
Updated Branches:
  refs/heads/release-1.1 fe464b424 -> 210230c4a


[FLINK-4829] protect user accumulators against concurrent updates


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1d6b246
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1d6b246
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1d6b246

Branch: refs/heads/release-1.1
Commit: c1d6b24600e40700fa06caa28bc81788d8e92386
Parents: fe464b4
Author: Maximilian Michels <m...@apache.org>
Authored: Fri Oct 14 15:15:50 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Tue Oct 18 16:05:00 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/accumulators/AccumulatorRegistry.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c1d6b246/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
index 41af2a9..44714e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 
 /**
@@ -44,7 +45,8 @@ public class AccumulatorRegistry {
                        new HashMap<Metric, Accumulator<?, ?>>();
 
        /* User-defined Accumulator values stored for the executing task. */
-       private final Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();
+       private final Map<String, Accumulator<?, ?>> userAccumulators =
+                       new ConcurrentHashMap<>(4);
 
        /* The reporter reference that is handed to the reporting tasks. */
        private final ReadWriteReporter reporter;

Reply via email to