http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 58022a9..4062749 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -33,12 +33,12 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.DefaultKeyedStateStore; @@ -155,7 +155,7 @@ public abstract class AbstractStreamOperator<OUT> // --------------- Metrics --------------------------- /** Metric group for the operator. */ - protected transient MetricGroup metrics; + protected transient OperatorMetricGroup metrics; protected transient LatencyGauge latencyGauge; @@ -191,7 +191,7 @@ public abstract class AbstractStreamOperator<OUT> this.metrics = operatorMetricGroup; } catch (Exception e) { LOG.warn("An error occurred while instantiating task metrics.", e); - this.metrics = new UnregisteredMetricsGroup(); + this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); this.output = output; } Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 609f8b8..0c71a53 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -49,6 +50,9 @@ import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -70,6 +74,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class StreamInputProcessor<IN> { + private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class); + private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers; private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer; @@ -169,7 +175,12 @@ public class StreamInputProcessor<IN> { return false; } if (numRecordsIn == null) { - numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + try { + numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + } catch (Exception e) { + LOG.warn("An exception occurred during the metrics setup.", e); + numRecordsIn = new SimpleCounter(); + } } while (true) { http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 7874147..824acad 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -49,6 +50,9 @@ import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.Collection; @@ -72,6 +76,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class StreamTwoInputProcessor<IN1, IN2> { + private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class); + private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers; private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer; @@ -201,7 +207,12 @@ public class StreamTwoInputProcessor<IN1, IN2> { return false; } if (numRecordsIn == null) { - numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + try { + numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + } catch (Exception e) { + LOG.warn("An exception occurred during the metrics setup.", e); + numRecordsIn = new SimpleCounter(); + } } while (true) { http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index a44cffb..141a623 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -21,11 +21,13 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput; @@ -426,7 +428,21 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea StreamStatusProvider streamStatusProvider, OutputTag<T> outputTag) { this.operator = operator; - this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + + { + Counter tmpNumRecordsIn; + try { + OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup(); + ioMetricGroup.reuseInputMetricsForTask(); + ioMetricGroup.reuseOutputMetricsForTask(); + tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter(); + } catch (Exception e) { + LOG.warn("An exception occurred during the metrics setup.", e); + tmpNumRecordsIn = new SimpleCounter(); + } + numRecordsIn = tmpNumRecordsIn; + } + this.streamStatusProvider = streamStatusProvider; this.outputTag = outputTag; } http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java index d6f5e61..2618e53 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java @@ -33,6 +33,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.junit.Test; @@ -101,7 +102,7 @@ public class RichAsyncFunctionTest { }; final String taskName = "foobarTask"; - final MetricGroup metricGroup = mock(MetricGroup.class); + final MetricGroup metricGroup = new UnregisteredMetricsGroup(); final int numberOfParallelSubtasks = 42; final int indexOfSubtask = 43; final int attemptNumber = 1337; http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 993bffb..a556b18 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -35,8 +35,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.streaming.api.datastream.AsyncDataStream; @@ -658,7 +658,7 @@ public class AsyncWaitOperatorTest extends TestLogger { final Configuration taskConfiguration = new Configuration(); final ExecutionConfig executionConfig = new ExecutionConfig(); - final TaskMetricGroup metricGroup = new UnregisteredTaskMetricsGroup(); + final TaskMetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo(); final TaskInfo taskInfo = new TaskInfo("foobarTask", 1, 0, 1, 1); http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index eacded6..0af1471 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -272,7 +272,7 @@ public class InterruptSensitiveRestoreTest { new String[0]), new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }), new TestingTaskManagerRuntimeInfo(), - new UnregisteredTaskMetricsGroup(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), mock(ResultPartitionConsumableNotifier.class), mock(PartitionProducerStateChecker.class), mock(Executor.class)); http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index 277ca51..ee7337c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -47,8 +47,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; import org.apache.flink.runtime.query.KvStateRegistry; @@ -363,6 +363,6 @@ public class StreamMockEnvironment implements Environment { @Override public TaskMetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup(); + return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index 5480ce7..e3e51aa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -168,7 +168,7 @@ public class StreamTaskTerminationTest extends TestLogger { new String[0]), mock(FileCache.class), taskManagerRuntimeInfo, - new UnregisteredTaskMetricsGroup(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), mock(ResultPartitionConsumableNotifier.class), mock(PartitionProducerStateChecker.class), Executors.directExecutor()); http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index d0ea714..8ce8b03 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -58,9 +58,9 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -935,7 +935,7 @@ public class StreamTaskTest extends TestLogger { libCache, mock(FileCache.class), new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[] {System.getProperty("java.io.tmpdir")}), - new UnregisteredTaskMetricsGroup(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), consumableNotifier, partitionProducerStateChecker, executor); http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java index d61b95d..b1127d5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java @@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -244,7 +244,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { new String[0]), new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }), new TestingTaskManagerRuntimeInfo(), - new UnregisteredTaskMetricsGroup(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), mock(ResultPartitionConsumableNotifier.class), mock(PartitionProducerStateChecker.class), Executors.directExecutor()); http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index 94aed2a..29516dc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -146,7 +146,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index 7c53d52..cefadb4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -218,7 +218,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { ResourceID.generate(), taskManagerSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.<String>empty(), false, http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java index 8e97e9d..357f7af 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java @@ -282,7 +282,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { ResourceID.generate(), tmActorSystem[i], highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.<String>empty(), false, http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index ecd0bea..13d6804 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -121,7 +121,7 @@ public class ProcessFailureCancelingITCase extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java index c86f21f..9710c20 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -122,7 +122,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -144,7 +144,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, - new NoOpMetricRegistry(), + NoOpMetricRegistry.INSTANCE, "localhost", Option.apply("tm"), true, http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index 031a4cb..39ddfa7 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -257,7 +257,7 @@ public class YarnResourceManagerTest extends TestLogger { rmLeaderElectionService = new TestingLeaderElectionService(); highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor); - metricRegistry = new NoOpMetricRegistry(); + metricRegistry = NoOpMetricRegistry.INSTANCE; slotManager = new SlotManager( new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()), Time.seconds(10), Time.seconds(10), Time.minutes(1));
