Repository: flink
Updated Branches:
  refs/heads/master a7e0a277f -> 7fb7e0b97


http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
index fc0ce5c..b6be31c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.metrics.groups;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.TestLogger;
@@ -39,7 +39,7 @@ public class TaskManagerJobGroupTest extends TestLogger {
 
        @Test
        public void testGenerateScopeDefault() {
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
                TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
                JobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
@@ -59,7 +59,7 @@ public class TaskManagerJobGroupTest extends TestLogger {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc");
                cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, 
"some-constant.<job_name>");
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                JobID jid = new JobID();
 
@@ -81,7 +81,7 @@ public class TaskManagerJobGroupTest extends TestLogger {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_TM, "peter.<tm_id>");
                cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, 
"*.some-constant.<job_id>");
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                JobID jid = new JobID();
 
@@ -101,7 +101,7 @@ public class TaskManagerJobGroupTest extends TestLogger {
        @Test
        public void testCreateQueryServiceMetricInfo() {
                JobID jid = new JobID();
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                TaskManagerMetricGroup tm = new 
TaskManagerMetricGroup(registry, "host", "id");
                TaskManagerJobMetricGroup job = new 
TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index 43cbbf1..be7407e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
@@ -49,7 +49,7 @@ public class TaskMetricGroupTest extends TestLogger {
 
        @Test
        public void testGenerateScopeDefault() {
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                JobVertexID vertexId = new JobVertexID();
                AbstractID executionId = new AbstractID();
 
@@ -73,7 +73,7 @@ public class TaskMetricGroupTest extends TestLogger {
                cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc");
                cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "def");
                cfg.setString(MetricOptions.SCOPE_NAMING_TASK, 
"<tm_id>.<job_id>.<task_id>.<task_attempt_id>");
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                JobID jid = new JobID();
                JobVertexID vertexId = new JobVertexID();
@@ -98,7 +98,7 @@ public class TaskMetricGroupTest extends TestLogger {
        public void testGenerateScopeWilcard() {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_TASK, 
"*.<task_attempt_id>.<subtask_index>");
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                AbstractID executionId = new AbstractID();
 
@@ -123,7 +123,7 @@ public class TaskMetricGroupTest extends TestLogger {
                JobID jid = new JobID();
                JobVertexID vid = new JobVertexID();
                AbstractID eid = new AbstractID();
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                TaskManagerMetricGroup tm = new 
TaskManagerMetricGroup(registry, "host", "id");
                TaskManagerJobMetricGroup job = new 
TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
                TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, 
eid, "taskName", 4, 5);
@@ -157,7 +157,7 @@ public class TaskMetricGroupTest extends TestLogger {
        public void testOperatorNameTruncation() {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, 
ScopeFormat.SCOPE_OPERATOR_NAME);
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
                TaskManagerMetricGroup tm = new 
TaskManagerMetricGroup(registry, "host", "id");
                TaskManagerJobMetricGroup job = new 
TaskManagerJobMetricGroup(registry, tm, new JobID(), "jobname");
                TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, 
job, new JobVertexID(), new AbstractID(), "task", 0, 0);
@@ -170,7 +170,7 @@ public class TaskMetricGroupTest extends TestLogger {
                Assert.assertEquals(originalName.substring(0, 
TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH), storedName);
        }
 
-       private static class CountingMetricRegistry extends MetricRegistry {
+       private static class CountingMetricRegistry extends MetricRegistryImpl {
 
                private int counter = 0;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
index 2ca19c1..3d29815 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -37,7 +37,7 @@ import java.util.UUID;
 
 public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
        
-       private static final MetricRegistry EMPTY_REGISTRY = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+       private static final MetricRegistryImpl EMPTY_REGISTRY = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
        
        public UnregisteredTaskMetricsGroup() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index d0dd973..306d4d4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
@@ -78,7 +78,7 @@ public class ResourceManagerHATest extends TestLogger {
                        highAvailabilityServices,
                        rpcService.getScheduledExecutor());
 
-               MetricRegistry metricRegistry = mock(MetricRegistry.class);
+               MetricRegistryImpl metricRegistry = 
mock(MetricRegistryImpl.class);
 
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 73c5b5c..1b6324c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -32,7 +32,7 @@ import 
org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -270,7 +270,7 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
                        Time.seconds(5L),
                        Time.seconds(5L));
-               MetricRegistry metricRegistry = mock(MetricRegistry.class);
+               MetricRegistryImpl metricRegistry = 
mock(MetricRegistryImpl.class);
                JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
                        highAvailabilityServices,
                        rpcService.getScheduledExecutor(),

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 0206ade..147d180 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingRpcService;
@@ -182,7 +182,7 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime());
 
-               MetricRegistry metricRegistry = mock(MetricRegistry.class);
+               MetricRegistryImpl metricRegistry = 
mock(MetricRegistryImpl.class);
                JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
                        highAvailabilityServices,
                        rpcService.getScheduledExecutor(),

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
index 8e76674..b600cbe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
@@ -105,7 +105,7 @@ public class TaskManagerLogHandlerTest {
                JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
                
when(jobManagerGateway.requestBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(1337));
                when(jobManagerGateway.getHostname()).thenReturn("localhost");
-               
when(jobManagerGateway.requestTaskManagerInstance(any(InstanceID.class), 
any(Time.class))).thenReturn(
+               
when(jobManagerGateway.requestTaskManagerInstance(any(ResourceID.class), 
any(Time.class))).thenReturn(
                        
CompletableFuture.completedFuture(Optional.of(taskManager)));
 
                GatewayRetriever<JobManagerGateway> retriever = 
mock(GatewayRetriever.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index ce98f31..7b10db6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -89,7 +89,7 @@ public class MetricFetcherTest extends TestLogger {
                
when(jobManagerGateway.requestMetricQueryServicePaths(any(Time.class))).thenReturn(
                        
CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath)));
                
when(jobManagerGateway.requestTaskManagerMetricQueryServicePaths(any(Time.class))).thenReturn(
-                       
CompletableFuture.completedFuture(Collections.singleton(Tuple2.of(tmID, 
tmMetricQueryServicePath))));
+                       
CompletableFuture.completedFuture(Collections.singleton(Tuple2.of(tmRID, 
tmMetricQueryServicePath))));
 
                GatewayRetriever<JobManagerGateway> retriever = 
mock(AkkaJobManagerRetriever.class);
                when(retriever.getNow())
@@ -99,7 +99,7 @@ public class MetricFetcherTest extends TestLogger {
                MetricQueryServiceGateway jmQueryService = 
mock(MetricQueryServiceGateway.class);
                MetricQueryServiceGateway tmQueryService = 
mock(MetricQueryServiceGateway.class);
 
-               MetricDumpSerialization.MetricSerializationResult 
requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID);
+               MetricDumpSerialization.MetricSerializationResult 
requestMetricsAnswer = createRequestDumpAnswer(tmRID, jobID);
 
                when(jmQueryService.queryMetrics(any(Time.class)))
                        .thenReturn(CompletableFuture.completedFuture(new 
MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
@@ -133,14 +133,14 @@ public class MetricFetcherTest extends TestLogger {
                        assertEquals("0.99", 
store.getJobManagerMetricStore().getMetric("abc.hist_p99"));
                        assertEquals("0.999", 
store.getJobManagerMetricStore().getMetric("abc.hist_p999"));
 
-                       assertEquals("x", 
store.getTaskManagerMetricStore(tmID.toString()).metrics.get("abc.gauge"));
+                       assertEquals("x", 
store.getTaskManagerMetricStore(tmRID.toString()).metrics.get("abc.gauge"));
                        assertEquals("5.0", 
store.getJobMetricStore(jobID.toString()).metrics.get("abc.jc"));
                        assertEquals("2", 
store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.abc.tc"));
                        assertEquals("1", 
store.getTaskMetricStore(jobID.toString(), 
"taskid").metrics.get("2.opname.abc.oc"));
                }
        }
 
-       private static MetricDumpSerialization.MetricSerializationResult 
createRequestDumpAnswer(InstanceID tmID, JobID jobID) {
+       private static MetricDumpSerialization.MetricSerializationResult 
createRequestDumpAnswer(ResourceID tmRID, JobID jobID) {
                Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new 
HashMap<>();
                Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new 
HashMap<>();
                Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new 
HashMap<>();
@@ -178,7 +178,7 @@ public class MetricFetcherTest extends TestLogger {
                        public String getValue() {
                                return "x";
                        }
-               }, new Tuple2<>(new 
QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge"));
+               }, new Tuple2<>(new 
QueryScopeInfo.TaskManagerQueryScopeInfo(tmRID.toString(), "abc"), "gauge"));
                histograms.put(new TestingHistogram(), new Tuple2<>(new 
QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist"));
 
                MetricDumpSerialization.MetricDumpSerializer serializer = new 
MetricDumpSerialization.MetricDumpSerializer();

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index e448ccc..7c6b7dd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -39,7 +39,7 @@ import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
@@ -112,7 +112,7 @@ public class TaskExecutorITCase extends TestLogger {
                        testingHAServices,
                        rpcService.getScheduledExecutor(),
                        Time.minutes(5L));
-               MetricRegistry metricRegistry = mock(MetricRegistry.class);
+               MetricRegistryImpl metricRegistry = 
mock(MetricRegistryImpl.class);
                HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class, RETURNS_MOCKS);
 
                final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index e106238..fcd6e4e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -61,7 +61,7 @@ import 
org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -205,7 +205,7 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        heartbeatServices,
-                       mock(MetricRegistry.class),
+                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -311,7 +311,7 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        heartbeatServices,
-                       mock(MetricRegistry.class),
+                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -429,7 +429,7 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        heartbeatServices,
-                       mock(MetricRegistry.class),
+                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -522,7 +522,7 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(MetricRegistry.class),
+                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -605,7 +605,7 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(MetricRegistry.class),
+                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -747,7 +747,7 @@ public class TaskExecutorTest extends TestLogger {
                        networkEnvironment,
                        haServices,
                        mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(MetricRegistry.class),
+                       mock(MetricRegistryImpl.class),
                        taskManagerMetricGroup,
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -863,7 +863,7 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(MetricRegistry.class),
+                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -981,7 +981,7 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(MetricRegistry.class),
+                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -1074,7 +1074,7 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(MetricRegistry.class),
+                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -1248,7 +1248,7 @@ public class TaskExecutorTest extends TestLogger {
                        networkMock,
                        haServices,
                        mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(MetricRegistry.class),
+                       mock(MetricRegistryImpl.class),
                        taskManagerMetricGroup,
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -1371,7 +1371,7 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServicesMock,
                        heartbeatServicesMock,
-                       mock(MetricRegistry.class),
+                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
index bf90634..a8358a1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
@@ -21,9 +21,9 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
@@ -259,7 +259,6 @@ public class TaskManagerServicesTest {
                        managedMemory,
                        false,
                        managedMemoryFraction,
-                       mock(MetricRegistryConfiguration.class),
                        0);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 10b74c3..8249fca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -49,8 +49,9 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -62,7 +63,6 @@ import scala.Option;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.net.InetAddress;
-import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
 public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
@@ -99,6 +99,7 @@ public class TaskManagerComponentsStartupShutdownTest extends 
TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                Option.empty(),
                                JobManager.class,
                                MemoryArchivist.class)._1();
@@ -168,7 +169,7 @@ public class TaskManagerComponentsStartupShutdownTest 
extends TestLogger {
                                network,
                                numberOfSlots,
                                highAvailabilityServices,
-                               new 
MetricRegistry(metricRegistryConfiguration));
+                               new 
MetricRegistryImpl(metricRegistryConfiguration));
 
                        taskManager = actorSystem.actorOf(tmProps);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 22d49d2..7429ec5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -18,12 +18,9 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
@@ -32,17 +29,16 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
-import org.junit.Test;
 
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.duration.FiniteDuration;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
@@ -50,6 +46,11 @@ import java.io.InputStream;
 import java.io.StringWriter;
 import java.util.concurrent.TimeUnit;
 
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
+
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
@@ -121,6 +122,7 @@ public abstract class TaskManagerProcessReapingTestBase 
extends TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                Option.empty(),
                                JobManager.class,
                                MemoryArchivist.class)._1;

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index 67accdb..2e6c580 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.StartupUtils;
 import org.apache.flink.util.NetUtils;
@@ -249,6 +250,7 @@ public class TaskManagerStartupTest extends TestLogger {
                                ResourceID.generate(),
                                null,
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                "localhost",
                                Option.<String>empty(),
                                false,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index f575867..887c4f5 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.instance._
 import 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor
 import 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
 import 
org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration,
 AlreadyRegistered, RegisterTaskManager}
+import org.apache.flink.runtime.metrics.{MetricRegistryImpl, 
MetricRegistryConfiguration}
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenLeader
 import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingUtils}
@@ -60,6 +61,10 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll with Befor
 
   var highAvailabilityServices: HighAvailabilityServices = _
 
+  val metricRegistry: MetricRegistryImpl = new MetricRegistryImpl(
+    MetricRegistryConfiguration.fromConfiguration(new Configuration())
+  )
+
   val timeout = FiniteDuration(30, TimeUnit.SECONDS)
   
   override def afterAll(): Unit = {
@@ -87,7 +92,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll with Befor
       var tm2Option: Option[ActorRef] = None
 
       try {
-        val jm = startTestingJobManager(_system, highAvailabilityServices)
+        val jm = startTestingJobManager(_system, highAvailabilityServices, 
metricRegistry)
         jmOption = Some(jm)
 
         val rm = startTestingResourceManager(_system, highAvailabilityServices)
@@ -169,7 +174,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll with Befor
       try {
         val probe = TestProbe()
 
-        val jm = startTestingJobManager(_system, highAvailabilityServices)
+        val jm = startTestingJobManager(_system, highAvailabilityServices, 
metricRegistry)
         jmOption = Some(jm)
         val rm = startTestingResourceManager(_system, highAvailabilityServices)
         rmOption = Some(rm)
@@ -242,7 +247,8 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll with Befor
 
   private def startTestingJobManager(
       system: ActorSystem,
-      highAvailabilityServices: HighAvailabilityServices): ActorGateway = {
+      highAvailabilityServices: HighAvailabilityServices,
+      metricRegistry: MetricRegistryImpl): ActorGateway = {
 
     val config = new Configuration()
     
@@ -250,7 +256,8 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll with Befor
       config,
       executor,
       executor,
-      highAvailabilityServices.createBlobStore())
+      highAvailabilityServices.createBlobStore(),
+      metricRegistry)
 
     // Start the JobManager without a MetricRegistry so that we don't start 
the MetricQueryService.
     // The problem of the MetricQueryService is that it starts an actor with a 
fixed name. Thus,
@@ -273,7 +280,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll with Befor
       highAvailabilityServices.getSubmittedJobGraphStore(),
       highAvailabilityServices.getCheckpointRecoveryFactory(),
       components._9,
-      None,
+      components._10,
       None)
 
     _system.actorOf(props)

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 2e884a0..2b91cd4 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -42,7 +42,8 @@ import org.apache.flink.runtime.jobmanager.{JobManager, 
MemoryArchivist, Submitt
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.messages.JobManagerMessages
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.metrics.MetricRegistryImpl
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
@@ -120,7 +121,7 @@ class TestingCluster(
     submittedJobGraphStore: SubmittedJobGraphStore,
     checkpointRecoveryFactory: CheckpointRecoveryFactory,
     jobRecoveryTimeout: FiniteDuration,
-    metricsRegistry: Option[MetricRegistry],
+    jobManagerMetricGroup: JobManagerMetricGroup,
     optRestAddress: Option[String]): Props = {
 
     val props = super.getJobManagerProps(
@@ -139,7 +140,7 @@ class TestingCluster(
       submittedJobGraphStore,
       checkpointRecoveryFactory,
       jobRecoveryTimeout,
-      metricsRegistry,
+      jobManagerMetricGroup,
       optRestAddress)
 
     if (synchronousDispatcher) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 2170a8c..13e2b75 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -53,7 +53,7 @@ class TestingJobManager(
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
     jobRecoveryTimeout : FiniteDuration,
-    metricRegistry : Option[MetricRegistry],
+    jobManagerMetricGroup : JobManagerMetricGroup,
     optRestAddress: Option[String])
   extends JobManager(
     flinkConfiguration,
@@ -70,6 +70,6 @@ class TestingJobManager(
     submittedJobGraphs,
     checkpointRecoveryFactory,
     jobRecoveryTimeout,
-    metricRegistry,
+    jobManagerMetricGroup,
     optRestAddress)
   with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 1db0a85..da753ae 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -23,7 +23,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 
@@ -40,7 +40,7 @@ class TestingTaskManager(
     network: NetworkEnvironment,
     numberOfSlots: Int,
     highAvailabilityServices: HighAvailabilityServices,
-    metricRegistry : MetricRegistry)
+    taskManagerMetricGroup : TaskManagerMetricGroup)
   extends TaskManager(
     config,
     resourceID,
@@ -50,7 +50,7 @@ class TestingTaskManager(
     network,
     numberOfSlots,
     highAvailabilityServices,
-    metricRegistry)
+    taskManagerMetricGroup)
   with TestingTaskManagerLike {
 
   def this(
@@ -61,7 +61,7 @@ class TestingTaskManager(
     network: NetworkEnvironment,
     numberOfSlots: Int,
     highAvailabilityServices: HighAvailabilityServices,
-    metricRegistry : MetricRegistry) {
+    taskManagerMetricGroup : TaskManagerMetricGroup) {
     this(
       config,
       ResourceID.generate(),
@@ -71,6 +71,6 @@ class TestingTaskManager(
       network,
       numberOfSlots,
       highAvailabilityServices,
-      metricRegistry)
+      taskManagerMetricGroup)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index f78af9f..2de6f9e 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.testingUtils
 
+import java.net.InetAddress
 import java.util
-import java.util.{Collections, UUID}
 import java.util.concurrent._
+import java.util.{Collections, UUID}
 
 import akka.actor.{ActorRef, ActorSystem, Kill, Props}
 import akka.pattern.{Patterns, ask}
@@ -38,13 +39,15 @@ import org.apache.flink.runtime.jobmanager.{JobManager, 
MemoryArchivist}
 import org.apache.flink.runtime.jobmaster.JobMaster
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager,
 RegisteredAtJobManager}
+import org.apache.flink.runtime.metrics.{MetricRegistryImpl, 
MetricRegistryConfiguration}
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
+import org.apache.flink.runtime.taskexecutor.{TaskManagerServices, 
TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.util.LeaderRetrievalUtils
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
 
-import scala.concurrent.duration.TimeUnit
-import scala.concurrent.duration._
+import scala.concurrent.duration.{TimeUnit, _}
 import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor}
 import scala.language.postfixOps
 
@@ -266,11 +269,17 @@ object TestingUtils {
 
     resultingConfiguration.addAll(configuration)
 
+    val metricRegistry = new MetricRegistryImpl(
+      MetricRegistryConfiguration.fromConfiguration(configuration))
+
+    val taskManagerResourceId = ResourceID.generate()
+
     val taskManager = TaskManager.startTaskManagerComponentsAndActor(
       resultingConfiguration,
-      ResourceID.generate(),
+      taskManagerResourceId,
       actorSystem,
       highAvailabilityServices,
+      metricRegistry,
       "localhost",
       None,
       useLocalCommunication,
@@ -471,12 +480,16 @@ object TestingUtils {
       HighAvailabilityOptions.HA_MODE,
       ConfigConstants.DEFAULT_HA_MODE)
 
+    val metricRegistry = new MetricRegistryImpl(
+      MetricRegistryConfiguration.fromConfiguration(configuration))
+
       val (actor, _) = JobManager.startJobManagerActors(
         configuration,
         actorSystem,
         futureExecutor,
         ioExecutor,
         highAvailabilityServices,
+        metricRegistry,
         None,
         Some(prefix + JobMaster.JOB_MANAGER_NAME),
         Some(prefix + JobMaster.ARCHIVE_NAME),

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index a78c528..94aed2a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -145,6 +146,7 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                Option.empty(),
                                JobManager.class,
                                MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 51868af..7c53d52 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -217,6 +218,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                                ResourceID.generate(),
                                taskManagerSystem,
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                "localhost",
                                Option.<String>empty(),
                                false,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index b575dca..ee37d6d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -203,6 +204,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends 
TestLogger {
                                ResourceID.generate(),
                                taskManagerSystem,
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                "localhost",
                                Option.<String>empty(),
                                false,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 2820dd2..8e97e9d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -281,6 +282,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
                                        ResourceID.generate(),
                                        tmActorSystem[i],
                                        highAvailabilityServices,
+                                       new NoOpMetricRegistry(),
                                        "localhost",
                                        Option.<String>empty(),
                                        false,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index c70c2d5..ecd0bea 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.NetUtils;
@@ -120,6 +121,7 @@ public class ProcessFailureCancelingITCase extends 
TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                Option.empty(),
                                JobManager.class,
                                MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index e5131af..7488b62 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
@@ -112,6 +113,7 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger {
                        TestingUtils.defaultExecutor(),
                        TestingUtils.defaultExecutor(),
                        highAvailabilityServices,
+                       new NoOpMetricRegistry(),
                        Option.empty(),
                        Option.apply("jm"),
                        Option.apply("arch"),
@@ -133,6 +135,7 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger {
                        ResourceID.generate(),
                        actorSystem,
                        highAvailabilityServices,
+                       new NoOpMetricRegistry(),
                        "localhost",
                        Option.apply("tm"),
                        true,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 298cd52..d0084b6 100644
--- 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.testingUtils.TestingJobManagerLike
 
 import scala.concurrent.duration.FiniteDuration
@@ -68,7 +68,7 @@ class TestingYarnJobManager(
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
     jobRecoveryTimeout: FiniteDuration,
-    metricRegistry : Option[MetricRegistry],
+    jobManagerMetricGroup : JobManagerMetricGroup,
     optRestAddress: Option[String])
   extends YarnJobManager(
     flinkConfiguration,
@@ -85,6 +85,6 @@ class TestingYarnJobManager(
     submittedJobGraphs,
     checkpointRecoveryFactory,
     jobRecoveryTimeout,
-    metricRegistry,
+    jobManagerMetricGroup,
     optRestAddress)
   with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
index a03f365..228eaaa 100644
--- 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -23,7 +23,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
@@ -52,7 +52,7 @@ class TestingYarnTaskManager(
     network: NetworkEnvironment,
     numberOfSlots: Int,
     highAvailabilityServices: HighAvailabilityServices,
-    metricRegistry : MetricRegistry)
+    taskManagerMetricGroup : TaskManagerMetricGroup)
   extends YarnTaskManager(
     config,
     resourceID,
@@ -62,7 +62,7 @@ class TestingYarnTaskManager(
     network,
     numberOfSlots,
     highAvailabilityServices,
-    metricRegistry)
+    taskManagerMetricGroup)
   with TestingTaskManagerLike {
 
   object YarnTaskManager {

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index c101b75..95ba154 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -36,6 +36,8 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -219,6 +221,7 @@ public class YarnApplicationMasterRunner {
                ActorSystem actorSystem = null;
                WebMonitor webMonitor = null;
                HighAvailabilityServices highAvailabilityServices = null;
+               MetricRegistryImpl metricRegistry = null;
 
                int numberProcessors = Hardware.getNumberCPUCores();
 
@@ -357,6 +360,11 @@ public class YarnApplicationMasterRunner {
                                new 
ScheduledExecutorServiceAdapter(futureExecutor),
                                LOG);
 
+                       metricRegistry = new MetricRegistryImpl(
+                               
MetricRegistryConfiguration.fromConfiguration(config));
+
+                       metricRegistry.startQueryService(actorSystem, null);
+
                        // 2: the JobManager
                        LOG.debug("Starting JobManager actor");
 
@@ -367,6 +375,7 @@ public class YarnApplicationMasterRunner {
                                futureExecutor,
                                ioExecutor,
                                highAvailabilityServices,
+                               metricRegistry,
                                webMonitor == null ? Option.empty() : 
Option.apply(webMonitor.getRestAddress()),
                                new Some<>(JobMaster.JOB_MANAGER_NAME),
                                Option.<String>empty(),
@@ -455,6 +464,10 @@ public class YarnApplicationMasterRunner {
                        }
                }
 
+               if (metricRegistry != null) {
+                       metricRegistry.shutdown();
+               }
+
                org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
                        AkkaUtils.getTimeout(config).toMillis(),
                        TimeUnit.MILLISECONDS,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index b32d25c..6feb287 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
@@ -116,7 +116,7 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler) {
                super(

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index e1efb54..439fdf3 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -78,7 +78,7 @@ public class YarnJobClusterEntrypoint extends 
JobClusterEntrypoint {
                        RpcService rpcService,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        FatalErrorHandler fatalErrorHandler) throws Exception {
                final ResourceManagerConfiguration rmConfiguration = 
ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index 042644b..a13f62c 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -68,7 +68,7 @@ public class YarnSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
                        RpcService rpcService,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        FatalErrorHandler fatalErrorHandler) throws Exception {
                final ResourceManagerConfiguration rmConfiguration = 
ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 5909160..6b439bd 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -34,7 +34,8 @@ import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.metrics.MetricRegistryImpl
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.yarn.configuration.YarnConfigOptions
 
 import scala.concurrent.duration._
@@ -72,7 +73,7 @@ class YarnJobManager(
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
     jobRecoveryTimeout: FiniteDuration,
-    metricsRegistry: Option[MetricRegistry],
+    jobManagerMetricGroup: JobManagerMetricGroup,
     optRestAddress: Option[String])
   extends ContaineredJobManager(
     flinkConfiguration,
@@ -89,7 +90,7 @@ class YarnJobManager(
     submittedJobGraphs,
     checkpointRecoveryFactory,
     jobRecoveryTimeout,
-    metricsRegistry,
+    jobManagerMetricGroup,
     optRestAddress) {
 
   val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index e37ff6f..615466d 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -23,7 +23,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 
@@ -39,7 +39,7 @@ class YarnTaskManager(
     network: NetworkEnvironment,
     numberOfSlots: Int,
     highAvailabilityServices: HighAvailabilityServices,
-    metricRegistry : MetricRegistry)
+    taskManagerMetricGroup: TaskManagerMetricGroup)
   extends TaskManager(
     config,
     resourceID,
@@ -49,7 +49,7 @@ class YarnTaskManager(
     network,
     numberOfSlots,
     highAvailabilityServices,
-    metricRegistry) {
+    taskManagerMetricGroup) {
 
   override def handleMessage: Receive = {
     super.handleMessage

Reply via email to