http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index e4ceb40..72c03af 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -18,15 +18,6 @@
 
 package org.apache.flink.runtime.leaderelection;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -42,25 +33,37 @@ import 
org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+
+import java.util.concurrent.TimeUnit;
+
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.concurrent.TimeUnit;
-
 public class JobManagerLeaderElectionTest extends TestLogger {
 
        @Rule
@@ -198,7 +201,7 @@ public class JobManagerLeaderElectionTest extends 
TestLogger {
                        submittedJobGraphStore,
                        checkpointRecoveryFactory,
                        AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
-                       Option.<MetricRegistryImpl>empty(),
+                       new JobManagerMetricGroup(new NoOpMetricRegistry(), 
"localhost"),
                        Option.<String>empty());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
index 1140e3d..46d6548 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
@@ -23,6 +23,8 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 
+import javax.annotation.Nullable;
+
 /**
  * Metric registry which does nothing and is intended for testing purposes.
  */
@@ -57,4 +59,10 @@ public class NoOpMetricRegistry implements MetricRegistry {
        public ScopeFormats getScopeFormats() {
                return scopeFormats;
        }
+
+       @Nullable
+       @Override
+       public String getMetricQueryServicePath() {
+               return null;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index 31304e5..d934ea9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -26,6 +26,8 @@ import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServic
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
@@ -94,8 +96,12 @@ public class TaskManagerMetricsTest extends TestLogger {
 
                        TaskManagerServices taskManagerServices = 
TaskManagerServices.fromConfiguration(
                                taskManagerServicesConfiguration,
-                               tmResourceID,
-                               metricRegistry);
+                               tmResourceID);
+
+                       TaskManagerMetricGroup taskManagerMetricGroup = 
MetricUtils.instantiateTaskManagerMetricGroup(
+                               metricRegistry,
+                               taskManagerServices.getTaskManagerLocation(),
+                               taskManagerServices.getNetworkEnvironment());
 
                        // create the task manager
                        final Props tmProps = TaskManager.getTaskManagerProps(
@@ -107,7 +113,7 @@ public class TaskManagerMetricsTest extends TestLogger {
                                taskManagerServices.getIOManager(),
                                taskManagerServices.getNetworkEnvironment(),
                                highAvailabilityServices,
-                               
taskManagerServices.getTaskManagerMetricGroup());
+                               taskManagerMetricGroup);
 
                        final ActorRef taskManager = 
actorSystem.actorOf(tmProps);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
index 3d29815..7065e6b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
@@ -22,22 +22,22 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import java.util.UUID;
 
 public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
        
-       private static final MetricRegistryImpl EMPTY_REGISTRY = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+       private static final MetricRegistry EMPTY_REGISTRY = new 
NoOpMetricRegistry();
 
        
        public UnregisteredTaskMetricsGroup() {

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index 7b10db6..faa97a7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -72,8 +71,7 @@ public class MetricFetcherTest extends TestLogger {
 
                // ========= setup TaskManager 
=================================================================================
                JobID jobID = new JobID();
-               InstanceID tmID = new InstanceID();
-               ResourceID tmRID = new ResourceID(tmID.toString());
+               ResourceID tmRID = ResourceID.generate();
 
                // ========= setup JobManager 
==================================================================================
                JobDetails details = mock(JobDetails.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 7c6b7dd..1f1d09d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -153,7 +153,6 @@ public class TaskExecutorITCase extends TestLogger {
                        networkEnvironment,
                        testingHAServices,
                        heartbeatServices,
-                       metricRegistry,
                        taskManagerMetricGroup,
                        broadcastVariableManager,
                        fileCache,

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index fcd6e4e..de807a6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -45,8 +45,8 @@ import 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -61,7 +61,6 @@ import 
org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -114,7 +113,16 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.RETURNS_MOCKS;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TaskExecutorTest extends TestLogger {
 
@@ -205,7 +213,6 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        heartbeatServices,
-                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -311,7 +318,6 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        heartbeatServices,
-                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -429,7 +435,6 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        heartbeatServices,
-                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -522,7 +527,6 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -605,7 +609,6 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -747,7 +750,6 @@ public class TaskExecutorTest extends TestLogger {
                        networkEnvironment,
                        haServices,
                        mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(MetricRegistryImpl.class),
                        taskManagerMetricGroup,
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -863,7 +865,6 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -981,7 +982,6 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -1074,7 +1074,6 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -1248,7 +1247,6 @@ public class TaskExecutorTest extends TestLogger {
                        networkMock,
                        haServices,
                        mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(MetricRegistryImpl.class),
                        taskManagerMetricGroup,
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -1371,7 +1369,6 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServicesMock,
                        heartbeatServicesMock,
-                       mock(MetricRegistryImpl.class),
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 8249fca..4b62770 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import static org.junit.Assert.*;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Kill;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
@@ -49,21 +41,28 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
 import org.junit.Test;
 
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
 import scala.Option;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.net.InetAddress;
-import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertTrue;
 
 public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 
@@ -169,7 +168,7 @@ public class TaskManagerComponentsStartupShutdownTest 
extends TestLogger {
                                network,
                                numberOfSlots,
                                highAvailabilityServices,
-                               new 
MetricRegistryImpl(metricRegistryConfiguration));
+                               new TaskManagerMetricGroup(new 
NoOpMetricRegistry(), connectionInfo.getHostname(), 
connectionInfo.getResourceID().getResourceIdString()));
 
                        taskManager = actorSystem.actorOf(tmProps);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 2b91cd4..0369771 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.jobmanager.{JobManager, 
MemoryArchivist, Submitt
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.messages.JobManagerMessages
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.metrics.MetricRegistryImpl
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 75f0aa4..16e238b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -80,6 +80,7 @@ import 
org.apache.flink.shaded.guava18.com.google.common.collect.Multimap;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
+
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 95ba154..3bdc2ac 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -465,7 +465,11 @@ public class YarnApplicationMasterRunner {
                }
 
                if (metricRegistry != null) {
-                       metricRegistry.shutdown();
+                       try {
+                               metricRegistry.shutdown();
+                       } catch (Throwable t) {
+                               LOG.error("Could not properly shut down the 
metric registry.", t);
+                       }
                }
 
                org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 6feb287..b32d25c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
@@ -116,7 +116,7 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
-                       MetricRegistryImpl metricRegistry,
+                       MetricRegistry metricRegistry,
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler) {
                super(

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 439fdf3..e1efb54 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -78,7 +78,7 @@ public class YarnJobClusterEntrypoint extends 
JobClusterEntrypoint {
                        RpcService rpcService,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistryImpl metricRegistry,
+                       MetricRegistry metricRegistry,
                        FatalErrorHandler fatalErrorHandler) throws Exception {
                final ResourceManagerConfiguration rmConfiguration = 
ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index a13f62c..042644b 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -68,7 +68,7 @@ public class YarnSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
                        RpcService rpcService,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistryImpl metricRegistry,
+                       MetricRegistry metricRegistry,
                        FatalErrorHandler fatalErrorHandler) throws Exception {
                final ResourceManagerConfiguration rmConfiguration = 
ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);

Reply via email to