Repository: flink
Updated Branches:
  refs/heads/master e4fe89d6e -> e3fec1f9a


http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
new file mode 100644
index 0000000..339e1d1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat;
+import org.apache.flink.util.AbstractID;
+
+import org.apache.flink.util.SerializedValue;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+
+import static org.junit.Assert.*;
+
+public class TaskManagerGroupTest {
+
+       // 
------------------------------------------------------------------------
+       //  adding and removing jobs
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void addAndRemoveJobs() throws IOException {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+
+               final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
+                               registry, "localhost", new 
AbstractID().toString());
+               
+               
+               final JobID jid1 = new JobID();
+               final JobID jid2 = new JobID();
+               
+               final String jobName1 = "testjob";
+               final String jobName2 = "anotherJob";
+               
+               final JobVertexID vertex11 = new JobVertexID();
+               final JobVertexID vertex12 = new JobVertexID();
+               final JobVertexID vertex13 = new JobVertexID();
+               final JobVertexID vertex21 = new JobVertexID();
+
+               final ExecutionAttemptID execution11 = new ExecutionAttemptID();
+               final ExecutionAttemptID execution12 = new ExecutionAttemptID();
+               final ExecutionAttemptID execution13 = new ExecutionAttemptID();
+               final ExecutionAttemptID execution21 = new ExecutionAttemptID();
+
+               TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
+                       jid1, 
+                       jobName1, 
+                       vertex11, 
+                       execution11, 
+                       new SerializedValue<>(new ExecutionConfig()), 
+                       "test", 
+                       17, 18, 0, 
+                       new Configuration(), new Configuration(), 
+                       "", 
+                       new ArrayList<ResultPartitionDeploymentDescriptor>(), 
+                       new ArrayList<InputGateDeploymentDescriptor>(), 
+                       new ArrayList<BlobKey>(), 
+                       new ArrayList<URL>(), 0);
+
+               TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
+                       jid1,
+                       jobName1,
+                       vertex12,
+                       execution12,
+                       new SerializedValue<>(new ExecutionConfig()),
+                       "test",
+                       13, 18, 1,
+                       new Configuration(), new Configuration(),
+                       "",
+                       new ArrayList<ResultPartitionDeploymentDescriptor>(),
+                       new ArrayList<InputGateDeploymentDescriptor>(),
+                       new ArrayList<BlobKey>(),
+                       new ArrayList<URL>(), 0);
+
+               TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor(
+                       jid2,
+                       jobName2,
+                       vertex21,
+                       execution21,
+                       new SerializedValue<>(new ExecutionConfig()),
+                       "test",
+                       7, 18, 2,
+                       new Configuration(), new Configuration(),
+                       "",
+                       new ArrayList<ResultPartitionDeploymentDescriptor>(),
+                       new ArrayList<InputGateDeploymentDescriptor>(),
+                       new ArrayList<BlobKey>(),
+                       new ArrayList<URL>(), 0);
+
+               TaskDeploymentDescriptor tdd4 = new TaskDeploymentDescriptor(
+                       jid1,
+                       jobName1,
+                       vertex13,
+                       execution13,
+                       new SerializedValue<>(new ExecutionConfig()),
+                       "test",
+                       0, 18, 0,
+                       new Configuration(), new Configuration(),
+                       "",
+                       new ArrayList<ResultPartitionDeploymentDescriptor>(),
+                       new ArrayList<InputGateDeploymentDescriptor>(),
+                       new ArrayList<BlobKey>(),
+                       new ArrayList<URL>(), 0);
+               
+               TaskMetricGroup tmGroup11 = group.addTaskForJob(tdd1);
+               TaskMetricGroup tmGroup12 = group.addTaskForJob(tdd2);
+               TaskMetricGroup tmGroup21 = group.addTaskForJob(tdd3);
+               
+               assertEquals(2, group.numRegisteredJobMetricGroups());
+               assertFalse(tmGroup11.parent().isClosed());
+               assertFalse(tmGroup12.parent().isClosed());
+               assertFalse(tmGroup21.parent().isClosed());
+               
+               // close all for job 2 and one from job 1
+               tmGroup11.close();
+               tmGroup21.close();
+               assertTrue(tmGroup11.isClosed());
+               assertTrue(tmGroup21.isClosed());
+               
+               // job 2 should be removed, job 1 should still be there
+               assertFalse(tmGroup11.parent().isClosed());
+               assertFalse(tmGroup12.parent().isClosed());
+               assertTrue(tmGroup21.parent().isClosed());
+               assertEquals(1, group.numRegisteredJobMetricGroups());
+               
+               // add one more to job one
+               TaskMetricGroup tmGroup13 = group.addTaskForJob(tdd4);
+               tmGroup12.close();
+               tmGroup13.close();
+
+               assertTrue(tmGroup11.parent().isClosed());
+               assertTrue(tmGroup12.parent().isClosed());
+               assertTrue(tmGroup13.parent().isClosed());
+               
+               assertEquals(0, group.numRegisteredJobMetricGroups());
+
+               registry.shutdown();
+       }
+
+       @Test
+       public void testCloseClosesAll() throws IOException {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+               final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
+                               registry, "localhost", new 
AbstractID().toString());
+
+
+               final JobID jid1 = new JobID();
+               final JobID jid2 = new JobID();
+
+               final String jobName1 = "testjob";
+               final String jobName2 = "anotherJob";
+
+               final JobVertexID vertex11 = new JobVertexID();
+               final JobVertexID vertex12 = new JobVertexID();
+               final JobVertexID vertex21 = new JobVertexID();
+
+               final ExecutionAttemptID execution11 = new ExecutionAttemptID();
+               final ExecutionAttemptID execution12 = new ExecutionAttemptID();
+               final ExecutionAttemptID execution21 = new ExecutionAttemptID();
+
+               TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
+                       jid1,
+                       jobName1,
+                       vertex11,
+                       execution11,
+                       new SerializedValue<>(new ExecutionConfig()),
+                       "test",
+                       17, 18, 0,
+                       new Configuration(), new Configuration(),
+                       "",
+                       new ArrayList<ResultPartitionDeploymentDescriptor>(),
+                       new ArrayList<InputGateDeploymentDescriptor>(),
+                       new ArrayList<BlobKey>(),
+                       new ArrayList<URL>(), 0);
+
+               TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
+                       jid1,
+                       jobName1,
+                       vertex12,
+                       execution12,
+                       new SerializedValue<>(new ExecutionConfig()),
+                       "test",
+                       13, 18, 1,
+                       new Configuration(), new Configuration(),
+                       "",
+                       new ArrayList<ResultPartitionDeploymentDescriptor>(),
+                       new ArrayList<InputGateDeploymentDescriptor>(),
+                       new ArrayList<BlobKey>(),
+                       new ArrayList<URL>(), 0);
+
+               TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor(
+                       jid2,
+                       jobName2,
+                       vertex21,
+                       execution21,
+                       new SerializedValue<>(new ExecutionConfig()),
+                       "test",
+                       7, 18, 1,
+                       new Configuration(), new Configuration(),
+                       "",
+                       new ArrayList<ResultPartitionDeploymentDescriptor>(),
+                       new ArrayList<InputGateDeploymentDescriptor>(),
+                       new ArrayList<BlobKey>(),
+                       new ArrayList<URL>(), 0);
+
+               TaskMetricGroup tmGroup11 = group.addTaskForJob(tdd1);
+               TaskMetricGroup tmGroup12 = group.addTaskForJob(tdd2);
+               TaskMetricGroup tmGroup21 = group.addTaskForJob(tdd3);
+               
+               group.close();
+               
+               assertTrue(tmGroup11.isClosed());
+               assertTrue(tmGroup12.isClosed());
+               assertTrue(tmGroup21.isClosed());
+
+               registry.shutdown();
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  scope name tests
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testGenerateScopeDefault() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+               TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(registry, "localhost", "id");
+
+               assertArrayEquals(new String[] { "localhost", "taskmanager", 
"id" }, group.getScopeComponents());
+               assertEquals("localhost.taskmanager.id.name", 
group.getMetricIdentifier("name"));
+               registry.shutdown();
+       }
+
+       @Test
+       public void testGenerateScopeCustom() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+               TaskManagerScopeFormat format = new 
TaskManagerScopeFormat("constant.<host>.foo.<host>");
+               TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(registry, format, "host", "id");
+
+               assertArrayEquals(new String[] { "constant", "host", "foo", 
"host" }, group.getScopeComponents());
+               assertEquals("constant.host.foo.host.name", 
group.getMetricIdentifier("name"));
+               registry.shutdown();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
new file mode 100644
index 0000000..36a3c54
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.scope.TaskManagerJobScopeFormat;
+import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TaskManagerJobGroupTest {
+
+       @Test
+       public void testGenerateScopeDefault() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+               JobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
+
+               assertArrayEquals(
+                               new String[] { "theHostName", "taskmanager", 
"test-tm-id", "myJobName"},
+                               jmGroup.getScopeComponents());
+
+               assertEquals(
+                               
"theHostName.taskmanager.test-tm-id.myJobName.name",
+                               jmGroup.getMetricIdentifier("name"));
+               registry.shutdown();
+       }
+
+       @Test
+       public void testGenerateScopeCustom() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+
+               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("abc");
+               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("some-constant.<job_name>", tmFormat);
+
+               JobID jid = new JobID();
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+               JobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
+
+               assertArrayEquals(
+                               new String[] { "some-constant", "myJobName" },
+                               jmGroup.getScopeComponents());
+
+               assertEquals(
+                               "some-constant.myJobName.name",
+                               jmGroup.getMetricIdentifier("name"));
+               registry.shutdown();
+       }
+
+       @Test
+       public void testGenerateScopeCustomWildcard() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+
+               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("peter.<tm_id>");
+               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat);
+
+               JobID jid = new JobID();
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, tmFormat, "theHostName", "test-tm-id");
+               JobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
+
+               assertArrayEquals(
+                               new String[] { "peter", "test-tm-id", 
"some-constant", jid.toString() },
+                               jmGroup.getScopeComponents());
+
+               assertEquals(
+                               "peter.test-tm-id.some-constant." + jid + 
".name",
+                               jmGroup.getMetricIdentifier("name"));
+               registry.shutdown();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
new file mode 100644
index 0000000..d3ce0d1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.apache.flink.runtime.metrics.scope.TaskManagerJobScopeFormat;
+import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat;
+import org.apache.flink.runtime.metrics.scope.TaskScopeFormat;
+import org.apache.flink.util.AbstractID;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TaskMetricGroupTest {
+
+       // 
------------------------------------------------------------------------
+       //  scope tests
+       // 
------------------------------------------------------------------------
+       private CountingMetricRegistry registry;
+
+       @Before
+       public void createRegistry() {
+               this.registry = new CountingMetricRegistry(new Configuration());
+       }
+
+       @After
+       public void shutdownRegistry() {
+               this.registry.shutdown();
+               this.registry = null;
+       }
+
+       @Test
+       public void testGenerateScopeDefault() {
+               AbstractID vertexId = new AbstractID();
+               AbstractID executionId = new AbstractID();
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+               TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
+               TaskMetricGroup taskGroup = new TaskMetricGroup(registry, 
jmGroup, vertexId, executionId, "aTaskName", 13, 2);
+
+               assertArrayEquals(
+                               new String[] { "theHostName", "taskmanager", 
"test-tm-id", "myJobName", "aTaskName", "13"},
+                               taskGroup.getScopeComponents());
+
+               assertEquals(
+                               
"theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13.name",
+                               taskGroup.getMetricIdentifier("name"));
+               registry.shutdown();
+       }
+
+       @Test
+       public void testGenerateScopeCustom() {
+               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("abc");
+               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("def", tmFormat);
+               TaskScopeFormat taskFormat = new 
TaskScopeFormat("<tm_id>.<job_id>.<task_id>.<task_attempt_id>", jmFormat);
+
+               JobID jid = new JobID();
+               AbstractID vertexId = new AbstractID();
+               AbstractID executionId = new AbstractID();
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+               TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
+               TaskMetricGroup taskGroup = new TaskMetricGroup(
+                               registry, jmGroup, taskFormat, vertexId, 
executionId, "aTaskName", 13, 2);
+
+               assertArrayEquals(
+                               new String[] { "test-tm-id", jid.toString(), 
vertexId.toString(), executionId.toString() },
+                               taskGroup.getScopeComponents());
+
+               assertEquals(
+                               String.format("test-tm-id.%s.%s.%s.name", jid, 
vertexId, executionId),
+                               taskGroup.getMetricIdentifier("name"));
+               registry.shutdown();
+       }
+
+       @Test
+       public void testGenerateScopeWilcard() {
+               TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat(
+                               ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
+               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat(
+                               
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, tmFormat);
+
+               TaskScopeFormat format = new 
TaskScopeFormat("*.<task_attempt_id>.<subtask_index>", jmFormat);
+
+               AbstractID executionId = new AbstractID();
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+               TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
+
+               TaskMetricGroup taskGroup = new TaskMetricGroup(
+                               registry, jmGroup, format, new AbstractID(), 
executionId, "aTaskName", 13, 1);
+
+               assertArrayEquals(
+                               new String[] { "theHostName", "taskmanager", 
"test-tm-id", "myJobName", executionId.toString(), "13" },
+                               taskGroup.getScopeComponents());
+
+               assertEquals(
+                               "theHostName.taskmanager.test-tm-id.myJobName." 
+ executionId + ".13.name",
+                               taskGroup.getMetricIdentifier("name"));
+               registry.shutdown();
+       }
+
+       @Test
+       public void testTaskMetricGroupCleanup() {
+               TaskManagerMetricGroup taskManagerMetricGroup = new 
TaskManagerMetricGroup(registry, "localhost", "0");
+               TaskManagerJobMetricGroup taskManagerJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, taskManagerMetricGroup, new JobID(), "job");
+               TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, 
taskManagerJobMetricGroup, new AbstractID(), new AbstractID(), "task", 0, 0);
+
+               // the io metric should have registered predefined metrics
+               assertTrue(registry.getNumberRegisteredMetrics() > 0);
+
+               taskMetricGroup.close();
+
+               // now all registered metrics should have been unregistered
+               assertEquals(0, registry.getNumberRegisteredMetrics());
+       }
+
+       private static class CountingMetricRegistry extends MetricRegistry {
+
+               private int counter = 0;
+
+               CountingMetricRegistry(Configuration config) {
+                       super(config);
+               }
+
+               @Override
+               public void register(Metric metric, String metricName, 
MetricGroup group) {
+                       super.register(metric, metricName, group);
+                       counter++;
+               }
+
+               @Override
+               public void unregister(Metric metric, String metricName, 
MetricGroup group) {
+                       super.unregister(metric, metricName, group);
+                       counter--;
+               }
+
+               int getNumberRegisteredMetrics() {
+                       return counter;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java
new file mode 100644
index 0000000..cd96c60
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+
+public class TestReporter extends AbstractReporter {
+
+       @Override
+       public void open(MetricConfig config) {}
+
+       @Override
+       public void close() {}
+
+       @Override
+       public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {}
+
+       @Override
+       public void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group) {}
+
+       @Override
+       public String filterCharacters(String input) {
+               return input;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 063e295..5af34fb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 78e4cce..9dea324 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.operators.testutils;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
@@ -39,10 +39,10 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
index 96922c6..ff0c480 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
@@ -22,12 +22,12 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.IOMetricGroup;
-import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup;
-import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 4b90b88..0c0d064 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 7660893..fea21be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -17,11 +17,10 @@
  */
 package org.apache.flink.runtime.taskmanager;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 5394fd1..fec9ef3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
index 6f6b96e..af60f3d 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
@@ -129,6 +129,13 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-jmx</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka-base_2.10</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
index fe4db7c..c355638 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
@@ -103,6 +103,13 @@ under the License.
                        <version>${kafka.version}</version>
                        <scope>test</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-jmx</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
                
                <dependency>
                        <groupId>org.apache.flink</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
index 258a2cf..a36d222 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
@@ -126,6 +126,13 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-jmx</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-java_2.10</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index b8815da..4450e94 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
@@ -101,7 +102,7 @@ public abstract class KafkaTestBase extends TestLogger {
                flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
                flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
                flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
-               flinkConfig.setString(ConfigConstants.METRICS_REPORTER_CLASS, "org.apache.flink.metrics.reporter.JMXReporter");
+               flinkConfig.setString(ConfigConstants.METRICS_REPORTER_CLASS, JMXReporter.class.getName());
 
                flink = new ForkableFlinkMiniCluster(flinkConfig, false);
                flink.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 5578365..d11990e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -24,7 +24,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 1a66934..ce764b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index a8dd49b..7084208 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.event.AbstractEvent;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 76f22be..e9d583c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -23,7 +23,7 @@ import akka.actor.ActorRef;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 5512ad7..7ca9c3e 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -24,13 +24,13 @@ import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
-import org.apache.flink.metrics.MetricRegistry
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.testingUtils.TestingJobManagerLike
 
 import scala.concurrent.duration.FiniteDuration

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index c772a25..94ad9f2 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -24,7 +24,6 @@ import akka.actor.ActorRef
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
-import org.apache.flink.metrics.MetricRegistry
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.clusterframework.ApplicationStatus
@@ -35,6 +34,7 @@ import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound}
 import org.apache.flink.runtime.messages.Messages.Acknowledge
+import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}

Reply via email to