Repository: flink
Updated Branches:
  refs/heads/master 68f446c9f -> f622de3ec


[FLINK-7368][metrics] Make MetricStore ThreadSafe class

Remove external synchronization on MetricStore

This closes #4472.
This closes #4840.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f622de3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f622de3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f622de3e

Branch: refs/heads/master
Commit: f622de3ecbc2ae17f2d15fe46828c48747c2b6ae
Parents: 68f446c
Author: Piotr Nowojski <[email protected]>
Authored: Mon Oct 16 16:53:14 2017 +0200
Committer: zentol <[email protected]>
Committed: Thu Oct 26 16:41:57 2017 +0200

----------------------------------------------------------------------
 .../handler/legacy/TaskManagersHandler.java     | 123 ++++----
 .../legacy/metrics/AbstractMetricsHandler.java  |  76 +++--
 .../metrics/JobManagerMetricsHandler.java       |   2 +-
 .../legacy/metrics/JobMetricsHandler.java       |   2 +-
 .../legacy/metrics/JobVertexMetricsHandler.java |   2 +-
 .../handler/legacy/metrics/MetricFetcher.java   |  22 +-
 .../handler/legacy/metrics/MetricStore.java     | 297 +++++++++++--------
 .../metrics/TaskManagerMetricsHandler.java      |   2 +-
 .../rest/handler/util/MutableIOMetrics.java     |  88 +++---
 .../legacy/metrics/MetricFetcherTest.java       |  32 +-
 .../handler/legacy/metrics/MetricStoreTest.java |   6 +-
 11 files changed, 340 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
index e608b99..93c5b44 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
@@ -130,71 +130,68 @@ public class TaskManagersHandler extends 
AbstractJsonRequestHandler  {
                        // only send metrics when only one task manager 
requests them.
                        if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
                                fetcher.update();
-                               final MetricStore metricStore = 
fetcher.getMetricStore();
-
-                               synchronized (metricStore) {
-                                       MetricStore.TaskManagerMetricStore 
metrics = metricStore.getTaskManagerMetricStore(instance.getId().toString());
-                                       if (metrics != null) {
-                                               
gen.writeObjectFieldStart("metrics");
-                                               long heapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
-                                               long heapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
-                                               long heapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
-
-                                               
gen.writeNumberField("heapCommitted", heapCommitted);
-                                               
gen.writeNumberField("heapUsed", heapUsed);
-                                               gen.writeNumberField("heapMax", 
heapTotal);
-
-                                               long nonHeapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
-                                               long nonHeapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
-                                               long nonHeapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
-
-                                               
gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
-                                               
gen.writeNumberField("nonHeapUsed", nonHeapUsed);
-                                               
gen.writeNumberField("nonHeapMax", nonHeapTotal);
-
-                                               
gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
-                                               
gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
-                                               
gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
-
-                                               long directCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
-                                               long directUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
-                                               long directMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
-
-                                               
gen.writeNumberField("directCount", directCount);
-                                               
gen.writeNumberField("directUsed", directUsed);
-                                               
gen.writeNumberField("directMax", directMax);
-
-                                               long mappedCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
-                                               long mappedUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
-                                               long mappedMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
-
-                                               
gen.writeNumberField("mappedCount", mappedCount);
-                                               
gen.writeNumberField("mappedUsed", mappedUsed);
-                                               
gen.writeNumberField("mappedMax", mappedMax);
-
-                                               long memorySegmentsAvailable = 
Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
-                                               long memorySegmentsTotal = 
Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
-
-                                               
gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
-                                               
gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
-
-                                               
gen.writeArrayFieldStart("garbageCollectors");
-
-                                               for (String gcName : 
metrics.garbageCollectorNames) {
-                                                       String count = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
-                                                       String time = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
-                                                       if (count != null && 
time != null) {
-                                                               
gen.writeStartObject();
-                                                               
gen.writeStringField("name", gcName);
-                                                               
gen.writeNumberField("count", Long.valueOf(count));
-                                                               
gen.writeNumberField("time", Long.valueOf(time));
-                                                               
gen.writeEndObject();
-                                                       }
-                                               }
 
-                                               gen.writeEndArray();
-                                               gen.writeEndObject();
+                               MetricStore.TaskManagerMetricStore metrics = 
fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
+                               if (metrics != null) {
+                                       gen.writeObjectFieldStart("metrics");
+                                       long heapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+                                       long heapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+                                       long heapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+                                       gen.writeNumberField("heapCommitted", 
heapCommitted);
+                                       gen.writeNumberField("heapUsed", 
heapUsed);
+                                       gen.writeNumberField("heapMax", 
heapTotal);
+
+                                       long nonHeapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+                                       long nonHeapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+                                       long nonHeapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+
+                                       
gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+                                       gen.writeNumberField("nonHeapUsed", 
nonHeapUsed);
+                                       gen.writeNumberField("nonHeapMax", 
nonHeapTotal);
+
+                                       gen.writeNumberField("totalCommitted", 
heapCommitted + nonHeapCommitted);
+                                       gen.writeNumberField("totalUsed", 
heapUsed + nonHeapUsed);
+                                       gen.writeNumberField("totalMax", 
heapTotal + nonHeapTotal);
+
+                                       long directCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
+                                       long directUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+                                       long directMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
+
+                                       gen.writeNumberField("directCount", 
directCount);
+                                       gen.writeNumberField("directUsed", 
directUsed);
+                                       gen.writeNumberField("directMax", 
directMax);
+
+                                       long mappedCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
+                                       long mappedUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+                                       long mappedMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
+
+                                       gen.writeNumberField("mappedCount", 
mappedCount);
+                                       gen.writeNumberField("mappedUsed", 
mappedUsed);
+                                       gen.writeNumberField("mappedMax", 
mappedMax);
+
+                                       long memorySegmentsAvailable = 
Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+                                       long memorySegmentsTotal = 
Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+                                       
gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
+                                       
gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
+
+                                       
gen.writeArrayFieldStart("garbageCollectors");
+
+                                       for (String gcName : 
metrics.garbageCollectorNames) {
+                                               String count = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
+                                               String time = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
+                                               if (count != null && time != 
null) {
+                                                       gen.writeStartObject();
+                                                       
gen.writeStringField("name", gcName);
+                                                       
gen.writeNumberField("count", Long.valueOf(count));
+                                                       
gen.writeNumberField("time", Long.valueOf(time));
+                                                       gen.writeEndObject();
+                                               }
                                        }
+
+                                       gen.writeEndArray();
+                                       gen.writeEndObject();
                                }
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
index 6cf83c4..186397b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
@@ -87,54 +87,48 @@ public abstract class AbstractMetricsHandler extends 
AbstractJsonRequestHandler
                         */
                        return "";
                }
-               MetricStore metricStore = fetcher.getMetricStore();
-               synchronized (metricStore) {
-                       Map<String, String> metrics = getMapFor(pathParams, 
metricStore);
-                       if (metrics == null) {
-                               return "";
-                       }
-                       String[] requestedMetrics = 
requestedMetricsList.split(",");
-
-                       StringWriter writer = new StringWriter();
-                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-                       gen.writeStartArray();
-                       for (String requestedMetric : requestedMetrics) {
-                               Object metricValue = 
metrics.get(requestedMetric);
-                               if (metricValue != null) {
-                                       gen.writeStartObject();
-                                       gen.writeStringField("id", 
requestedMetric);
-                                       gen.writeStringField("value", 
metricValue.toString());
-                                       gen.writeEndObject();
-                               }
-                       }
-                       gen.writeEndArray();
-
-                       gen.close();
-                       return writer.toString();
+               Map<String, String> metrics = getMapFor(pathParams, 
fetcher.getMetricStore());
+               if (metrics == null) {
+                       return "";
                }
-       }
+               String[] requestedMetrics = requestedMetricsList.split(",");
 
-       private String getAvailableMetricsList(Map<String, String> pathParams) 
throws IOException {
-               MetricStore metricStore = fetcher.getMetricStore();
-               synchronized (metricStore) {
-                       Map<String, String> metrics = getMapFor(pathParams, 
metricStore);
-                       if (metrics == null) {
-                               return "";
-                       }
-                       StringWriter writer = new StringWriter();
-                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
-                       gen.writeStartArray();
-                       for (String m : metrics.keySet()) {
+               gen.writeStartArray();
+               for (String requestedMetric : requestedMetrics) {
+                       Object metricValue = metrics.get(requestedMetric);
+                       if (metricValue != null) {
                                gen.writeStartObject();
-                               gen.writeStringField("id", m);
+                               gen.writeStringField("id", requestedMetric);
+                               gen.writeStringField("value", 
metricValue.toString());
                                gen.writeEndObject();
                        }
-                       gen.writeEndArray();
+               }
+               gen.writeEndArray();
+
+               gen.close();
+               return writer.toString();
+       }
 
-                       gen.close();
-                       return writer.toString();
+       private String getAvailableMetricsList(Map<String, String> pathParams) 
throws IOException {
+               Map<String, String> metrics = getMapFor(pathParams, 
fetcher.getMetricStore());
+               if (metrics == null) {
+                       return "";
+               }
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               gen.writeStartArray();
+               for (String m : metrics.keySet()) {
+                       gen.writeStartObject();
+                       gen.writeStringField("id", m);
+                       gen.writeEndObject();
                }
+               gen.writeEndArray();
+
+               gen.close();
+               return writer.toString();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
index c568ee0..35a4efd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
@@ -47,7 +47,7 @@ public class JobManagerMetricsHandler extends 
AbstractMetricsHandler {
 
        @Override
        protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
-               MetricStore.JobManagerMetricStore jobManager = 
metrics.getJobManagerMetricStore();
+               MetricStore.ComponentMetricStore jobManager = 
metrics.getJobManagerMetricStore();
                if (jobManager == null) {
                        return null;
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
index 7341eb8..34e7b87 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
@@ -47,7 +47,7 @@ public class JobMetricsHandler extends AbstractMetricsHandler 
{
 
        @Override
        protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
-               MetricStore.JobMetricStore job = 
metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
+               MetricStore.ComponentMetricStore job = 
metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
                return job != null
                        ? job.metrics
                        : null;

http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
index 3a701ab..5035645 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
@@ -47,7 +47,7 @@ public class JobVertexMetricsHandler extends 
AbstractMetricsHandler {
 
        @Override
        protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
-               MetricStore.TaskMetricStore task = metrics.getTaskMetricStore(
+               MetricStore.ComponentMetricStore task = 
metrics.getTaskMetricStore(
                        pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID),
                        pathParams.get(PARAMETER_VERTEX_ID));
                return task != null

http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index c114ee6..fa71c68 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -112,16 +111,14 @@ public class MetricFetcher {
                                                if (throwable != null) {
                                                        LOG.debug("Fetching of 
JobDetails failed.", throwable);
                                                } else {
-                                                       ArrayList<String> 
toRetain = new ArrayList<>();
+                                                       ArrayList<String> 
activeJobs = new ArrayList<>();
                                                        for (JobDetails job : 
jobDetails.getRunning()) {
-                                                               
toRetain.add(job.getJobId().toString());
+                                                               
activeJobs.add(job.getJobId().toString());
                                                        }
                                                        for (JobDetails job : 
jobDetails.getFinished()) {
-                                                               
toRetain.add(job.getJobId().toString());
-                                                       }
-                                                       synchronized (metrics) {
-                                                               
metrics.jobs.keySet().retainAll(toRetain);
+                                                               
activeJobs.add(job.getJobId().toString());
                                                        }
+                                                       
metrics.retainJobs(activeJobs);
                                                }
                                        },
                                        executor);
@@ -154,9 +151,7 @@ public class MetricFetcher {
                                                                        return 
taskManagerInstance.getId().toString();
                                                                
}).collect(Collectors.toList());
 
-                                                       synchronized (metrics) {
-                                                               
metrics.taskManagers.keySet().retainAll(activeTaskManagers);
-                                                       }
+                                                       
metrics.retainTaskManagers(activeTaskManagers);
                                                }
                                        },
                                        executor);
@@ -198,12 +193,7 @@ public class MetricFetcher {
                                        if (t != null) {
                                                LOG.debug("Fetching metrics 
failed.", t);
                                        } else {
-                                               List<MetricDump> dumpedMetrics 
= deserializer.deserialize(result);
-                                               synchronized (metrics) {
-                                                       for (MetricDump metric 
: dumpedMetrics) {
-                                                               
metrics.add(metric);
-                                                       }
-                                               }
+                                               
metrics.addAll(deserializer.deserialize(result));
                                        }
                                },
                                executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index 9c13ab8..f9e79d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -18,17 +18,22 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.HashSet;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Collections.unmodifiableSet;
 import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER;
 import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
 import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
@@ -38,29 +43,136 @@ import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY
 import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
 import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
 import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Nested data-structure to store metrics.
- *
- * <p>This structure is not thread-safe.
  */
+@ThreadSafe
 public class MetricStore {
        private static final Logger LOG = 
LoggerFactory.getLogger(MetricStore.class);
 
-       final JobManagerMetricStore jobManager = new JobManagerMetricStore();
-       final Map<String, TaskManagerMetricStore> taskManagers = new 
HashMap<>();
-       final Map<String, JobMetricStore> jobs = new HashMap<>();
+       private final ComponentMetricStore jobManager = new 
ComponentMetricStore();
+       private final Map<String, TaskManagerMetricStore> taskManagers = new 
ConcurrentHashMap<>();
+       private final Map<String, JobMetricStore> jobs = new 
ConcurrentHashMap<>();
+
+       /**
+        * Remove inactive task managers.
+        *
+        * @param activeTaskManagers to retain.
+        */
+       synchronized void retainTaskManagers(List<String> activeTaskManagers) {
+               taskManagers.keySet().retainAll(activeTaskManagers);
+       }
+
+       /**
+        * Remove inactive jobs.
+        *
+        * @param activeJobs to retain.
+        */
+       synchronized void retainJobs(List<String> activeJobs) {
+               jobs.keySet().retainAll(activeJobs);
+       }
+
+       /**
+        * Add metric dumps to the store.
+        *
+        * @param metricDumps to add.
+        */
+       synchronized void addAll(List<MetricDump> metricDumps) {
+               for (MetricDump metric : metricDumps) {
+                       add(metric);
+               }
+       }
 
        // 
-----------------------------------------------------------------------------------------------------------------
-       // Adding metrics
+       // Accessors for sub MetricStores
        // 
-----------------------------------------------------------------------------------------------------------------
-       public void add(MetricDump metric) {
+
+       /**
+        * Returns the {@link ComponentMetricStore} for the JobManager.
+        *
+        * @return ComponentMetricStore for the JobManager
+        */
+       public synchronized ComponentMetricStore getJobManagerMetricStore() {
+               return ComponentMetricStore.unmodifiable(jobManager);
+       }
+
+       /**
+        * Returns the {@link TaskManagerMetricStore} for the given taskmanager 
ID.
+        *
+        * @param tmID taskmanager ID
+        * @return TaskManagerMetricStore for the given ID, or null if no store 
for the given argument exists
+        */
+       public synchronized TaskManagerMetricStore 
getTaskManagerMetricStore(String tmID) {
+               return tmID == null ? null : 
TaskManagerMetricStore.unmodifiable(taskManagers.get(tmID));
+       }
+
+       /**
+        * Returns the {@link ComponentMetricStore} for the given job ID.
+        *
+        * @param jobID job ID
+        * @return ComponentMetricStore for the given ID, or null if no store 
for the given argument exists
+        */
+       public synchronized ComponentMetricStore getJobMetricStore(String 
jobID) {
+               return jobID == null ? null : 
ComponentMetricStore.unmodifiable(jobs.get(jobID));
+       }
+
+       /**
+        * Returns the {@link ComponentMetricStore} for the given job/task ID.
+        *
+        * @param jobID  job ID
+        * @param taskID task ID
+        * @return ComponentMetricStore for given IDs, or null if no store for 
the given arguments exists
+        */
+       public synchronized ComponentMetricStore getTaskMetricStore(String 
jobID, String taskID) {
+               JobMetricStore job = jobID == null ? null : jobs.get(jobID);
+               if (job == null || taskID == null) {
+                       return null;
+               }
+               return 
ComponentMetricStore.unmodifiable(job.getTaskMetricStore(taskID));
+       }
+
+       /**
+        * Returns the {@link ComponentMetricStore} for the given job/task ID 
and subtask index.
+        *
+        * @param jobID        job ID
+        * @param taskID       task ID
+        * @param subtaskIndex subtask index
+        * @return ComponentMetricStore for the given IDs and index, or null if 
no store for the given arguments exists
+        */
+       public synchronized ComponentMetricStore getSubtaskMetricStore(String 
jobID, String taskID, int subtaskIndex) {
+               JobMetricStore job = jobID == null ? null : jobs.get(jobID);
+               if (job == null) {
+                       return null;
+               }
+               TaskMetricStore task = job.getTaskMetricStore(taskID);
+               if (task == null) {
+                       return null;
+               }
+               return 
ComponentMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex));
+       }
+
+       public synchronized Map<String, JobMetricStore> getJobs() {
+               return unmodifiableMap(jobs);
+       }
+
+       public synchronized Map<String, TaskManagerMetricStore> 
getTaskManagers() {
+               return unmodifiableMap(taskManagers);
+       }
+
+       public synchronized ComponentMetricStore getJobManager() {
+               return ComponentMetricStore.unmodifiable(jobManager);
+       }
+
+       @VisibleForTesting
+       void add(MetricDump metric) {
                try {
                        QueryScopeInfo info = metric.scopeInfo;
                        TaskManagerMetricStore tm;
                        JobMetricStore job;
                        TaskMetricStore task;
-                       SubtaskMetricStore subtask;
+                       ComponentMetricStore subtask;
 
                        String name = info.scope.isEmpty()
                                ? metric.name
@@ -76,11 +188,7 @@ public class MetricStore {
                                        break;
                                case INFO_CATEGORY_TM:
                                        String tmID = 
((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
-                                       tm = taskManagers.get(tmID);
-                                       if (tm == null) {
-                                               tm = new 
TaskManagerMetricStore();
-                                               taskManagers.put(tmID, tm);
-                                       }
+                                       tm = taskManagers.computeIfAbsent(tmID, 
k -> new TaskManagerMetricStore());
                                        if (name.contains("GarbageCollector")) {
                                                String gcName = 
name.substring("Status.JVM.GarbageCollector.".length(), name.lastIndexOf('.'));
                                                
tm.addGarbageCollectorName(gcName);
@@ -89,30 +197,14 @@ public class MetricStore {
                                        break;
                                case INFO_CATEGORY_JOB:
                                        QueryScopeInfo.JobQueryScopeInfo 
jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
-                                       job = jobs.get(jobInfo.jobID);
-                                       if (job == null) {
-                                               job = new JobMetricStore();
-                                               jobs.put(jobInfo.jobID, job);
-                                       }
+                                       job = 
jobs.computeIfAbsent(jobInfo.jobID, k -> new JobMetricStore());
                                        addMetric(job.metrics, name, metric);
                                        break;
                                case INFO_CATEGORY_TASK:
                                        QueryScopeInfo.TaskQueryScopeInfo 
taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
-                                       job = jobs.get(taskInfo.jobID);
-                                       if (job == null) {
-                                               job = new JobMetricStore();
-                                               jobs.put(taskInfo.jobID, job);
-                                       }
-                                       task = job.tasks.get(taskInfo.vertexID);
-                                       if (task == null) {
-                                               task = new TaskMetricStore();
-                                               
job.tasks.put(taskInfo.vertexID, task);
-                                       }
-                                       subtask = 
task.subtasks.get(taskInfo.subtaskIndex);
-                                       if (subtask == null) {
-                                               subtask = new 
SubtaskMetricStore();
-                                               
task.subtasks.put(taskInfo.subtaskIndex, subtask);
-                                       }
+                                       job = 
jobs.computeIfAbsent(taskInfo.jobID, k -> new JobMetricStore());
+                                       task = 
job.tasks.computeIfAbsent(taskInfo.vertexID, k -> new TaskMetricStore());
+                                       subtask = 
task.subtasks.computeIfAbsent(taskInfo.subtaskIndex, k -> new 
ComponentMetricStore());
                                        /**
                                         * The duplication is intended. Metrics 
scoped by subtask are useful for several job/task handlers,
                                         * while the WebInterface task metric 
queries currently do not account for subtasks, so we don't
@@ -124,16 +216,8 @@ public class MetricStore {
                                        break;
                                case INFO_CATEGORY_OPERATOR:
                                        QueryScopeInfo.OperatorQueryScopeInfo 
operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
-                                       job = jobs.get(operatorInfo.jobID);
-                                       if (job == null) {
-                                               job = new JobMetricStore();
-                                               jobs.put(operatorInfo.jobID, 
job);
-                                       }
-                                       task = 
job.tasks.get(operatorInfo.vertexID);
-                                       if (task == null) {
-                                               task = new TaskMetricStore();
-                                               
job.tasks.put(operatorInfo.vertexID, task);
-                                       }
+                                       job = 
jobs.computeIfAbsent(operatorInfo.jobID, k -> new JobMetricStore());
+                                       task = 
job.tasks.computeIfAbsent(operatorInfo.vertexID, k -> new TaskMetricStore());
                                        /**
                                         * As the WebInterface does not account 
for operators (because it can't) we don't
                                         * divide by operator and instead use 
the concatenation of subtask index, operator name and metric name
@@ -181,74 +265,23 @@ public class MetricStore {
        }
 
        // 
-----------------------------------------------------------------------------------------------------------------
-       // Accessors for sub MetricStores
+       // sub MetricStore classes
        // 
-----------------------------------------------------------------------------------------------------------------
 
        /**
-        * Returns the {@link JobManagerMetricStore}.
-        *
-        * @return JobManagerMetricStore
+        * Structure containing metrics of a single component.
         */
-       public JobManagerMetricStore getJobManagerMetricStore() {
-               return jobManager;
-       }
+       @ThreadSafe
+       public static class ComponentMetricStore {
+               public final Map<String, String> metrics;
 
-       /**
-        * Returns the {@link TaskManagerMetricStore} for the given taskmanager 
ID.
-        *
-        * @param tmID taskmanager ID
-        * @return TaskManagerMetricStore for the given ID, or null if no store 
for the given argument exists
-        */
-       public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) {
-               return taskManagers.get(tmID);
-       }
-
-       /**
-        * Returns the {@link JobMetricStore} for the given job ID.
-        *
-        * @param jobID job ID
-        * @return JobMetricStore for the given ID, or null if no store for the 
given argument exists
-        */
-       public JobMetricStore getJobMetricStore(String jobID) {
-               return jobs.get(jobID);
-       }
-
-       /**
-        * Returns the {@link TaskMetricStore} for the given job/task ID.
-        *
-        * @param jobID  job ID
-        * @param taskID task ID
-        * @return TaskMetricStore for given IDs, or null if no store for the 
given arguments exists
-        */
-       public TaskMetricStore getTaskMetricStore(String jobID, String taskID) {
-               JobMetricStore job = getJobMetricStore(jobID);
-               if (job == null) {
-                       return null;
+               private ComponentMetricStore() {
+                       this(new ConcurrentHashMap<>());
                }
-               return job.getTaskMetricStore(taskID);
-       }
 
-       /**
-        * Returns the {@link SubtaskMetricStore} for the given job/task ID and 
subtask index.
-        *
-        * @param jobID        job ID
-        * @param taskID       task ID
-        * @param subtaskIndex subtask index
-        * @return SubtaskMetricStore for the given IDs and index, or null if 
no store for the given arguments exists
-        */
-       public SubtaskMetricStore getSubtaskMetricStore(String jobID, String 
taskID, int subtaskIndex) {
-               TaskMetricStore task = getTaskMetricStore(jobID, taskID);
-               if (task == null) {
-                       return null;
+               private ComponentMetricStore(Map<String, String> metrics) {
+                       this.metrics = checkNotNull(metrics);
                }
-               return task.getSubtaskMetricStore(subtaskIndex);
-       }
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-       // sub MetricStore classes
-       // 
-----------------------------------------------------------------------------------------------------------------
-       private abstract static class ComponentMetricStore {
-               public final Map<String, String> metrics = new HashMap<>();
 
                public String getMetric(String name) {
                        return this.metrics.get(name);
@@ -260,50 +293,66 @@ public class MetricStore {
                                ? value
                                : defaultValue;
                }
-       }
 
-       /**
-        * Sub-structure containing metrics of the JobManager.
-        */
-       public static class JobManagerMetricStore extends ComponentMetricStore {
+               private static ComponentMetricStore 
unmodifiable(ComponentMetricStore source) {
+                       if (source == null) {
+                               return null;
+                       }
+                       return new 
ComponentMetricStore(unmodifiableMap(source.metrics));
+               }
        }
 
        /**
         * Sub-structure containing metrics of a single TaskManager.
         */
+       @ThreadSafe
        public static class TaskManagerMetricStore extends ComponentMetricStore 
{
-               public final Set<String> garbageCollectorNames = new 
HashSet<>();
+               public final Set<String> garbageCollectorNames;
+
+               private TaskManagerMetricStore() {
+                       this(new ConcurrentHashMap<>(), 
ConcurrentHashMap.newKeySet());
+               }
+
+               private TaskManagerMetricStore(Map<String, String> metrics, 
Set<String> garbageCollectorNames) {
+                       super(metrics);
+                       this.garbageCollectorNames = 
checkNotNull(garbageCollectorNames);
+               }
 
-               public void addGarbageCollectorName(String name) {
+               private void addGarbageCollectorName(String name) {
                        garbageCollectorNames.add(name);
                }
+
+               private static TaskManagerMetricStore 
unmodifiable(TaskManagerMetricStore source) {
+                       if (source == null) {
+                               return null;
+                       }
+                       return new TaskManagerMetricStore(
+                               unmodifiableMap(source.metrics),
+                               unmodifiableSet(source.garbageCollectorNames));
+               }
        }
 
        /**
         * Sub-structure containing metrics of a single Job.
         */
-       public static class JobMetricStore extends ComponentMetricStore {
-               private final Map<String, TaskMetricStore> tasks = new 
HashMap<>();
+       @ThreadSafe
+       private static class JobMetricStore extends ComponentMetricStore {
+               private final Map<String, TaskMetricStore> tasks = new 
ConcurrentHashMap<>();
 
                public TaskMetricStore getTaskMetricStore(String taskID) {
-                       return tasks.get(taskID);
+                       return taskID == null ? null : tasks.get(taskID);
                }
        }
 
        /**
         * Sub-structure containing metrics of a single Task.
         */
-       public static class TaskMetricStore extends ComponentMetricStore {
-               private final Map<Integer, SubtaskMetricStore> subtasks = new 
HashMap<>();
+       @ThreadSafe
+       private static class TaskMetricStore extends ComponentMetricStore {
+               private final Map<Integer, ComponentMetricStore> subtasks = new 
ConcurrentHashMap<>();
 
-               public SubtaskMetricStore getSubtaskMetricStore(int 
subtaskIndex) {
+               public ComponentMetricStore getSubtaskMetricStore(int 
subtaskIndex) {
                        return subtasks.get(subtaskIndex);
                }
        }
-
-       /**
-        * Sub-structure containing metrics of a single Subtask.
-        */
-       public static class SubtaskMetricStore extends ComponentMetricStore {
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
index 90bafb7..f0a83b8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
@@ -49,7 +49,7 @@ public class TaskManagerMetricsHandler extends 
AbstractMetricsHandler {
 
        @Override
        protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
-               MetricStore.TaskManagerMetricStore taskManager = 
metrics.getTaskManagerMetricStore(pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY));
+               MetricStore.ComponentMetricStore taskManager = 
metrics.getTaskManagerMetricStore(pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY));
                if (taskManager == null) {
                        return null;
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index 2f5a7c8..ee40b99 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -77,58 +77,56 @@ public class MutableIOMetrics extends IOMetrics {
                } else { // execAttempt is still running, use 
MetricQueryService instead
                        if (fetcher != null) {
                                fetcher.update();
-                               MetricStore metricStore = 
fetcher.getMetricStore();
-                               synchronized (metricStore) {
-                                       MetricStore.SubtaskMetricStore metrics 
= metricStore.getSubtaskMetricStore(jobID, taskID, 
attempt.getParallelSubtaskIndex());
-                                       if (metrics != null) {
-                                               /**
-                                                * We want to keep track of 
missing metrics to be able to make a difference between 0 as a value
-                                                * and a missing value.
-                                                * In case a metric is missing 
for a parallel instance of a task, we set the complete flag as
-                                                * false.
-                                                */
-                                               if 
(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL) == null){
-                                                       
this.numBytesInLocalComplete = false;
-                                               }
-                                               else {
-                                                       this.numBytesInLocal += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL));
-                                               }
-
-                                               if 
(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE) == null){
-                                                       
this.numBytesInRemoteComplete = false;
-                                               }
-                                               else {
-                                                       this.numBytesInRemote 
+= Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE));
-                                               }
-
-                                               if 
(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT) == null){
-                                                       
this.numBytesOutComplete = false;
-                                               }
-                                               else {
-                                                       this.numBytesOut += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT));
-                                               }
-
-                                               if 
(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN) == null){
-                                                       
this.numRecordsInComplete = false;
-                                               }
-                                               else {
-                                                       this.numRecordsIn += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN));
-                                               }
-
-                                               if 
(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT) == null){
-                                                       
this.numRecordsOutComplete = false;
-                                               }
-                                               else {
-                                                       this.numRecordsOut += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT));
-                                               }
+                               MetricStore.ComponentMetricStore metrics = 
fetcher.getMetricStore()
+                                       .getSubtaskMetricStore(jobID, taskID, 
attempt.getParallelSubtaskIndex());
+                               if (metrics != null) {
+                                       /**
+                                        * We want to keep track of missing 
metrics to be able to distinguish between 0 as a value
+                                        * and a missing value.
+                                        * In case a metric is missing for a 
parallel instance of a task, we set the complete flag as
+                                        * false.
+                                        */
+                                       if 
(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL) == null){
+                                               this.numBytesInLocalComplete = 
false;
                                        }
                                        else {
-                                               this.numBytesInLocalComplete = 
false;
+                                               this.numBytesInLocal += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL));
+                                       }
+
+                                       if 
(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE) == null){
                                                this.numBytesInRemoteComplete = 
false;
+                                       }
+                                       else {
+                                               this.numBytesInRemote += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE));
+                                       }
+
+                                       if 
(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT) == null){
                                                this.numBytesOutComplete = 
false;
+                                       }
+                                       else {
+                                               this.numBytesOut += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT));
+                                       }
+
+                                       if 
(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN) == null){
                                                this.numRecordsInComplete = 
false;
+                                       }
+                                       else {
+                                               this.numRecordsIn += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN));
+                                       }
+
+                                       if 
(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT) == null){
                                                this.numRecordsOutComplete = 
false;
                                        }
+                                       else {
+                                               this.numRecordsOut += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT));
+                                       }
+                               }
+                               else {
+                                       this.numBytesInLocalComplete = false;
+                                       this.numBytesInRemoteComplete = false;
+                                       this.numBytesOutComplete = false;
+                                       this.numRecordsInComplete = false;
+                                       this.numRecordsOutComplete = false;
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index e513dd9..a6eaf2f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -126,17 +126,17 @@ public class MetricFetcherTest extends TestLogger {
                fetcher.update();
                MetricStore store = fetcher.getMetricStore();
                synchronized (store) {
-                       assertEquals("7", 
store.jobManager.metrics.get("abc.hist_min"));
-                       assertEquals("6", 
store.jobManager.metrics.get("abc.hist_max"));
-                       assertEquals("4.0", 
store.jobManager.metrics.get("abc.hist_mean"));
-                       assertEquals("0.5", 
store.jobManager.metrics.get("abc.hist_median"));
-                       assertEquals("5.0", 
store.jobManager.metrics.get("abc.hist_stddev"));
-                       assertEquals("0.75", 
store.jobManager.metrics.get("abc.hist_p75"));
-                       assertEquals("0.9", 
store.jobManager.metrics.get("abc.hist_p90"));
-                       assertEquals("0.95", 
store.jobManager.metrics.get("abc.hist_p95"));
-                       assertEquals("0.98", 
store.jobManager.metrics.get("abc.hist_p98"));
-                       assertEquals("0.99", 
store.jobManager.metrics.get("abc.hist_p99"));
-                       assertEquals("0.999", 
store.jobManager.metrics.get("abc.hist_p999"));
+                       assertEquals("7", 
store.getJobManagerMetricStore().getMetric("abc.hist_min"));
+                       assertEquals("6", 
store.getJobManagerMetricStore().getMetric("abc.hist_max"));
+                       assertEquals("4.0", 
store.getJobManagerMetricStore().getMetric("abc.hist_mean"));
+                       assertEquals("0.5", 
store.getJobManagerMetricStore().getMetric("abc.hist_median"));
+                       assertEquals("5.0", 
store.getJobManagerMetricStore().getMetric("abc.hist_stddev"));
+                       assertEquals("0.75", 
store.getJobManagerMetricStore().getMetric("abc.hist_p75"));
+                       assertEquals("0.9", 
store.getJobManagerMetricStore().getMetric("abc.hist_p90"));
+                       assertEquals("0.95", 
store.getJobManagerMetricStore().getMetric("abc.hist_p95"));
+                       assertEquals("0.98", 
store.getJobManagerMetricStore().getMetric("abc.hist_p98"));
+                       assertEquals("0.99", 
store.getJobManagerMetricStore().getMetric("abc.hist_p99"));
+                       assertEquals("0.999", 
store.getJobManagerMetricStore().getMetric("abc.hist_p999"));
 
                        assertEquals("x", 
store.getTaskManagerMetricStore(tmID.toString()).metrics.get("abc.gauge"));
                        assertEquals("5.0", 
store.getJobMetricStore(jobID.toString()).metrics.get("abc.jc"));
@@ -157,8 +157,8 @@ public class MetricFetcherTest extends TestLogger {
                c1.inc(1);
                c2.inc(2);
 
-               counters.put(c1, new Tuple2<QueryScopeInfo, String>(new 
QueryScopeInfo.OperatorQueryScopeInfo(jobID.toString(), "taskid", 2, "opname", 
"abc"), "oc"));
-               counters.put(c2, new Tuple2<QueryScopeInfo, String>(new 
QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"), "tc"));
+               counters.put(c1, new Tuple2<>(new 
QueryScopeInfo.OperatorQueryScopeInfo(jobID.toString(), "taskid", 2, "opname", 
"abc"), "oc"));
+               counters.put(c2, new Tuple2<>(new 
QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"), "tc"));
                meters.put(new Meter() {
                        @Override
                        public void markEvent() {
@@ -177,14 +177,14 @@ public class MetricFetcherTest extends TestLogger {
                        public long getCount() {
                                return 10;
                        }
-               }, new Tuple2<QueryScopeInfo, String>(new 
QueryScopeInfo.JobQueryScopeInfo(jobID.toString(), "abc"), "jc"));
+               }, new Tuple2<>(new 
QueryScopeInfo.JobQueryScopeInfo(jobID.toString(), "abc"), "jc"));
                gauges.put(new Gauge<String>() {
                        @Override
                        public String getValue() {
                                return "x";
                        }
-               }, new Tuple2<QueryScopeInfo, String>(new 
QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge"));
-               histograms.put(new TestingHistogram(), new 
Tuple2<QueryScopeInfo, String>(new 
QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist"));
+               }, new Tuple2<>(new 
QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge"));
+               histograms.put(new TestingHistogram(), new Tuple2<>(new 
QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist"));
 
                MetricDumpSerialization.MetricDumpSerializer serializer = new 
MetricDumpSerialization.MetricDumpSerializer();
                MetricDumpSerialization.MetricSerializationResult dump = 
serializer.serialize(counters, gauges, histograms, meters);

http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index 31225ad..82c6894 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -58,9 +58,9 @@ public class MetricStoreTest extends TestLogger {
                store.add(cd);
 
                //-----verify that no side effects occur
-               assertEquals(0, store.jobManager.metrics.size());
-               assertEquals(0, store.taskManagers.size());
-               assertEquals(0, store.jobs.size());
+               assertEquals(0, store.getJobManager().metrics.size());
+               assertEquals(0, store.getTaskManagers().size());
+               assertEquals(0, store.getJobs().size());
        }
 
        public static MetricStore setupStore(MetricStore store) {

Reply via email to