http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 58022a9..4062749 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -33,12 +33,12 @@ import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DefaultKeyedStateStore;
@@ -155,7 +155,7 @@ public abstract class AbstractStreamOperator<OUT>
        // --------------- Metrics ---------------------------
 
        /** Metric group for the operator. */
-       protected transient MetricGroup metrics;
+       protected transient OperatorMetricGroup metrics;
 
        protected transient LatencyGauge latencyGauge;
 
@@ -191,7 +191,7 @@ public abstract class AbstractStreamOperator<OUT>
                        this.metrics = operatorMetricGroup;
                } catch (Exception e) {
                        LOG.warn("An error occurred while instantiating task 
metrics.", e);
-                       this.metrics = new UnregisteredMetricsGroup();
+                       this.metrics = 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
                        this.output = output;
                }
                Configuration taskManagerConfig = 
container.getEnvironment().getTaskManagerInfo().getConfiguration();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 609f8b8..0c71a53 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -49,6 +50,9 @@ import 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -70,6 +74,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class StreamInputProcessor<IN> {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(StreamInputProcessor.class);
+
        private final 
RecordDeserializer<DeserializationDelegate<StreamElement>>[] 
recordDeserializers;
 
        private RecordDeserializer<DeserializationDelegate<StreamElement>> 
currentRecordDeserializer;
@@ -169,7 +175,12 @@ public class StreamInputProcessor<IN> {
                        return false;
                }
                if (numRecordsIn == null) {
-                       numRecordsIn = ((OperatorMetricGroup) 
streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+                       try {
+                               numRecordsIn = ((OperatorMetricGroup) 
streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+                       } catch (Exception e) {
+                               LOG.warn("An exception occurred during the 
metrics setup.", e);
+                               numRecordsIn = new SimpleCounter();
+                       }
                }
 
                while (true) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 7874147..824acad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -49,6 +50,9 @@ import 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.Collection;
 
@@ -72,6 +76,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class StreamTwoInputProcessor<IN1, IN2> {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(StreamTwoInputProcessor.class);
+
        private final 
RecordDeserializer<DeserializationDelegate<StreamElement>>[] 
recordDeserializers;
 
        private RecordDeserializer<DeserializationDelegate<StreamElement>> 
currentRecordDeserializer;
@@ -201,7 +207,12 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                        return false;
                }
                if (numRecordsIn == null) {
-                       numRecordsIn = ((OperatorMetricGroup) 
streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+                       try {
+                               numRecordsIn = ((OperatorMetricGroup) 
streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+                       } catch (Exception e) {
+                               LOG.warn("An exception occurred during the 
metrics setup.", e);
+                               numRecordsIn = new SimpleCounter();
+                       }
                }
 
                while (true) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index a44cffb..141a623 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -21,11 +21,13 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
@@ -426,7 +428,21 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                                StreamStatusProvider streamStatusProvider,
                                OutputTag<T> outputTag) {
                        this.operator = operator;
-                       this.numRecordsIn = ((OperatorMetricGroup) 
operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+
+                       {
+                               Counter tmpNumRecordsIn;
+                               try {
+                                       OperatorIOMetricGroup ioMetricGroup = 
((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup();
+                                       
ioMetricGroup.reuseInputMetricsForTask();
+                                       
ioMetricGroup.reuseOutputMetricsForTask();
+                                       tmpNumRecordsIn = 
ioMetricGroup.getNumRecordsInCounter();
+                               } catch (Exception e) {
+                                       LOG.warn("An exception occurred during 
the metrics setup.", e);
+                                       tmpNumRecordsIn = new SimpleCounter();
+                               }
+                               numRecordsIn = tmpNumRecordsIn;
+                       }
+
                        this.streamStatusProvider = streamStatusProvider;
                        this.outputTag = outputTag;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
index d6f5e61..2618e53 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 
 import org.junit.Test;
 
@@ -101,7 +102,7 @@ public class RichAsyncFunctionTest {
                };
 
                final String taskName = "foobarTask";
-               final MetricGroup metricGroup = mock(MetricGroup.class);
+               final MetricGroup metricGroup = new UnregisteredMetricsGroup();
                final int numberOfParallelSubtasks = 42;
                final int indexOfSubtask = 43;
                final int attemptNumber = 1337;

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 993bffb..a556b18 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -35,8 +35,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -658,7 +658,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
                final Configuration taskConfiguration = new Configuration();
                final ExecutionConfig executionConfig = new ExecutionConfig();
-               final TaskMetricGroup metricGroup = new 
UnregisteredTaskMetricsGroup();
+               final TaskMetricGroup metricGroup = 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
                final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new 
TestingTaskManagerRuntimeInfo();
                final TaskInfo taskInfo = new TaskInfo("foobarTask", 1, 0, 1, 
1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index eacded6..0af1471 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -272,7 +272,7 @@ public class InterruptSensitiveRestoreTest {
                                new String[0]),
                        new FileCache(new String[] { 
EnvironmentInformation.getTemporaryFileDirectory() }),
                        new TestingTaskManagerRuntimeInfo(),
-                       new UnregisteredTaskMetricsGroup(),
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
                        mock(ResultPartitionConsumableNotifier.class),
                        mock(PartitionProducerStateChecker.class),
                        mock(Executor.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 277ca51..ee7337c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -47,8 +47,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.query.KvStateRegistry;
@@ -363,6 +363,6 @@ public class StreamMockEnvironment implements Environment {
 
        @Override
        public TaskMetricGroup getMetricGroup() {
-               return new UnregisteredTaskMetricsGroup();
+               return 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 5480ce7..e3e51aa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -168,7 +168,7 @@ public class StreamTaskTerminationTest extends TestLogger {
                                new String[0]),
                        mock(FileCache.class),
                        taskManagerRuntimeInfo,
-                       new UnregisteredTaskMetricsGroup(),
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
                        mock(ResultPartitionConsumableNotifier.class),
                        mock(PartitionProducerStateChecker.class),
                        Executors.directExecutor());

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index d0ea714..8ce8b03 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -58,9 +58,9 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -935,7 +935,7 @@ public class StreamTaskTest extends TestLogger {
                        libCache,
                        mock(FileCache.class),
                        new TestingTaskManagerRuntimeInfo(taskManagerConfig, 
new String[] {System.getProperty("java.io.tmpdir")}),
-                       new UnregisteredTaskMetricsGroup(),
+                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
                        consumableNotifier,
                        partitionProducerStateChecker,
                        executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index d61b95d..b1127d5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -244,7 +244,7 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                                        new String[0]),
                                new FileCache(new String[] { 
EnvironmentInformation.getTemporaryFileDirectory() }),
                                new TestingTaskManagerRuntimeInfo(),
-                               new UnregisteredTaskMetricsGroup(),
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
                                mock(ResultPartitionConsumableNotifier.class),
                                mock(PartitionProducerStateChecker.class),
                                Executors.directExecutor());

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 94aed2a..29516dc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -146,7 +146,7 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                Option.empty(),
                                JobManager.class,
                                MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 7c53d52..cefadb4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -218,7 +218,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                                ResourceID.generate(),
                                taskManagerSystem,
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                "localhost",
                                Option.<String>empty(),
                                false,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 8e97e9d..357f7af 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -282,7 +282,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
                                        ResourceID.generate(),
                                        tmActorSystem[i],
                                        highAvailabilityServices,
-                                       new NoOpMetricRegistry(),
+                                       NoOpMetricRegistry.INSTANCE,
                                        "localhost",
                                        Option.<String>empty(),
                                        false,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index ecd0bea..13d6804 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -121,7 +121,7 @@ public class ProcessFailureCancelingITCase extends 
TestLogger {
                                TestingUtils.defaultExecutor(),
                                TestingUtils.defaultExecutor(),
                                highAvailabilityServices,
-                               new NoOpMetricRegistry(),
+                               NoOpMetricRegistry.INSTANCE,
                                Option.empty(),
                                JobManager.class,
                                MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index c86f21f..9710c20 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -122,7 +122,7 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger {
                        TestingUtils.defaultExecutor(),
                        TestingUtils.defaultExecutor(),
                        highAvailabilityServices,
-                       new NoOpMetricRegistry(),
+                       NoOpMetricRegistry.INSTANCE,
                        Option.empty(),
                        Option.apply("jm"),
                        Option.apply("arch"),
@@ -144,7 +144,7 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger {
                        ResourceID.generate(),
                        actorSystem,
                        highAvailabilityServices,
-                       new NoOpMetricRegistry(),
+                       NoOpMetricRegistry.INSTANCE,
                        "localhost",
                        Option.apply("tm"),
                        true,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 031a4cb..39ddfa7 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -257,7 +257,7 @@ public class YarnResourceManagerTest extends TestLogger {
                                rmLeaderElectionService = new 
TestingLeaderElectionService();
                                
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
                                heartbeatServices = new 
TestingHeartbeatServices(5L, 5L, scheduledExecutor);
-                               metricRegistry = new NoOpMetricRegistry();
+                               metricRegistry = NoOpMetricRegistry.INSTANCE;
                                slotManager = new SlotManager(
                                                new 
ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()),
                                                Time.seconds(10), 
Time.seconds(10), Time.minutes(1));

Reply via email to