[FLINK-8213][metrics] Improve fallback behaviors

This closes #8213.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bf0fdc2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bf0fdc2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bf0fdc2

Branch: refs/heads/master
Commit: 0bf0fdc26ea86020929fa64d083dce02ba2a2ae2
Parents: 493c285
Author: zentol <[email protected]>
Authored: Wed Dec 6 14:39:15 2017 +0100
Committer: zentol <[email protected]>
Committed: Tue Dec 12 19:09:16 2017 +0100

----------------------------------------------------------------------
 .../kafka/internal/KafkaConsumerThreadTest.java |   4 +-
 .../flink/storm/wrappers/BoltWrapperTest.java   |   4 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |   2 +-
 .../flink/runtime/jobmaster/JobMaster.java      |   6 +-
 .../runtime/metrics/NoOpMetricRegistry.java     |  73 +++++++++
 .../groups/UnregisteredMetricGroups.java        | 164 +++++++++++++++++++
 .../flink/runtime/operators/DataSinkTask.java   |  22 ++-
 .../flink/runtime/operators/DataSourceTask.java |  25 ++-
 .../IndividualRestartsConcurrencyTest.java      |   4 +-
 .../partition/InputGateConcurrentTest.java      |  16 +-
 .../partition/InputGateFairnessTest.java        |  18 +-
 .../consumer/LocalInputChannelTest.java         |  14 +-
 .../consumer/RemoteInputChannelTest.java        |   8 +-
 .../partition/consumer/SingleInputGateTest.java |  24 +--
 .../partition/consumer/TestSingleInputGate.java |   4 +-
 .../partition/consumer/UnionInputGateTest.java  |   6 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |   2 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   7 +-
 .../runtime/jobmanager/JobManagerTest.java      |  20 +--
 .../flink/runtime/jobmanager/JobSubmitTest.java |   2 +-
 .../JobManagerLeaderElectionTest.java           |   3 +-
 .../runtime/metrics/NoOpMetricRegistry.java     |  68 --------
 .../runtime/metrics/TaskManagerMetricsTest.java |   2 +-
 .../runtime/metrics/groups/MetricGroupTest.java |  10 +-
 .../metrics/groups/TaskIOMetricGroupTest.java   |   3 +-
 .../operators/drivers/TestTaskContext.java      |   4 +-
 .../testutils/BinaryOperatorTestBase.java       |   5 +-
 .../operators/testutils/DriverTestBase.java     |   3 +-
 .../operators/testutils/DummyEnvironment.java   |   3 +-
 .../operators/testutils/MockEnvironment.java    |   3 +-
 .../testutils/UnaryOperatorTestBase.java        |   3 +-
 .../testutils/UnregisteredTaskMetricsGroup.java |  83 ----------
 .../resourcemanager/ResourceManagerTest.java    |   2 +-
 ...askManagerComponentsStartupShutdownTest.java |   4 +-
 .../TaskManagerProcessReapingTestBase.java      |   2 +-
 .../taskmanager/TaskManagerStartupTest.java     |   2 +-
 .../runtime/util/JvmExitOnFatalErrorTest.java   |   4 +-
 .../api/operators/AbstractStreamOperator.java   |   6 +-
 .../runtime/io/StreamInputProcessor.java        |  13 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  13 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  18 +-
 .../functions/async/RichAsyncFunctionTest.java  |   3 +-
 .../operators/async/AsyncWaitOperatorTest.java  |   4 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |   4 +-
 .../runtime/tasks/StreamMockEnvironment.java    |   4 +-
 .../tasks/StreamTaskTerminationTest.java        |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   4 +-
 .../tasks/TaskCheckpointingBehaviourTest.java   |   4 +-
 ...ctTaskManagerProcessFailureRecoveryTest.java |   2 +-
 .../JobManagerHACheckpointRecoveryITCase.java   |   2 +-
 ...agerHAProcessFailureBatchRecoveryITCase.java |   2 +-
 .../recovery/ProcessFailureCancelingITCase.java |   2 +-
 .../AbstractOperatorRestoreTestBase.java        |   4 +-
 .../flink/yarn/YarnResourceManagerTest.java     |   2 +-
 54 files changed, 439 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
index 2368091..383eb13 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.internal;
 
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import 
org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -716,7 +716,7 @@ public class KafkaConsumerThreadTest {
                                        handover,
                                        new Properties(),
                                        unassignedPartitionsQueue,
-                                       mock(MetricGroup.class),
+                                       new UnregisteredMetricsGroup(),
                                        new KafkaConsumerCallBridge(),
                                        "test-kafka-consumer-thread",
                                        0,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index f518d17..98816e4 100644
--- 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.SplitStreamType;
@@ -372,7 +372,7 @@ public class BoltWrapperTest extends AbstractTest {
                Environment env = mock(Environment.class);
                when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 1, 
0, 1, 0));
                
when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());
-               when(env.getMetricGroup()).thenReturn(new 
UnregisteredTaskMetricsGroup());
+               
when(env.getMetricGroup()).thenReturn(UnregisteredMetricGroups.createUnregisteredTaskMetricGroup());
                when(env.getTaskManagerInfo()).thenReturn(new 
TestingTaskManagerRuntimeInfo());
 
                StreamTask<?, ?> mockTask = mock(StreamTask.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 75a844c..e6cfdda 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -194,7 +194,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                                        TestingUtils.defaultExecutor(),
                                        TestingUtils.defaultExecutor(),
                                        highAvailabilityServices,
-                                       new NoOpMetricRegistry(),
+                                       NoOpMetricRegistry.INSTANCE,
                                        
Option.apply(webMonitor[i].getRestAddress()),
                                        JobManager.class,
                                        MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c4c4445..6d0de74 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.blob.BlobServer;
@@ -78,6 +77,7 @@ import 
org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
@@ -264,8 +264,8 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        this.jobManagerMetricGroup = jobManagerMetricGroup;
                        this.jobMetricGroup = 
jobManagerMetricGroup.addJob(jobGraph);
                } else {
-                       this.jobManagerMetricGroup = new 
UnregisteredMetricsGroup();
-                       this.jobMetricGroup = new UnregisteredMetricsGroup();
+                       this.jobManagerMetricGroup = 
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup();
+                       this.jobMetricGroup = 
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
                }
 
                log.info("Initializing job {} ({}).", jobName, jid);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
new file mode 100644
index 0000000..c161aa2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+import javax.annotation.Nullable;
+
+/**
+ * Metric registry which does nothing.
+ */
+public class NoOpMetricRegistry implements MetricRegistry {
+       private static final char delimiter = '.';
+       private static final ScopeFormats scopeFormats = 
ScopeFormats.fromConfig(new Configuration());
+
+       public static final MetricRegistry INSTANCE = new NoOpMetricRegistry();
+
+       private NoOpMetricRegistry() {
+       }
+
+       @Override
+       public char getDelimiter() {
+               return delimiter;
+       }
+
+       @Override
+       public char getDelimiter(int index) {
+               return delimiter;
+       }
+
+       @Override
+       public int getNumberReporters() {
+               return 0;
+       }
+
+       @Override
+       public void register(Metric metric, String metricName, 
AbstractMetricGroup group) {
+       }
+
+       @Override
+       public void unregister(Metric metric, String metricName, 
AbstractMetricGroup group) {
+       }
+
+       @Override
+       public ScopeFormats getScopeFormats() {
+               return scopeFormats;
+       }
+
+       @Nullable
+       @Override
+       public String getMetricQueryServicePath() {
+               return null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
new file mode 100644
index 0000000..3869aa6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+
+/**
+ * A collection of safe drop-in replacements for existing {@link 
ComponentMetricGroup}s.
+ */
+public class UnregisteredMetricGroups {
+
+       private UnregisteredMetricGroups() {
+       }
+
+       public static JobManagerMetricGroup 
createUnregisteredJobManagerMetricGroup() {
+               return new UnregisteredJobManagerMetricGroup();
+       }
+
+       public static JobManagerJobMetricGroup 
createUnregisteredJobManagerJobMetricGroup() {
+               return new UnregisteredJobManagerJobMetricGroup();
+       }
+
+       public static TaskManagerMetricGroup 
createUnregisteredTaskManagerMetricGroup() {
+               return new UnregisteredTaskManagerMetricGroup();
+       }
+
+       public static TaskManagerJobMetricGroup 
createUnregisteredTaskManagerJobMetricGroup() {
+               return new UnregisteredTaskManagerJobMetricGroup();
+       }
+
+       public static TaskMetricGroup createUnregisteredTaskMetricGroup() {
+               return new UnregisteredTaskMetricGroup();
+       }
+
+       public static OperatorMetricGroup 
createUnregisteredOperatorMetricGroup() {
+               return new UnregisteredOperatorMetricGroup();
+       }
+
+       /**
+        * A safe drop-in replacement for {@link JobManagerMetricGroup}s.
+        */
+       public static class UnregisteredJobManagerMetricGroup extends 
JobManagerMetricGroup {
+               private static final String DEFAULT_HOST_NAME = 
"UnregisteredHost";
+
+               private UnregisteredJobManagerMetricGroup() {
+                       super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME);
+               }
+
+               @Override
+               public JobManagerJobMetricGroup addJob(JobGraph job) {
+                       return createUnregisteredJobManagerJobMetricGroup();
+               }
+       }
+
+       /**
+        * A safe drop-in replacement for {@link JobManagerJobMetricGroup}s.
+        */
+       public static class UnregisteredJobManagerJobMetricGroup extends 
JobManagerJobMetricGroup {
+               private static final JobID DEFAULT_JOB_ID = new JobID(0, 0);
+               private static final String DEFAULT_JOB_NAME = 
"UnregisteredJob";
+
+               protected UnregisteredJobManagerJobMetricGroup() {
+                       super(NoOpMetricRegistry.INSTANCE, new 
UnregisteredJobManagerMetricGroup(), DEFAULT_JOB_ID, DEFAULT_JOB_NAME);
+               }
+       }
+
+       /**
+        * A safe drop-in replacement for {@link TaskManagerMetricGroup}s.
+        */
+       public static class UnregisteredTaskManagerMetricGroup extends 
TaskManagerMetricGroup {
+               private static final String DEFAULT_HOST_NAME = 
"UnregisteredHost";
+               private static final String DEFAULT_TASKMANAGER_ID = "0";
+
+               protected UnregisteredTaskManagerMetricGroup() {
+                       super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME, 
DEFAULT_TASKMANAGER_ID);
+               }
+
+               @Override
+               public TaskMetricGroup addTaskForJob(
+                       final JobID jobId,
+                       final String jobName,
+                       final JobVertexID jobVertexId,
+                       final ExecutionAttemptID executionAttemptId,
+                       final String taskName,
+                       final int subtaskIndex,
+                       final int attemptNumber) {
+                       return createUnregisteredTaskMetricGroup();
+               }
+       }
+
+       /**
+        * A safe drop-in replacement for {@link TaskManagerJobMetricGroup}s.
+        */
+       public static class UnregisteredTaskManagerJobMetricGroup extends 
TaskManagerJobMetricGroup {
+               private static final JobID DEFAULT_JOB_ID = new JobID(0, 0);
+               private static final String DEFAULT_JOB_NAME = 
"UnregisteredJob";
+
+               protected UnregisteredTaskManagerJobMetricGroup() {
+                       super(NoOpMetricRegistry.INSTANCE, new 
UnregisteredTaskManagerMetricGroup(), DEFAULT_JOB_ID, DEFAULT_JOB_NAME);
+               }
+
+               @Override
+               public TaskMetricGroup addTask(
+                       final JobVertexID jobVertexId,
+                       final ExecutionAttemptID executionAttemptID,
+                       final String taskName,
+                       final int subtaskIndex,
+                       final int attemptNumber) {
+                       return createUnregisteredTaskMetricGroup();
+               }
+       }
+
+       /**
+        * A safe drop-in replacement for {@link TaskMetricGroup}s.
+        */
+       public static class UnregisteredTaskMetricGroup extends TaskMetricGroup 
{
+               private static final JobVertexID DEFAULT_VERTEX_ID = new 
JobVertexID(0, 0);
+               private static final ExecutionAttemptID DEFAULT_ATTEMPT_ID = 
new ExecutionAttemptID(0, 0);
+               private static final String DEFAULT_TASK_NAME = 
"UnregisteredTask";
+
+               protected UnregisteredTaskMetricGroup() {
+                       super(NoOpMetricRegistry.INSTANCE, new 
UnregisteredTaskManagerJobMetricGroup(),
+                               DEFAULT_VERTEX_ID, DEFAULT_ATTEMPT_ID, 
DEFAULT_TASK_NAME, 0, 0);
+               }
+
+               @Override
+               public OperatorMetricGroup addOperator(OperatorID operatorID, 
String name) {
+                       return createUnregisteredOperatorMetricGroup();
+               }
+       }
+
+       /**
+        * A safe drop-in replacement for {@link OperatorMetricGroup}s.
+        */
+       public static class UnregisteredOperatorMetricGroup extends 
OperatorMetricGroup {
+               private static final OperatorID DEFAULT_OPERATOR_ID = new 
OperatorID(0, 0);
+               private static final String DEFAULT_OPERATOR_NAME = 
"UnregisteredOperator";
+
+               protected UnregisteredOperatorMetricGroup() {
+                       super(NoOpMetricRegistry.INSTANCE, new 
UnregisteredTaskMetricGroup(), DEFAULT_OPERATOR_ID, DEFAULT_OPERATOR_NAME);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index bd052f5..bb253ab 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -29,12 +29,14 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import 
org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
@@ -107,10 +109,22 @@ public class DataSinkTask<IT> extends AbstractInvokable {
                LOG.debug(getLogString("Starting data sink operator"));
 
                RuntimeContext ctx = createRuntimeContext();
-               final Counter numRecordsIn = ((OperatorMetricGroup) 
ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
-               ((OperatorMetricGroup) 
ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask();
-               ((OperatorMetricGroup) 
ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask();
-               
+
+               final Counter numRecordsIn;
+               {
+                       Counter tmpNumRecordsIn;
+                       try {
+                               OperatorIOMetricGroup ioMetricGroup = 
((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
+                               ioMetricGroup.reuseInputMetricsForTask();
+                               ioMetricGroup.reuseOutputMetricsForTask();
+                               tmpNumRecordsIn = 
ioMetricGroup.getNumRecordsInCounter();
+                       } catch (Exception e) {
+                               LOG.warn("An exception occurred during the 
metrics setup.", e);
+                               tmpNumRecordsIn = new SimpleCounter();
+                       }
+                       numRecordsIn = tmpNumRecordsIn;
+               }
+
                
if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){
                        ((RichOutputFormat) this.format).setRuntimeContext(ctx);
                        LOG.debug(getLogString("Rich Sink detected. 
Initializing runtime context."));

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 2600b2c..1437877 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -27,12 +27,14 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import 
org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
@@ -102,12 +104,25 @@ public class DataSourceTask<OT> extends AbstractInvokable 
{
                LOG.debug(getLogString("Starting data source operator"));
 
                RuntimeContext ctx = createRuntimeContext();
-               Counter completedSplitsCounter = 
ctx.getMetricGroup().counter("numSplitsProcessed");
-               ((OperatorMetricGroup) 
ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask();
-               Counter numRecordsOut = ((OperatorMetricGroup) 
ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsOutCounter();
-               if (this.config.getNumberOfChainedStubs() == 0) {
-                       ((OperatorMetricGroup) 
ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask();
+
+               final Counter numRecordsOut;
+               {
+                       Counter tmpNumRecordsOut;
+                       try {
+                               OperatorIOMetricGroup ioMetricGroup = 
((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
+                               ioMetricGroup.reuseInputMetricsForTask();
+                               if (this.config.getNumberOfChainedStubs() == 0) 
{
+                                       
ioMetricGroup.reuseOutputMetricsForTask();
+                               }
+                               tmpNumRecordsOut = 
ioMetricGroup.getNumRecordsOutCounter();
+                       } catch (Exception e) {
+                               LOG.warn("An exception occurred during the 
metrics setup.", e);
+                               tmpNumRecordsOut = new SimpleCounter();
+                       }
+                       numRecordsOut = tmpNumRecordsOut;
                }
+               
+               Counter completedSplitsCounter = 
ctx.getMetricGroup().counter("numSplitsProcessed");
 
                if 
(RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
                        ((RichInputFormat) this.format).setRuntimeContext(ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index cb94d25..c977503 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -45,7 +45,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -344,7 +344,7 @@ public class IndividualRestartsConcurrencyTest extends 
TestLogger {
                                1,
                                allVertices,
                                checkpointCoordinatorConfiguration,
-                               new UnregisteredTaskMetricsGroup()));
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()));
 
                final CheckpointCoordinator checkpointCoordinator = 
graph.getCheckpointCoordinator();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 64f82f3..6f98119 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
 
@@ -63,11 +63,11 @@ public class InputGateConcurrentTest {
                                new IntermediateDataSetID(), 
ResultPartitionType.PIPELINED,
                                0, numChannels,
                                mock(TaskActions.class),
-                               new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                for (int i = 0; i < numChannels; i++) {
                        LocalInputChannel channel = new LocalInputChannel(gate, 
i, new ResultPartitionID(),
-                                       resultPartitionManager, 
mock(TaskEventDispatcher.class), new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                                       resultPartitionManager, 
mock(TaskEventDispatcher.class), 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
                        gate.setInputChannel(new 
IntermediateResultPartitionID(), channel);
 
                        partitions[i] = new PipelinedSubpartition(0, 
resultPartition);
@@ -99,12 +99,12 @@ public class InputGateConcurrentTest {
                                0,
                                numChannels,
                                mock(TaskActions.class),
-                               new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                for (int i = 0; i < numChannels; i++) {
                        RemoteInputChannel channel = new RemoteInputChannel(
                                        gate, i, new ResultPartitionID(), 
mock(ConnectionID.class),
-                                       connManager, 0, 0, new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                                       connManager, 0, 0, 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
                        gate.setInputChannel(new 
IntermediateResultPartitionID(), channel);
 
                        sources[i] = new RemoteChannelSource(channel);
@@ -148,7 +148,7 @@ public class InputGateConcurrentTest {
                                0,
                                numChannels,
                                mock(TaskActions.class),
-                               new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                for (int i = 0, local = 0; i < numChannels; i++) {
                        if (localOrRemote.get(i)) {
@@ -158,14 +158,14 @@ public class InputGateConcurrentTest {
                                sources[i] = new 
PipelinedSubpartitionSource(psp);
 
                                LocalInputChannel channel = new 
LocalInputChannel(gate, i, new ResultPartitionID(),
-                                               resultPartitionManager, 
mock(TaskEventDispatcher.class), new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                                               resultPartitionManager, 
mock(TaskEventDispatcher.class), 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
                                gate.setInputChannel(new 
IntermediateResultPartitionID(), channel);
                        }
                        else {
                                //remote channel
                                RemoteInputChannel channel = new 
RemoteInputChannel(
                                                gate, i, new 
ResultPartitionID(), mock(ConnectionID.class),
-                                               connManager, 0, 0, new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                                               connManager, 0, 0, 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
                                gate.setInputChannel(new 
IntermediateResultPartitionID(), channel);
 
                                sources[i] = new RemoteChannelSource(channel);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index f933840..324a060 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -33,7 +33,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
 
@@ -89,11 +89,11 @@ public class InputGateFairnessTest {
                                new IntermediateDataSetID(),
                                0, numChannels,
                                mock(TaskActions.class),
-                               new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                for (int i = 0; i < numChannels; i++) {
                        LocalInputChannel channel = new LocalInputChannel(gate, 
i, new ResultPartitionID(),
-                                       resultPartitionManager, 
mock(TaskEventDispatcher.class), new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                                       resultPartitionManager, 
mock(TaskEventDispatcher.class), 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
                        gate.setInputChannel(new 
IntermediateResultPartitionID(), channel);
                }
 
@@ -142,11 +142,11 @@ public class InputGateFairnessTest {
                                new IntermediateDataSetID(),
                                0, numChannels,
                                mock(TaskActions.class),
-                               new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                for (int i = 0; i < numChannels; i++) {
                        LocalInputChannel channel = new LocalInputChannel(gate, 
i, new ResultPartitionID(),
-                                       resultPartitionManager, 
mock(TaskEventDispatcher.class), new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                                       resultPartitionManager, 
mock(TaskEventDispatcher.class), 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
                        gate.setInputChannel(new 
IntermediateResultPartitionID(), channel);
                }
 
@@ -192,7 +192,7 @@ public class InputGateFairnessTest {
                                new IntermediateDataSetID(),
                                0, numChannels,
                                mock(TaskActions.class),
-                               new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                final ConnectionManager connManager = 
createDummyConnectionManager();
 
@@ -201,7 +201,7 @@ public class InputGateFairnessTest {
                for (int i = 0; i < numChannels; i++) {
                        RemoteInputChannel channel = new RemoteInputChannel(
                                        gate, i, new ResultPartitionID(), 
mock(ConnectionID.class), 
-                                       connManager, 0, 0, new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                                       connManager, 0, 0, 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                        channels[i] = channel;
                        
@@ -247,7 +247,7 @@ public class InputGateFairnessTest {
                                new IntermediateDataSetID(),
                                0, numChannels,
                                mock(TaskActions.class),
-                               new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                final ConnectionManager connManager = 
createDummyConnectionManager();
 
@@ -257,7 +257,7 @@ public class InputGateFairnessTest {
                for (int i = 0; i < numChannels; i++) {
                        RemoteInputChannel channel = new RemoteInputChannel(
                                        gate, i, new ResultPartitionID(), 
mock(ConnectionID.class),
-                                       connManager, 0, 0, new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                                       connManager, 0, 0, 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                        channels[i] = channel;
                        gate.setInputChannel(new 
IntermediateResultPartitionID(), channel);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 5f7fd82..16cd90d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -38,7 +38,7 @@ import 
org.apache.flink.runtime.io.network.util.TestPartitionProducer;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -291,7 +291,7 @@ public class LocalInputChannelTest {
                        0,
                        1,
                        mock(TaskActions.class),
-                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()
                );
 
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
@@ -318,7 +318,7 @@ public class LocalInputChannelTest {
                        partitionManager,
                        new TaskEventDispatcher(),
                        1, 1,
-                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                gate.setInputChannel(new IntermediateResultPartitionID(), 
channel);
 
@@ -370,7 +370,7 @@ public class LocalInputChannelTest {
                        new ResultPartitionID(),
                        partitionManager,
                        new TaskEventDispatcher(),
-                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                channel.requestSubpartition(0);
 
@@ -411,7 +411,7 @@ public class LocalInputChannelTest {
                                mock(TaskEventDispatcher.class),
                                initialAndMaxRequestBackoff._1(),
                                initialAndMaxRequestBackoff._2(),
-                               new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
        }
 
        /**
@@ -487,7 +487,7 @@ public class LocalInputChannelTest {
                                        subpartitionIndex,
                                        numberOfInputChannels,
                                        mock(TaskActions.class),
-                                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                        // Set buffer pool
                        inputGate.setBufferPool(bufferPool);
@@ -502,7 +502,7 @@ public class LocalInputChannelTest {
                                                                
consumedPartitionIds[i],
                                                                
partitionManager,
                                                                
taskEventDispatcher,
-                                                               new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()));
+                                                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()));
                        }
 
                        this.numberOfInputChannels = numberOfInputChannels;

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index bced9ce..d791ced 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -270,7 +270,7 @@ public class RemoteInputChannelTest {
                                partitionId,
                                mock(ConnectionID.class),
                                connectionManager,
-                               new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                ch.onFailedPartitionRequest();
 
@@ -290,7 +290,7 @@ public class RemoteInputChannelTest {
                                new ResultPartitionID(),
                                mock(ConnectionID.class),
                                connManager,
-                               new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                ch.onError(new ProducerFailedException(new 
RuntimeException("Expected test exception.")));
 
@@ -401,6 +401,6 @@ public class RemoteInputChannelTest {
                        connectionManager,
                        initialAndMaxRequestBackoff._1(),
                        initialAndMaxRequestBackoff._2(),
-                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 4d7d884..da649cd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -43,7 +43,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
 
@@ -80,7 +80,7 @@ public class SingleInputGateTest {
                        new IntermediateDataSetID(), 
ResultPartitionType.PIPELINED,
                        0, 2,
                        mock(TaskActions.class),
-                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                assertEquals(ResultPartitionType.PIPELINED, 
inputGate.getConsumedPartitionType());
 
@@ -140,7 +140,7 @@ public class SingleInputGateTest {
                                resultId, ResultPartitionType.PIPELINED,
                                0, 2,
                                mock(TaskActions.class),
-                               new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
                final BufferPool bufferPool = mock(BufferPool.class);
                
when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -149,12 +149,12 @@ public class SingleInputGateTest {
                // Local
                ResultPartitionID localPartitionId = new ResultPartitionID(new 
IntermediateResultPartitionID(), new ExecutionAttemptID());
 
-               InputChannel local = new LocalInputChannel(inputGate, 0, 
localPartitionId, partitionManager, taskEventDispatcher, new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+               InputChannel local = new LocalInputChannel(inputGate, 0, 
localPartitionId, partitionManager, taskEventDispatcher, 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                // Unknown
                ResultPartitionID unknownPartitionId = new 
ResultPartitionID(new IntermediateResultPartitionID(), new 
ExecutionAttemptID());
 
-               InputChannel unknown = new UnknownInputChannel(inputGate, 1, 
unknownPartitionId, partitionManager, taskEventDispatcher, 
mock(ConnectionManager.class), 0, 0, new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+               InputChannel unknown = new UnknownInputChannel(inputGate, 1, 
unknownPartitionId, partitionManager, taskEventDispatcher, 
mock(ConnectionManager.class), 0, 0, 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                // Set channels
                inputGate.setInputChannel(localPartitionId.getPartitionId(), 
local);
@@ -195,7 +195,7 @@ public class SingleInputGateTest {
                        ResultPartitionType.PIPELINED,
                        0,
                        1,
-                       mock(TaskActions.class), new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                       mock(TaskActions.class), 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
 
@@ -206,7 +206,7 @@ public class SingleInputGateTest {
                        partitionManager,
                        new TaskEventDispatcher(),
                        new LocalConnectionManager(),
-                       0, 0, new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                       0, 0, 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                inputGate.setInputChannel(unknown.partitionId.getPartitionId(), 
unknown);
 
@@ -236,7 +236,7 @@ public class SingleInputGateTest {
                        0,
                        1,
                        mock(TaskActions.class),
-                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                InputChannel unknown = new UnknownInputChannel(
                        inputGate,
@@ -246,7 +246,7 @@ public class SingleInputGateTest {
                        new TaskEventDispatcher(),
                        new LocalConnectionManager(),
                        0, 0,
-                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                inputGate.setInputChannel(unknown.partitionId.getPartitionId(), 
unknown);
 
@@ -339,7 +339,7 @@ public class SingleInputGateTest {
                        gateDesc,
                        netEnv,
                        mock(TaskActions.class),
-                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                assertEquals(gateDesc.getConsumedPartitionType(), 
gate.getConsumedPartitionType());
 
@@ -388,7 +388,7 @@ public class SingleInputGateTest {
                        0,
                        1,
                        mock(TaskActions.class),
-                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                RemoteInputChannel remote = mock(RemoteInputChannel.class);
                inputGate.setInputChannel(new IntermediateResultPartitionID(), 
remote);
@@ -416,7 +416,7 @@ public class SingleInputGateTest {
                        0,
                        1,
                        mock(TaskActions.class),
-                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                UnknownInputChannel unknown = mock(UnknownInputChannel.class);
                final ResultPartitionID resultPartitionId = new 
ResultPartitionID();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 18ad490..0ae6e74 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -60,7 +60,7 @@ public class TestSingleInputGate {
                        0,
                        numberOfInputChannels,
                        mock(TaskActions.class),
-                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                this.inputGate = spy(realGate);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index bc1dd07..9884855 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -21,7 +21,7 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
 
@@ -48,13 +48,13 @@ public class UnionInputGateTest {
                        new IntermediateDataSetID(), 
ResultPartitionType.PIPELINED,
                        0, 3,
                        mock(TaskActions.class),
-                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
                final SingleInputGate ig2 = new SingleInputGate(
                        testTaskName, new JobID(),
                        new IntermediateDataSetID(), 
ResultPartitionType.PIPELINED,
                        0, 5,
                        mock(TaskActions.class),
-                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                final UnionInputGate union = new UnionInputGate(new 
SingleInputGate[]{ig1, ig2});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
index f5d6802..0b7547d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
@@ -203,7 +203,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends 
TestLogger {
                                ResourceID.generate(),
                                taskManagerSystem,
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                "localhost",
                                Option.<String>empty(),
                                false,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 88141d6..f86e7e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -64,6 +64,7 @@ import 
org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -206,7 +207,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                mySubmittedJobGraphStore,
                                checkpointStateFactory,
                                jobRecoveryTimeout,
-                               new JobManagerMetricGroup(new 
NoOpMetricRegistry(), "localhost"),
+                               
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                                Option.<String>empty());
 
                        jobManager = system.actorOf(jobManagerProps);
@@ -217,7 +218,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                ResourceID.generate(),
                                system,
                                testingHighAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                "localhost",
                                Option.apply("taskmanager"),
                                true,
@@ -381,7 +382,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                submittedJobGraphStore,
                                mock(CheckpointRecoveryFactory.class),
                                jobRecoveryTimeout,
-                               new JobManagerMetricGroup(new 
NoOpMetricRegistry(), "localhost"),
+                               
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                                
recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
 
                        jobManager = system.actorOf(jobManagerProps);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 6a02d1f..51cc469 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -626,7 +626,7 @@ public class JobManagerTest extends TestLogger {
                        TestingUtils.defaultExecutor(),
                        TestingUtils.defaultExecutor(),
                        highAvailabilityServices,
-                       new NoOpMetricRegistry(),
+                       NoOpMetricRegistry.INSTANCE,
                        Option.empty(),
                        TestingJobManager.class,
                        MemoryArchivist.class)._1();
@@ -648,7 +648,7 @@ public class JobManagerTest extends TestLogger {
                        ResourceID.generate(),
                        system,
                        highAvailabilityServices,
-                       new NoOpMetricRegistry(),
+                       NoOpMetricRegistry.INSTANCE,
                        "localhost",
                        scala.Option.<String>empty(),
                        true,
@@ -845,7 +845,7 @@ public class JobManagerTest extends TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                Option.empty(),
                                Option.apply("jm"),
                                Option.apply("arch"),
@@ -864,7 +864,7 @@ public class JobManagerTest extends TestLogger {
                                ResourceID.generate(),
                                actorSystem,
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                "localhost",
                                Option.apply("tm"),
                                true,
@@ -1057,7 +1057,7 @@ public class JobManagerTest extends TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                Option.empty(),
                                Option.apply("jm"),
                                Option.apply("arch"),
@@ -1076,7 +1076,7 @@ public class JobManagerTest extends TestLogger {
                                ResourceID.generate(),
                                actorSystem,
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                "localhost",
                                Option.apply("tm"),
                                true,
@@ -1172,7 +1172,7 @@ public class JobManagerTest extends TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                Option.empty(),
                                Option.apply("jm"),
                                Option.apply("arch"),
@@ -1191,7 +1191,7 @@ public class JobManagerTest extends TestLogger {
                                ResourceID.generate(),
                                actorSystem,
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                "localhost",
                                Option.apply("tm"),
                                true,
@@ -1285,7 +1285,7 @@ public class JobManagerTest extends TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                Option.empty(),
                                Option.apply("jm"),
                                Option.apply("arch"),
@@ -1307,7 +1307,7 @@ public class JobManagerTest extends TestLogger {
                                ResourceID.generate(),
                                actorSystem,
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                "localhost",
                                Option.apply("tm"),
                                true,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index cc93f18..0ca83ae 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -95,7 +95,7 @@ public class JobSubmitTest {
                        TestingUtils.defaultExecutor(),
                        TestingUtils.defaultExecutor(),
                        highAvailabilityServices,
-                       new NoOpMetricRegistry(),
+                       NoOpMetricRegistry.INSTANCE,
                        Option.empty(),
                        JobManager.class,
                        MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 72c03af..703cd0b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -201,7 +202,7 @@ public class JobManagerLeaderElectionTest extends 
TestLogger {
                        submittedJobGraphStore,
                        checkpointRecoveryFactory,
                        AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
-                       new JobManagerMetricGroup(new NoOpMetricRegistry(), 
"localhost"),
+                       
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                        Option.<String>empty());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
deleted file mode 100644
index 46d6548..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.metrics;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.runtime.metrics.scope.ScopeFormats;
-
-import javax.annotation.Nullable;
-
-/**
- * Metric registry which does nothing and is intended for testing purposes.
- */
-public class NoOpMetricRegistry implements MetricRegistry {
-
-       final char delimiter = ',';
-
-       final ScopeFormats scopeFormats = ScopeFormats.fromConfig(new 
Configuration());
-
-       @Override
-       public char getDelimiter() {
-               return delimiter;
-       }
-
-       @Override
-       public char getDelimiter(int index) {
-               return delimiter;
-       }
-
-       @Override
-       public int getNumberReporters() {
-               return 0;
-       }
-
-       @Override
-       public void register(Metric metric, String metricName, 
AbstractMetricGroup group) {}
-
-       @Override
-       public void unregister(Metric metric, String metricName, 
AbstractMetricGroup group) {}
-
-       @Override
-       public ScopeFormats getScopeFormats() {
-               return scopeFormats;
-       }
-
-       @Nullable
-       @Override
-       public String getMetricQueryServicePath() {
-               return null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index d934ea9..eec7165 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -78,7 +78,7 @@ public class TaskManagerMetricsTest extends TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                Option.empty(),
                                JobManager.class,
                                MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 0fced33..4dc5edf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -86,7 +86,7 @@ public class MetricGroupTest extends TestLogger {
         */
        @Test
        public void testUserDefinedVariable() {
-               MetricRegistry registry = new NoOpMetricRegistry();
+               MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
                GenericMetricGroup root = new GenericMetricGroup(registry, new 
DummyAbstractMetricGroup(registry), "root");
 
                String key = "key";
@@ -111,7 +111,7 @@ public class MetricGroupTest extends TestLogger {
         */
        @Test
        public void testUserDefinedVariableOnKeyGroup() {
-               MetricRegistry registry = new NoOpMetricRegistry();
+               MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
                GenericMetricGroup root = new GenericMetricGroup(registry, new 
DummyAbstractMetricGroup(registry), "root");
 
                String key1 = "key1";
@@ -142,7 +142,7 @@ public class MetricGroupTest extends TestLogger {
         */
        @Test
        public void testNameCollisionForKeyAfterGenericGroup() {
-               MetricRegistry registry = new NoOpMetricRegistry();
+               MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
                GenericMetricGroup root = new GenericMetricGroup(registry, new 
DummyAbstractMetricGroup(registry), "root");
 
                String key = "key";
@@ -169,7 +169,7 @@ public class MetricGroupTest extends TestLogger {
         */
        @Test
        public void testNameCollisionForKeyAndValueAfterGenericGroup() {
-               MetricRegistry registry = new NoOpMetricRegistry();
+               MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
                GenericMetricGroup root = new GenericMetricGroup(registry, new 
DummyAbstractMetricGroup(registry), "root");
 
                String key = "key";
@@ -195,7 +195,7 @@ public class MetricGroupTest extends TestLogger {
         */
        @Test
        public void testNameCollisionAfterKeyValueGroup() {
-               MetricRegistry registry = new NoOpMetricRegistry();
+               MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
                GenericMetricGroup root = new GenericMetricGroup(registry, new 
DummyAbstractMetricGroup(registry), "root");
 
                String key = "key";

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
index bcf77de..f23b2f5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.metrics.groups;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 
 import org.junit.Test;
 
@@ -34,7 +33,7 @@ import static org.junit.Assert.assertNotNull;
 public class TaskIOMetricGroupTest {
        @Test
        public void testTaskIOMetricGroup() {
-               TaskMetricGroup task = new UnregisteredTaskMetricsGroup();
+               TaskMetricGroup task = 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
                TaskIOMetricGroup taskIO = task.getIOMetricGroup();
 
                // test counter forwarding

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 4fa74b3..a4d14c4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -29,10 +29,10 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
@@ -227,6 +227,6 @@ public class TestTaskContext<S, T> implements 
TaskContext<S, T> {
 
        @Override
        public OperatorMetricGroup getMetricGroup() {
-               return new 
UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
+               return 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index 96c8b73..a76f110 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -26,13 +26,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.TaskContext;
@@ -373,7 +372,7 @@ public abstract class BinaryOperatorTestBase<S extends 
Function, IN, OUT> extend
        
        @Override
        public OperatorMetricGroup getMetricGroup() {
-               return new 
UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
+               return 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index d2cedb9..3820bf9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.TaskContext;
@@ -368,7 +369,7 @@ public abstract class DriverTestBase<S extends Function> 
extends TestLogger impl
        
        @Override
        public OperatorMetricGroup getMetricGroup() {
-               return new 
UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
+               return 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 718ecfe..148eb0b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -100,7 +101,7 @@ public class DummyEnvironment implements Environment {
 
        @Override
        public TaskMetricGroup getMetricGroup() {
-               return new UnregisteredTaskMetricsGroup();
+               return 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index f655b12..861cf35 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -281,7 +282,7 @@ public class MockEnvironment implements Environment {
 
        @Override
        public TaskMetricGroup getMetricGroup() {
-               return new UnregisteredTaskMetricsGroup();
+               return 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 141aec6..2ef82da 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.ResettableDriver;
@@ -364,7 +365,7 @@ public abstract class UnaryOperatorTestBase<S extends 
Function, IN, OUT> extends
        
        @Override
        public OperatorMetricGroup getMetricGroup() {
-               return new 
UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
+               return 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
deleted file mode 100644
index 7065e6b..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.testutils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-
-import java.util.UUID;
-
-public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
-       
-       private static final MetricRegistry EMPTY_REGISTRY = new 
NoOpMetricRegistry();
-
-       
-       public UnregisteredTaskMetricsGroup() {
-               super(EMPTY_REGISTRY, new DummyJobMetricGroup(),
-                               new JobVertexID(), new ExecutionAttemptID(), 
"testtask", 0, 0);
-       }
-
-       @Override
-       protected void addMetric(String name, Metric metric) {}
-
-       @Override
-       public MetricGroup addGroup(String name) {
-               return new UnregisteredMetricsGroup();
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       private static class DummyTaskManagerMetricsGroup extends 
TaskManagerMetricGroup {
-
-               public DummyTaskManagerMetricsGroup() {
-                       super(EMPTY_REGISTRY, "localhost", 
UUID.randomUUID().toString());
-               }
-       }
-
-       private static class DummyJobMetricGroup extends 
TaskManagerJobMetricGroup {
-               
-               public DummyJobMetricGroup() {
-                       super(EMPTY_REGISTRY, new 
DummyTaskManagerMetricsGroup(), new JobID(), "testjob");
-               }
-       }
-       
-       public static class DummyTaskIOMetricGroup extends TaskIOMetricGroup {
-               public DummyTaskIOMetricGroup() {
-                       super(new UnregisteredTaskMetricsGroup());
-               }
-       }
-
-       public static class DummyOperatorMetricGroup extends 
OperatorMetricGroup {
-               public DummyOperatorMetricGroup() {
-                       super(EMPTY_REGISTRY, new 
UnregisteredTaskMetricsGroup(), new OperatorID(), "testoperator");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 3050718..87cb4a9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -97,7 +97,7 @@ public class ResourceManagerTest extends TestLogger {
                        highAvailabilityServices,
                        new HeartbeatServices(1000L, 10000L),
                        slotManager,
-                       new NoOpMetricRegistry(),
+                       NoOpMetricRegistry.INSTANCE,
                        jobLeaderIdService,
                        testingFatalErrorHandler);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 98b5b8b..a3c41c5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -98,7 +98,7 @@ public class TaskManagerComponentsStartupShutdownTest extends 
TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                Option.empty(),
                                JobManager.class,
                                MemoryArchivist.class)._1();
@@ -168,7 +168,7 @@ public class TaskManagerComponentsStartupShutdownTest 
extends TestLogger {
                                network,
                                numberOfSlots,
                                highAvailabilityServices,
-                               new TaskManagerMetricGroup(new 
NoOpMetricRegistry(), connectionInfo.getHostname(), 
connectionInfo.getResourceID().getResourceIdString()));
+                               new 
TaskManagerMetricGroup(NoOpMetricRegistry.INSTANCE, 
connectionInfo.getHostname(), 
connectionInfo.getResourceID().getResourceIdString()));
 
                        taskManager = actorSystem.actorOf(tmProps);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 7429ec5..fadebce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -122,7 +122,7 @@ public abstract class TaskManagerProcessReapingTestBase 
extends TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                Option.empty(),
                                JobManager.class,
                                MemoryArchivist.class)._1;

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index ed06dc0..4c7a8cf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -250,7 +250,7 @@ public class TaskManagerStartupTest extends TestLogger {
                                ResourceID.generate(),
                                null,
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                "localhost",
                                Option.<String>empty(),
                                false,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 38238cd..086ad71 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
@@ -193,7 +193,7 @@ public class JvmExitOnFatalErrorTest {
                                                        new String[0]),
                                                new 
FileCache(tmInfo.getTmpDirectories()),
                                                tmInfo,
-                                               new 
UnregisteredTaskMetricsGroup(),
+                                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
                                                new 
NoOpResultPartitionConsumableNotifier(),
                                                new 
NoOpPartitionProducerStateChecker(),
                                                executor);

Reply via email to