http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 1a3ca70..d8e65a6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -45,7 +45,7 @@ import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
@@ -502,7 +502,7 @@ public class ResourceManagerTest extends TestLogger {
                final ScheduledExecutor scheduledExecutor = 
mock(ScheduledExecutor.class);
                final HeartbeatServices heartbeatServices = new 
TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, 
scheduledExecutor);
 
-               final MetricRegistry metricRegistry = 
mock(MetricRegistry.class);
+               final MetricRegistryImpl metricRegistry = 
mock(MetricRegistryImpl.class);
                final JobLeaderIdService jobLeaderIdService = 
mock(JobLeaderIdService.class);
                final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
                final SlotManager slotManager = new SlotManager(
@@ -601,7 +601,7 @@ public class ResourceManagerTest extends TestLogger {
                final ScheduledExecutor scheduledExecutor = 
mock(ScheduledExecutor.class);
                final HeartbeatServices heartbeatServices = new 
TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, 
scheduledExecutor);
 
-               final MetricRegistry metricRegistry = 
mock(MetricRegistry.class);
+               final MetricRegistryImpl metricRegistry = 
mock(MetricRegistryImpl.class);
                final JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(
                        highAvailabilityServices,
                        rpcService.getScheduledExecutor(),

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d1ca757..8558145 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -114,7 +114,7 @@ public class DispatcherTest extends TestLogger {
                        mock(ResourceManagerGateway.class),
                        mock(BlobServer.class),
                        heartbeatServices,
-                       mock(MetricRegistry.class),
+                       mock(MetricRegistryImpl.class),
                        fatalErrorHandler,
                        jobManagerRunner,
                        jobId);
@@ -174,7 +174,7 @@ public class DispatcherTest extends TestLogger {
                        mock(ResourceManagerGateway.class),
                        mock(BlobServer.class),
                        heartbeatServices,
-                       mock(MetricRegistry.class),
+                       mock(MetricRegistryImpl.class),
                        fatalErrorHandler,
                        mock(JobManagerRunner.class),
                        jobId);
@@ -209,7 +209,7 @@ public class DispatcherTest extends TestLogger {
                                ResourceManagerGateway resourceManagerGateway,
                                BlobServer blobServer,
                                HeartbeatServices heartbeatServices,
-                               MetricRegistry metricRegistry,
+                               MetricRegistryImpl metricRegistry,
                                FatalErrorHandler fatalErrorHandler,
                                JobManagerRunner jobManagerRunner,
                                JobID expectedJobId) throws Exception {
@@ -238,7 +238,7 @@ public class DispatcherTest extends TestLogger {
                                HighAvailabilityServices 
highAvailabilityServices,
                                HeartbeatServices heartbeatServices,
                                JobManagerServices jobManagerServices,
-                               MetricRegistry metricRegistry,
+                               MetricRegistryImpl metricRegistry,
                                OnCompletionActions onCompleteActions,
                                FatalErrorHandler fatalErrorHandler) throws 
Exception {
                        assertEquals(expectedJobId, jobGraph.getJobID());

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 7df26fc..d843da2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -62,7 +62,9 @@ import 
org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -206,7 +208,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                mySubmittedJobGraphStore,
                                checkpointStateFactory,
                                jobRecoveryTimeout,
-                               Option.<MetricRegistry>empty(),
+                               Option.<MetricRegistryImpl>empty(),
                                Option.<String>empty());
 
                        jobManager = system.actorOf(jobManagerProps);
@@ -217,6 +219,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                ResourceID.generate(),
                                system,
                                testingHighAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                "localhost",
                                Option.apply("taskmanager"),
                                true,
@@ -380,7 +383,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                submittedJobGraphStore,
                                mock(CheckpointRecoveryFactory.class),
                                jobRecoveryTimeout,
-                               Option.<MetricRegistry>apply(null),
+                               Option.<MetricRegistryImpl>apply(null),
                                
recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
 
                        jobManager = system.actorOf(jobManagerProps);
@@ -418,7 +421,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                        SubmittedJobGraphStore submittedJobGraphs,
                        CheckpointRecoveryFactory checkpointRecoveryFactory,
                        FiniteDuration jobRecoveryTimeout,
-                       Option<MetricRegistry> metricsRegistry,
+                       JobManagerMetricGroup jobManagerMetricGroup,
                        Collection<JobID> recoveredJobs) {
                        super(
                                flinkConfiguration,
@@ -435,7 +438,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                submittedJobGraphs,
                                checkpointRecoveryFactory,
                                jobRecoveryTimeout,
-                               metricsRegistry,
+                               jobManagerMetricGroup,
                                Option.empty());
 
                        this.recoveredJobs = recoveredJobs;

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index bd7f11f..a697aae 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -75,6 +75,7 @@ import 
org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateRegistered;
@@ -624,6 +625,7 @@ public class JobManagerTest extends TestLogger {
                        TestingUtils.defaultExecutor(),
                        TestingUtils.defaultExecutor(),
                        highAvailabilityServices,
+                       new NoOpMetricRegistry(),
                        Option.empty(),
                        TestingJobManager.class,
                        MemoryArchivist.class)._1();
@@ -645,6 +647,7 @@ public class JobManagerTest extends TestLogger {
                        ResourceID.generate(),
                        system,
                        highAvailabilityServices,
+                       new NoOpMetricRegistry(),
                        "localhost",
                        scala.Option.<String>empty(),
                        true,
@@ -841,6 +844,7 @@ public class JobManagerTest extends TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                Option.empty(),
                                Option.apply("jm"),
                                Option.apply("arch"),
@@ -859,6 +863,7 @@ public class JobManagerTest extends TestLogger {
                                ResourceID.generate(),
                                actorSystem,
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                "localhost",
                                Option.apply("tm"),
                                true,
@@ -1051,6 +1056,7 @@ public class JobManagerTest extends TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                Option.empty(),
                                Option.apply("jm"),
                                Option.apply("arch"),
@@ -1069,6 +1075,7 @@ public class JobManagerTest extends TestLogger {
                                ResourceID.generate(),
                                actorSystem,
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                "localhost",
                                Option.apply("tm"),
                                true,
@@ -1164,6 +1171,7 @@ public class JobManagerTest extends TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                Option.empty(),
                                Option.apply("jm"),
                                Option.apply("arch"),
@@ -1182,6 +1190,7 @@ public class JobManagerTest extends TestLogger {
                                ResourceID.generate(),
                                actorSystem,
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                "localhost",
                                Option.apply("tm"),
                                true,
@@ -1275,6 +1284,7 @@ public class JobManagerTest extends TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                Option.empty(),
                                Option.apply("jm"),
                                Option.apply("arch"),
@@ -1296,6 +1306,7 @@ public class JobManagerTest extends TestLogger {
                                ResourceID.generate(),
                                actorSystem,
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                "localhost",
                                Option.apply("tm"),
                                true,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 340a735..cc93f18 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -94,6 +95,7 @@ public class JobSubmitTest {
                        TestingUtils.defaultExecutor(),
                        TestingUtils.defaultExecutor(),
                        highAvailabilityServices,
+                       new NoOpMetricRegistry(),
                        Option.empty(),
                        JobManager.class,
                        MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index b4f50fb..083d6e9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -115,7 +115,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
                        haServices,
                        heartbeatServices,
                        JobManagerServices.fromConfiguration(new 
Configuration(), mock(BlobServer.class)),
-                       new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()),
+                       new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()),
                        jobCompletion,
                        jobCompletion));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index c3b57fa..e4ceb40 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -28,7 +28,6 @@ import akka.util.Timeout;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
@@ -43,7 +42,7 @@ import 
org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -199,7 +198,7 @@ public class JobManagerLeaderElectionTest extends 
TestLogger {
                        submittedJobGraphStore,
                        checkpointRecoveryFactory,
                        AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
-                       Option.<MetricRegistry>empty(),
+                       Option.<MetricRegistryImpl>empty(),
                        Option.<String>empty());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
new file mode 100644
index 0000000..b0b20b2
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.metrics.groups.MetricGroupTest;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.runtime.metrics.util.TestReporter;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorNotFound;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link MetricRegistryImpl}.
+ */
+public class MetricRegistryImplTest extends TestLogger {
+
+       private static final char GLOBAL_DEFAULT_DELIMITER = '.';
+
+       @Test
+       public void testIsShutdown() {
+               MetricRegistryImpl metricRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+
+               Assert.assertFalse(metricRegistry.isShutdown());
+
+               metricRegistry.shutdown();
+
+               Assert.assertTrue(metricRegistry.isShutdown());
+       }
+
+       /**
+        * Verifies that the reporter class argument is correctly used to 
instantiate and open the reporter.
+        */
+       @Test
+       public void testReporterInstantiation() {
+               Configuration config = new Configuration();
+
+               config.setString(MetricOptions.REPORTERS_LIST, "test");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
+
+               MetricRegistryImpl metricRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+               assertTrue(metricRegistry.getReporters().size() == 1);
+
+               Assert.assertTrue(TestReporter1.wasOpened);
+
+               metricRegistry.shutdown();
+       }
+
+       /**
+        * Reporter that exposes whether open() was called.
+        */
+       protected static class TestReporter1 extends TestReporter {
+               public static boolean wasOpened = false;
+
+               @Override
+               public void open(MetricConfig config) {
+                       wasOpened = true;
+               }
+       }
+
+       /**
+        * Verifies that multiple reporters are instantiated correctly.
+        */
+       @Test
+       public void testMultipleReporterInstantiation() {
+               Configuration config = new Configuration();
+
+               config.setString(MetricOptions.REPORTERS_LIST, "test1, 
test2,test3");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter11.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter12.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter13.class.getName());
+
+               MetricRegistryImpl metricRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+               assertTrue(metricRegistry.getReporters().size() == 3);
+
+               Assert.assertTrue(TestReporter11.wasOpened);
+               Assert.assertTrue(TestReporter12.wasOpened);
+               Assert.assertTrue(TestReporter13.wasOpened);
+
+               metricRegistry.shutdown();
+       }
+
+       /**
+        * Reporter that exposes whether open() was called.
+        */
+       protected static class TestReporter11 extends TestReporter {
+               public static boolean wasOpened = false;
+
+               @Override
+               public void open(MetricConfig config) {
+                       wasOpened = true;
+               }
+       }
+
+       /**
+        * Reporter that exposes whether open() was called.
+        */
+       protected static class TestReporter12 extends TestReporter {
+               public static boolean wasOpened = false;
+
+               @Override
+               public void open(MetricConfig config) {
+                       wasOpened = true;
+               }
+       }
+
+       /**
+        * Reporter that exposes whether open() was called.
+        */
+       protected static class TestReporter13 extends TestReporter {
+               public static boolean wasOpened = false;
+
+               @Override
+               public void open(MetricConfig config) {
+                       wasOpened = true;
+               }
+       }
+
+       /**
+        * Verifies that configured arguments are properly forwarded to the 
reporter.
+        */
+       @Test
+       public void testReporterArgumentForwarding() {
+               Configuration config = new Configuration();
+
+               config.setString(MetricOptions.REPORTERS_LIST, "test");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter2.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg1", "hello");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg2", "world");
+
+               new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)).shutdown();
+
+               Assert.assertEquals("hello", TestReporter2.mc.getString("arg1", 
null));
+               Assert.assertEquals("world", TestReporter2.mc.getString("arg2", 
null));
+       }
+
+       /**
+        * Reporter that exposes the {@link MetricConfig} it was given.
+        */
+       protected static class TestReporter2 extends TestReporter {
+               static MetricConfig mc;
+               @Override
+               public void open(MetricConfig config) {
+                       mc = config;
+               }
+       }
+
+       /**
+        * Verifies that reporters implementing the Scheduled interface are 
regularly called to report the metrics.
+        *
+        * @throws InterruptedException if the test thread is interrupted while sleeping between checks
+        */
+       @Test
+       public void testReporterScheduling() throws InterruptedException {
+               Configuration config = new Configuration();
+
+               config.setString(MetricOptions.REPORTERS_LIST, "test");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter3.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg1", "hello");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
+
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+               long start = System.currentTimeMillis();
+
+               // only start counting from now on
+               TestReporter3.reportCount = 0;
+
+               for (int x = 0; x < 10; x++) {
+                       Thread.sleep(100);
+                       int reportCount = TestReporter3.reportCount;
+                       long curT = System.currentTimeMillis();
+                       /**
+                        * Within a given time-frame T only T/50 reports may 
be triggered due to the 50 ms interval between reports.
+                        * This value, however, does not take the first 
triggered report into account (=> +1).
+                        * Furthermore we have to account for the mis-alignment 
between reports being triggered and our time
+                        * measurement (=> +1); for T=200 a total of 4-6 
reports may have been
+                        * triggered depending on whether the end of the 
interval for the first reports ends before
+                        * or after T=50.
+                        */
+                       long maxAllowedReports = (curT - start) / 50 + 2;
+                       Assert.assertTrue("Too many reports were triggered.", 
maxAllowedReports >= reportCount);
+               }
+               Assert.assertTrue("No report was triggered.", 
TestReporter3.reportCount > 0);
+
+               registry.shutdown();
+       }
+
+       /**
+        * Reporter that exposes how often report() was called.
+        */
+       protected static class TestReporter3 extends TestReporter implements 
Scheduled {
+               public static int reportCount = 0;
+
+               @Override
+               public void report() {
+                       reportCount++;
+               }
+       }
+
+       /**
+        * Verifies that reporters are notified of added/removed metrics.
+        */
+       @Test
+       public void testReporterNotifications() {
+               Configuration config = new Configuration();
+               config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter6.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter7.class.getName());
+
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+               TaskManagerMetricGroup root = new 
TaskManagerMetricGroup(registry, "host", "id");
+               root.counter("rootCounter");
+
+               assertTrue(TestReporter6.addedMetric instanceof Counter);
+               assertEquals("rootCounter", TestReporter6.addedMetricName);
+
+               assertTrue(TestReporter7.addedMetric instanceof Counter);
+               assertEquals("rootCounter", TestReporter7.addedMetricName);
+
+               root.close();
+
+               assertTrue(TestReporter6.removedMetric instanceof Counter);
+               assertEquals("rootCounter", TestReporter6.removedMetricName);
+
+               assertTrue(TestReporter7.removedMetric instanceof Counter);
+               assertEquals("rootCounter", TestReporter7.removedMetricName);
+
+               registry.shutdown();
+       }
+
+       /**
+        * Reporter that exposes the name and metric instance of the last 
metric that was added or removed.
+        */
+       protected static class TestReporter6 extends TestReporter {
+               static Metric addedMetric;
+               static String addedMetricName;
+
+               static Metric removedMetric;
+               static String removedMetricName;
+
+               @Override
+               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       addedMetric = metric;
+                       addedMetricName = metricName;
+               }
+
+               @Override
+               public void notifyOfRemovedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       removedMetric = metric;
+                       removedMetricName = metricName;
+               }
+       }
+
+       /**
+        * Reporter that exposes the name and metric instance of the last 
metric that was added or removed.
+        */
+       protected static class TestReporter7 extends TestReporter {
+               static Metric addedMetric;
+               static String addedMetricName;
+
+               static Metric removedMetric;
+               static String removedMetricName;
+
+               @Override
+               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       addedMetric = metric;
+                       addedMetricName = metricName;
+               }
+
+               @Override
+               public void notifyOfRemovedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       removedMetric = metric;
+                       removedMetricName = metricName;
+               }
+       }
+
+       /**
+        * Verifies that the scope configuration is properly extracted.
+        */
+       @Test
+       public void testScopeConfig() {
+               Configuration config = new Configuration();
+
+               config.setString(MetricOptions.SCOPE_NAMING_TM, "A");
+               config.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "B");
+               config.setString(MetricOptions.SCOPE_NAMING_TASK, "C");
+               config.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "D");
+
+               ScopeFormats scopeConfig = ScopeFormats.fromConfig(config);
+
+               assertEquals("A", scopeConfig.getTaskManagerFormat().format());
+               assertEquals("B", 
scopeConfig.getTaskManagerJobFormat().format());
+               assertEquals("C", scopeConfig.getTaskFormat().format());
+               assertEquals("D", scopeConfig.getOperatorFormat().format());
+       }
+
+       @Test
+       public void testConfigurableDelimiter() {
+               Configuration config = new Configuration();
+               config.setString(MetricOptions.SCOPE_DELIMITER, "_");
+               config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E");
+
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "host", "id");
+               assertEquals("A_B_C_D_E_name", 
tmGroup.getMetricIdentifier("name"));
+
+               registry.shutdown();
+       }
+
+       @Test
+       public void testConfigurableDelimiterForReporters() {
+               Configuration config = new Configuration();
+               config.setString(MetricOptions.REPORTERS_LIST, 
"test1,test2,test3");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+               assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter());
+               assertEquals('_', registry.getDelimiter(0));
+               assertEquals('-', registry.getDelimiter(1));
+               assertEquals(GLOBAL_DEFAULT_DELIMITER, 
registry.getDelimiter(2));
+               assertEquals(GLOBAL_DEFAULT_DELIMITER, 
registry.getDelimiter(3));
+               assertEquals(GLOBAL_DEFAULT_DELIMITER, 
registry.getDelimiter(-1));
+
+               registry.shutdown();
+       }
+
+       @Test
+       public void testConfigurableDelimiterForReportersInGroup() {
+               Configuration config = new Configuration();
+               config.setString(MetricOptions.REPORTERS_LIST, 
"test1,test2,test3,test4");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+               config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B");
+
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+               List<MetricReporter> reporters = registry.getReporters();
+               ((TestReporter8) reporters.get(0)).expectedDelimiter = '_'; 
//test1  reporter
+               ((TestReporter8) reporters.get(1)).expectedDelimiter = '-'; 
//test2 reporter
+               ((TestReporter8) reporters.get(2)).expectedDelimiter = 
GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter
+               ((TestReporter8) reporters.get(3)).expectedDelimiter = 
GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter
+
+               TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(registry, "host", "id");
+               group.counter("C");
+               group.close();
+               registry.shutdown();
+               assertEquals(4, TestReporter8.numCorrectDelimitersForRegister);
+               assertEquals(4, 
TestReporter8.numCorrectDelimitersForUnregister);
+       }
+
+       /**
+        * Tests that the query actor will be stopped when the MetricRegistry 
is shut down.
+        */
+       @Test
+       public void testQueryActorShutdown() throws Exception {
+               final FiniteDuration timeout = new FiniteDuration(10L, 
TimeUnit.SECONDS);
+
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+
+               final ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
+
+               registry.startQueryService(actorSystem, null);
+
+               ActorRef queryServiceActor = registry.getQueryService();
+
+               registry.shutdown();
+
+               try {
+                       
Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout),
 timeout);
+
+                       fail("The query actor should be terminated resulting in 
a ActorNotFound exception.");
+               } catch (ActorNotFound e) {
+                       // we expect the query actor to be shut down
+               }
+       }
+
+       /**
+        * Reporter that verifies that the configured delimiter is applied 
correctly when generating the metric identifier.
+        */
+       public static class TestReporter8 extends TestReporter {
+               char expectedDelimiter;
+               public static int numCorrectDelimitersForRegister = 0;
+               public static int numCorrectDelimitersForUnregister = 0;
+
+               @Override
+               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       String expectedMetric = "A" + expectedDelimiter + "B" + 
expectedDelimiter + "C";
+                       assertEquals(expectedMetric, 
group.getMetricIdentifier(metricName, this));
+                       assertEquals(expectedMetric, 
group.getMetricIdentifier(metricName));
+                       numCorrectDelimitersForRegister++;
+               }
+
+               @Override
+               public void notifyOfRemovedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       String expectedMetric = "A" + expectedDelimiter + "B" + 
expectedDelimiter + "C";
+                       assertEquals(expectedMetric, 
group.getMetricIdentifier(metricName, this));
+                       assertEquals(expectedMetric, 
group.getMetricIdentifier(metricName));
+                       numCorrectDelimitersForUnregister++;
+               }
+       }
+
+       @Test
+       public void testExceptionIsolation() throws Exception {
+
+               Configuration config = new Configuration();
+               config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
FailingReporter.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter7.class.getName());
+
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+               Counter metric = new SimpleCounter();
+               registry.register(metric, "counter", new 
MetricGroupTest.DummyAbstractMetricGroup(registry));
+
+               assertEquals(metric, TestReporter7.addedMetric);
+               assertEquals("counter", TestReporter7.addedMetricName);
+
+               registry.unregister(metric, "counter", new 
MetricGroupTest.DummyAbstractMetricGroup(registry));
+
+               assertEquals(metric, TestReporter7.removedMetric);
+               assertEquals("counter", TestReporter7.removedMetricName);
+
+               registry.shutdown();
+       }
+
+       /**
+        * Reporter that throws an exception when it is notified of an added or 
removed metric.
+        */
+       protected static class FailingReporter extends TestReporter {
+               @Override
+               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       throw new RuntimeException();
+               }
+
+               @Override
+               public void notifyOfRemovedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       throw new RuntimeException();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
deleted file mode 100644
index 284b86a..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ /dev/null
@@ -1,496 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.metrics;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.metrics.reporter.Scheduled;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.metrics.groups.MetricGroupTest;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.runtime.metrics.scope.ScopeFormats;
-import org.apache.flink.runtime.metrics.util.TestReporter;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorNotFound;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the {@link MetricRegistry}.
- */
-public class MetricRegistryTest extends TestLogger {
-
-       private static final char GLOBAL_DEFAULT_DELIMITER = '.';
-
-       @Test
-       public void testIsShutdown() {
-               MetricRegistry metricRegistry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-
-               Assert.assertFalse(metricRegistry.isShutdown());
-
-               metricRegistry.shutdown();
-
-               Assert.assertTrue(metricRegistry.isShutdown());
-       }
-
-       /**
-        * Verifies that the reporter class argument is correctly used to 
instantiate and open the reporter.
-        */
-       @Test
-       public void testReporterInstantiation() {
-               Configuration config = new Configuration();
-
-               config.setString(MetricOptions.REPORTERS_LIST, "test");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
-
-               MetricRegistry metricRegistry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-               assertTrue(metricRegistry.getReporters().size() == 1);
-
-               Assert.assertTrue(TestReporter1.wasOpened);
-
-               metricRegistry.shutdown();
-       }
-
-       /**
-        * Reporter that exposes whether open() was called.
-        */
-       protected static class TestReporter1 extends TestReporter {
-               public static boolean wasOpened = false;
-
-               @Override
-               public void open(MetricConfig config) {
-                       wasOpened = true;
-               }
-       }
-
-       /**
-        * Verifies that multiple reporters are instantiated correctly.
-        */
-       @Test
-       public void testMultipleReporterInstantiation() {
-               Configuration config = new Configuration();
-
-               config.setString(MetricOptions.REPORTERS_LIST, "test1, 
test2,test3");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter11.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter12.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter13.class.getName());
-
-               MetricRegistry metricRegistry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-               assertTrue(metricRegistry.getReporters().size() == 3);
-
-               Assert.assertTrue(TestReporter11.wasOpened);
-               Assert.assertTrue(TestReporter12.wasOpened);
-               Assert.assertTrue(TestReporter13.wasOpened);
-
-               metricRegistry.shutdown();
-       }
-
-       /**
-        * Reporter that exposes whether open() was called.
-        */
-       protected static class TestReporter11 extends TestReporter {
-               public static boolean wasOpened = false;
-
-               @Override
-               public void open(MetricConfig config) {
-                       wasOpened = true;
-               }
-       }
-
-       /**
-        * Reporter that exposes whether open() was called.
-        */
-       protected static class TestReporter12 extends TestReporter {
-               public static boolean wasOpened = false;
-
-               @Override
-               public void open(MetricConfig config) {
-                       wasOpened = true;
-               }
-       }
-
-       /**
-        * Reporter that exposes whether open() was called.
-        */
-       protected static class TestReporter13 extends TestReporter {
-               public static boolean wasOpened = false;
-
-               @Override
-               public void open(MetricConfig config) {
-                       wasOpened = true;
-               }
-       }
-
-       /**
-        * Verifies that configured arguments are properly forwarded to the 
reporter.
-        */
-       @Test
-       public void testReporterArgumentForwarding() {
-               Configuration config = new Configuration();
-
-               config.setString(MetricOptions.REPORTERS_LIST, "test");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter2.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg1", "hello");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg2", "world");
-
-               new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)).shutdown();
-
-               Assert.assertEquals("hello", TestReporter2.mc.getString("arg1", 
null));
-               Assert.assertEquals("world", TestReporter2.mc.getString("arg2", 
null));
-       }
-
-       /**
-        * Reporter that exposes the {@link MetricConfig} it was given.
-        */
-       protected static class TestReporter2 extends TestReporter {
-               static MetricConfig mc;
-               @Override
-               public void open(MetricConfig config) {
-                       mc = config;
-               }
-       }
-
-       /**
-        * Verifies that reporters implementing the Scheduled interface are 
regularly called to report the metrics.
-        *
-        * @throws InterruptedException
-        */
-       @Test
-       public void testReporterScheduling() throws InterruptedException {
-               Configuration config = new Configuration();
-
-               config.setString(MetricOptions.REPORTERS_LIST, "test");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter3.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg1", "hello");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
-
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-               long start = System.currentTimeMillis();
-
-               // only start counting from now on
-               TestReporter3.reportCount = 0;
-
-               for (int x = 0; x < 10; x++) {
-                       Thread.sleep(100);
-                       int reportCount = TestReporter3.reportCount;
-                       long curT = System.currentTimeMillis();
-                       /**
-                        * Within a given time-frame T only T/500 reports may 
be triggered due to the interval between reports.
-                        * This value however does not not take the first 
triggered report into account (=> +1).
-                        * Furthermore we have to account for the mis-alignment 
between reports being triggered and our time
-                        * measurement (=> +1); for T=200 a total of 4-6 
reports may have been
-                        * triggered depending on whether the end of the 
interval for the first reports ends before
-                        * or after T=50.
-                        */
-                       long maxAllowedReports = (curT - start) / 50 + 2;
-                       Assert.assertTrue("Too many reports were triggered.", 
maxAllowedReports >= reportCount);
-               }
-               Assert.assertTrue("No report was triggered.", 
TestReporter3.reportCount > 0);
-
-               registry.shutdown();
-       }
-
-       /**
-        * Reporter that exposes how often report() was called.
-        */
-       protected static class TestReporter3 extends TestReporter implements 
Scheduled {
-               public static int reportCount = 0;
-
-               @Override
-               public void report() {
-                       reportCount++;
-               }
-       }
-
-       /**
-        * Verifies that reporters are notified of added/removed metrics.
-        */
-       @Test
-       public void testReporterNotifications() {
-               Configuration config = new Configuration();
-               config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter6.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter7.class.getName());
-
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-               TaskManagerMetricGroup root = new 
TaskManagerMetricGroup(registry, "host", "id");
-               root.counter("rootCounter");
-
-               assertTrue(TestReporter6.addedMetric instanceof Counter);
-               assertEquals("rootCounter", TestReporter6.addedMetricName);
-
-               assertTrue(TestReporter7.addedMetric instanceof Counter);
-               assertEquals("rootCounter", TestReporter7.addedMetricName);
-
-               root.close();
-
-               assertTrue(TestReporter6.removedMetric instanceof Counter);
-               assertEquals("rootCounter", TestReporter6.removedMetricName);
-
-               assertTrue(TestReporter7.removedMetric instanceof Counter);
-               assertEquals("rootCounter", TestReporter7.removedMetricName);
-
-               registry.shutdown();
-       }
-
-       /**
-        * Reporter that exposes the name and metric instance of the last 
metric that was added or removed.
-        */
-       protected static class TestReporter6 extends TestReporter {
-               static Metric addedMetric;
-               static String addedMetricName;
-
-               static Metric removedMetric;
-               static String removedMetricName;
-
-               @Override
-               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
-                       addedMetric = metric;
-                       addedMetricName = metricName;
-               }
-
-               @Override
-               public void notifyOfRemovedMetric(Metric metric, String 
metricName, MetricGroup group) {
-                       removedMetric = metric;
-                       removedMetricName = metricName;
-               }
-       }
-
-       /**
-        * Reporter that exposes the name and metric instance of the last 
metric that was added or removed.
-        */
-       protected static class TestReporter7 extends TestReporter {
-               static Metric addedMetric;
-               static String addedMetricName;
-
-               static Metric removedMetric;
-               static String removedMetricName;
-
-               @Override
-               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
-                       addedMetric = metric;
-                       addedMetricName = metricName;
-               }
-
-               @Override
-               public void notifyOfRemovedMetric(Metric metric, String 
metricName, MetricGroup group) {
-                       removedMetric = metric;
-                       removedMetricName = metricName;
-               }
-       }
-
-       /**
-        * Verifies that the scope configuration is properly extracted.
-        */
-       @Test
-       public void testScopeConfig() {
-               Configuration config = new Configuration();
-
-               config.setString(MetricOptions.SCOPE_NAMING_TM, "A");
-               config.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "B");
-               config.setString(MetricOptions.SCOPE_NAMING_TASK, "C");
-               config.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "D");
-
-               ScopeFormats scopeConfig = ScopeFormats.fromConfig(config);
-
-               assertEquals("A", scopeConfig.getTaskManagerFormat().format());
-               assertEquals("B", 
scopeConfig.getTaskManagerJobFormat().format());
-               assertEquals("C", scopeConfig.getTaskFormat().format());
-               assertEquals("D", scopeConfig.getOperatorFormat().format());
-       }
-
-       @Test
-       public void testConfigurableDelimiter() {
-               Configuration config = new Configuration();
-               config.setString(MetricOptions.SCOPE_DELIMITER, "_");
-               config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E");
-
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "host", "id");
-               assertEquals("A_B_C_D_E_name", 
tmGroup.getMetricIdentifier("name"));
-
-               registry.shutdown();
-       }
-
-       @Test
-       public void testConfigurableDelimiterForReporters() {
-               Configuration config = new Configuration();
-               config.setString(MetricOptions.REPORTERS_LIST, 
"test1,test2,test3");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
-
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-               assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter());
-               assertEquals('_', registry.getDelimiter(0));
-               assertEquals('-', registry.getDelimiter(1));
-               assertEquals(GLOBAL_DEFAULT_DELIMITER, 
registry.getDelimiter(2));
-               assertEquals(GLOBAL_DEFAULT_DELIMITER, 
registry.getDelimiter(3));
-               assertEquals(GLOBAL_DEFAULT_DELIMITER, 
registry.getDelimiter(-1));
-
-               registry.shutdown();
-       }
-
-       @Test
-       public void testConfigurableDelimiterForReportersInGroup() {
-               Configuration config = new Configuration();
-               config.setString(MetricOptions.REPORTERS_LIST, 
"test1,test2,test3,test4");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
-               config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B");
-
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-               List<MetricReporter> reporters = registry.getReporters();
-               ((TestReporter8) reporters.get(0)).expectedDelimiter = '_'; 
//test1  reporter
-               ((TestReporter8) reporters.get(1)).expectedDelimiter = '-'; 
//test2 reporter
-               ((TestReporter8) reporters.get(2)).expectedDelimiter = 
GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter
-               ((TestReporter8) reporters.get(3)).expectedDelimiter = 
GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter
-
-               TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(registry, "host", "id");
-               group.counter("C");
-               group.close();
-               registry.shutdown();
-               assertEquals(4, TestReporter8.numCorrectDelimitersForRegister);
-               assertEquals(4, 
TestReporter8.numCorrectDelimitersForUnregister);
-       }
-
-       /**
-        * Tests that the query actor will be stopped when the MetricRegistry 
is shut down.
-        */
-       @Test
-       public void testQueryActorShutdown() throws Exception {
-               final FiniteDuration timeout = new FiniteDuration(10L, 
TimeUnit.SECONDS);
-
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-
-               final ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
-
-               registry.startQueryService(actorSystem, null);
-
-               ActorRef queryServiceActor = registry.getQueryService();
-
-               registry.shutdown();
-
-               try {
-                       
Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout),
 timeout);
-
-                       fail("The query actor should be terminated resulting in 
a ActorNotFound exception.");
-               } catch (ActorNotFound e) {
-                       // we expect the query actor to be shut down
-               }
-       }
-
-       /**
-        * Reporter that verifies that the configured delimiter is applied 
correctly when generating the metric identifier.
-        */
-       public static class TestReporter8 extends TestReporter {
-               char expectedDelimiter;
-               public static int numCorrectDelimitersForRegister = 0;
-               public static int numCorrectDelimitersForUnregister = 0;
-
-               @Override
-               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
-                       String expectedMetric = "A" + expectedDelimiter + "B" + 
expectedDelimiter + "C";
-                       assertEquals(expectedMetric, 
group.getMetricIdentifier(metricName, this));
-                       assertEquals(expectedMetric, 
group.getMetricIdentifier(metricName));
-                       numCorrectDelimitersForRegister++;
-               }
-
-               @Override
-               public void notifyOfRemovedMetric(Metric metric, String 
metricName, MetricGroup group) {
-                       String expectedMetric = "A" + expectedDelimiter + "B" + 
expectedDelimiter + "C";
-                       assertEquals(expectedMetric, 
group.getMetricIdentifier(metricName, this));
-                       assertEquals(expectedMetric, 
group.getMetricIdentifier(metricName));
-                       numCorrectDelimitersForUnregister++;
-               }
-       }
-
-       @Test
-       public void testExceptionIsolation() throws Exception {
-
-               Configuration config = new Configuration();
-               config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
FailingReporter.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter7.class.getName());
-
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-               Counter metric = new SimpleCounter();
-               registry.register(metric, "counter", new 
MetricGroupTest.DummyAbstractMetricGroup(registry));
-
-               assertEquals(metric, TestReporter7.addedMetric);
-               assertEquals("counter", TestReporter7.addedMetricName);
-
-               registry.unregister(metric, "counter", new 
MetricGroupTest.DummyAbstractMetricGroup(registry));
-
-               assertEquals(metric, TestReporter7.removedMetric);
-               assertEquals("counter", TestReporter7.removedMetricName);
-
-               registry.shutdown();
-       }
-
-       /**
-        * Reporter that throws an exception when it is notified of an added or 
removed metric.
-        */
-       protected static class FailingReporter extends TestReporter {
-               @Override
-               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
-                       throw new RuntimeException();
-               }
-
-               @Override
-               public void notifyOfRemovedMetric(Metric metric, String 
metricName, MetricGroup group) {
-                       throw new RuntimeException();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
new file mode 100644
index 0000000..1140e3d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+/**
+ * Metric registry which does nothing and is intended for testing purposes.
+ *
+ * <p>{@link #register} and {@link #unregister} have empty bodies, so metrics
+ * handed to this registry are dropped without any bookkeeping.
+ * {@link #getNumberReporters()} always returns {@code 0}, and both
+ * {@code getDelimiter} variants return the same fixed character regardless of
+ * the index passed in.
+ *
+ * <p>{@link #getScopeFormats()} returns a {@link ScopeFormats} instance that is
+ * built once from an empty {@link Configuration} (presumably yielding the
+ * default scope formats; confirm against {@code ScopeFormats.fromConfig}).
+ */
+public class NoOpMetricRegistry implements MetricRegistry {
+
+       final char delimiter = ','; // single fixed delimiter served by both getDelimiter variants
+
+       final ScopeFormats scopeFormats = ScopeFormats.fromConfig(new 
Configuration()); // built from an empty configuration, no user overrides
+
+       @Override
+       public char getDelimiter() {
+               return delimiter;
+       }
+
+       @Override
+       public char getDelimiter(int index) {
+               return delimiter; // index is ignored; every index maps to the same delimiter
+       }
+
+       @Override
+       public int getNumberReporters() {
+               return 0; // this registry never reports having any reporters
+       }
+
+       @Override
+       public void register(Metric metric, String metricName, 
AbstractMetricGroup group) {} // no-op: the metric is not stored or forwarded
+
+       @Override
+       public void unregister(Metric metric, String metricName, 
AbstractMetricGroup group) {} // no-op: nothing was registered, nothing to remove
+
+       @Override
+       public ScopeFormats getScopeFormats() {
+               return scopeFormats;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index cea0928..31304e5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -61,6 +61,9 @@ public class TaskManagerMetricsTest extends TestLogger {
 
                HighAvailabilityServices highAvailabilityServices = new 
EmbeddedHaServices(TestingUtils.defaultExecutor());
 
+               final MetricRegistryImpl metricRegistry = new 
MetricRegistryImpl(
+                       MetricRegistryConfiguration.fromConfiguration(new 
Configuration()));
+
                try {
                        actorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
 
@@ -73,6 +76,7 @@ public class TaskManagerMetricsTest extends TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
+                               new NoOpMetricRegistry(),
                                Option.empty(),
                                JobManager.class,
                                MemoryArchivist.class)._1();
@@ -89,9 +93,9 @@ public class TaskManagerMetricsTest extends TestLogger {
                        TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(config);
 
                        TaskManagerServices taskManagerServices = 
TaskManagerServices.fromConfiguration(
-                                       taskManagerServicesConfiguration, 
tmResourceID);
-
-                       final MetricRegistry tmRegistry = 
taskManagerServices.getMetricRegistry();
+                               taskManagerServicesConfiguration,
+                               tmResourceID,
+                               metricRegistry);
 
                        // create the task manager
                        final Props tmProps = TaskManager.getTaskManagerProps(
@@ -103,7 +107,7 @@ public class TaskManagerMetricsTest extends TestLogger {
                                taskManagerServices.getIOManager(),
                                taskManagerServices.getNetworkEnvironment(),
                                highAvailabilityServices,
-                               tmRegistry);
+                               
taskManagerServices.getTaskManagerMetricGroup());
 
                        final ActorRef taskManager = 
actorSystem.actorOf(tmProps);
 
@@ -135,7 +139,7 @@ public class TaskManagerMetricsTest extends TestLogger {
                        }};
 
                        // verify that the registry was not shutdown due to the 
disconnect
-                       Assert.assertFalse(tmRegistry.isShutdown());
+                       Assert.assertFalse(metricRegistry.isShutdown());
 
                        // shut down the actors and the actor system
                        actorSystem.shutdown();
@@ -148,6 +152,8 @@ public class TaskManagerMetricsTest extends TestLogger {
                        if (highAvailabilityServices != null) {
                                
highAvailabilityServices.closeAndCleanupAllData();
                        }
+
+                       metricRegistry.shutdown();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 5c33ad6..55ba3a9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
 import org.apache.flink.util.TestLogger;
@@ -82,7 +82,7 @@ public class MetricQueryServiceTest extends TestLogger {
                        }
                };
 
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                final TaskManagerMetricGroup tm = new 
TaskManagerMetricGroup(registry, "host", "id");
 
                MetricQueryService.notifyOfAddedMetric(serviceActor, c, 
"counter", tm);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index 648ee47..8d91b81 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.TestReporter;
 
@@ -45,7 +45,7 @@ public class AbstractMetricGroupTest {
         */
        @Test
        public void testGetAllVariables() {
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
                AbstractMetricGroup group = new 
AbstractMetricGroup<AbstractMetricGroup<?>>(registry, new String[0], null) {
                        @Override
@@ -90,7 +90,7 @@ public class AbstractMetricGroupTest {
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter2.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");
 
-               MetricRegistry testRegistry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+               MetricRegistryImpl testRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
                try {
                        MetricGroup tmGroup = new 
TaskManagerMetricGroup(testRegistry, "host", "id");
                        tmGroup.counter("1");
@@ -180,7 +180,7 @@ public class AbstractMetricGroupTest {
        public void testScopeGenerationWithoutReporters() {
                Configuration config = new Configuration();
                config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
-               MetricRegistry testRegistry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+               MetricRegistryImpl testRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 
                try {
                        TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(testRegistry, "host", "id");

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index 03341a6..05a72ac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.TestLogger;
@@ -45,7 +45,7 @@ public class JobManagerGroupTest extends TestLogger {
 
        @Test
        public void addAndRemoveJobs() {
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                final JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost");
 
                final JobID jid1 = new JobID();
@@ -77,7 +77,7 @@ public class JobManagerGroupTest extends TestLogger {
 
        @Test
        public void testCloseClosesAll() {
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                final JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost");
 
                final JobID jid1 = new JobID();
@@ -103,7 +103,7 @@ public class JobManagerGroupTest extends TestLogger {
 
        @Test
        public void testGenerateScopeDefault() {
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost");
 
                assertArrayEquals(new String[]{"localhost", "jobmanager"}, 
group.getScopeComponents());
@@ -116,7 +116,7 @@ public class JobManagerGroupTest extends TestLogger {
        public void testGenerateScopeCustom() {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_JM, 
"constant.<host>.foo.<host>");
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "host");
 
@@ -128,7 +128,7 @@ public class JobManagerGroupTest extends TestLogger {
 
        @Test
        public void testCreateQueryServiceMetricInfo() {
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, 
"host");
 
                QueryScopeInfo.JobManagerQueryScopeInfo info = 
jm.createQueryServiceMetricInfo(new DummyCharacterFilter());

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index d734dfd..4373f80 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.metrics.groups;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.TestLogger;
@@ -39,7 +39,7 @@ public class JobManagerJobGroupTest extends TestLogger {
 
        @Test
        public void testGenerateScopeDefault() {
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
                JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName");
                JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, new JobID(), "myJobName");
@@ -60,7 +60,7 @@ public class JobManagerJobGroupTest extends TestLogger {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc");
                cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, 
"some-constant.<job_name>");
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                JobID jid = new JobID();
 
@@ -83,7 +83,7 @@ public class JobManagerJobGroupTest extends TestLogger {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter");
                cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, 
"*.some-constant.<job_id>");
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                JobID jid = new JobID();
 
@@ -104,7 +104,7 @@ public class JobManagerJobGroupTest extends TestLogger {
        @Test
        public void testCreateQueryServiceMetricInfo() {
                JobID jid = new JobID();
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, 
"host");
                JobManagerJobMetricGroup jmj = new 
JobManagerJobMetricGroup(registry, jm, jid, "jobname");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index 56ce5fa..324bb73 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -27,9 +27,10 @@ import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.util.TestReporter;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -39,7 +40,7 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for the registration of groups and metrics on a {@link MetricGroup}.
  */
-public class MetricGroupRegistrationTest {
+public class MetricGroupRegistrationTest extends TestLogger {
        /**
         * Verifies that group methods instantiate the correct metric with the 
given name.
         */
@@ -49,7 +50,7 @@ public class MetricGroupRegistrationTest {
                config.setString(MetricOptions.REPORTERS_LIST, "test");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
 
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 
                MetricGroup root = new TaskManagerMetricGroup(registry, "host", 
"id");
 
@@ -111,7 +112,7 @@ public class MetricGroupRegistrationTest {
        public void testDuplicateGroupName() {
                Configuration config = new Configuration();
 
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 
                MetricGroup root = new TaskManagerMetricGroup(registry, "host", 
"id");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 633dbed..94760e6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
@@ -48,13 +49,13 @@ public class MetricGroupTest extends TestLogger {
 
        private static final MetricRegistryConfiguration 
defaultMetricRegistryConfiguration = 
MetricRegistryConfiguration.defaultMetricRegistryConfiguration();
 
-       private MetricRegistry registry;
+       private MetricRegistryImpl registry;
 
-       private final MetricRegistry exceptionOnRegister = new 
ExceptionOnRegisterRegistry();
+       private final MetricRegistryImpl exceptionOnRegister = new 
ExceptionOnRegisterRegistry();
 
        @Before
        public void createRegistry() {
-               this.registry = new 
MetricRegistry(defaultMetricRegistryConfiguration);
+               this.registry = new 
MetricRegistryImpl(defaultMetricRegistryConfiguration);
        }
 
        @After
@@ -134,7 +135,7 @@ public class MetricGroupTest extends TestLogger {
                JobID jid = new JobID();
                JobVertexID vid = new JobVertexID();
                AbstractID eid = new AbstractID();
-               MetricRegistry registry = new 
MetricRegistry(defaultMetricRegistryConfiguration);
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(defaultMetricRegistryConfiguration);
                TaskManagerMetricGroup tm = new 
TaskManagerMetricGroup(registry, "host", "id");
                TaskManagerJobMetricGroup job = new 
TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
                TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, 
eid, "taskName", 4, 5);
@@ -156,7 +157,7 @@ public class MetricGroupTest extends TestLogger {
 
        // 
------------------------------------------------------------------------
 
-       private static class ExceptionOnRegisterRegistry extends MetricRegistry 
{
+       private static class ExceptionOnRegisterRegistry extends 
MetricRegistryImpl {
 
                public ExceptionOnRegisterRegistry() {
                        super(defaultMetricRegistryConfiguration);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index 4363a9d..820b73e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
@@ -47,7 +47,7 @@ public class OperatorGroupTest extends TestLogger {
 
        @Test
        public void testGenerateScopeDefault() {
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
                TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
                TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
@@ -70,7 +70,7 @@ public class OperatorGroupTest extends TestLogger {
        public void testGenerateScopeCustom() {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "<tm_id>.<job_id>.<task_id>.<operator_name>.<operator_id>");
-               MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+               MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
                try {
                        String tmID = "test-tm-id";
                        JobID jid = new JobID();
@@ -97,7 +97,7 @@ public class OperatorGroupTest extends TestLogger {
 
        @Test
        public void testIOMetricGroupInstantiation() {
-               MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
                TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
                TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
@@ -114,7 +114,7 @@ public class OperatorGroupTest extends TestLogger {
 
        @Test
        public void testVariables() {
-               MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
                JobID jid = new JobID();
                JobVertexID tid = new JobVertexID();
@@ -156,7 +156,7 @@ public class OperatorGroupTest extends TestLogger {
                JobVertexID vid = new JobVertexID();
                AbstractID eid = new AbstractID();
                OperatorID oid = new OperatorID();
-               MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
                TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
                TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index bd85303..3272f73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
@@ -50,7 +50,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
        @Test
        public void addAndRemoveJobs() throws IOException {
-               MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
                final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
                                registry, "localhost", new AbstractID().toString());
@@ -112,7 +112,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
        @Test
        public void testCloseClosesAll() throws IOException {
-               MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
                        registry, "localhost", new AbstractID().toString());
 
@@ -152,7 +152,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
        @Test
        public void testGenerateScopeDefault() {
-               MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "localhost", "id");
 
                assertArrayEquals(new String[]{"localhost", "taskmanager", "id"}, group.getScopeComponents());
@@ -164,7 +164,7 @@ public class TaskManagerGroupTest extends TestLogger {
        public void testGenerateScopeCustom() {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_TM, "constant.<host>.foo.<host>");
-               MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+               MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
                TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
 
                assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents());
@@ -174,7 +174,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
        @Test
        public void testCreateQueryServiceMetricInfo() {
-               MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
 
                QueryScopeInfo.TaskManagerQueryScopeInfo info = tm.createQueryServiceMetricInfo(new DummyCharacterFilter());

Reply via email to