Repository: flink
Updated Branches:
  refs/heads/master 84d28ba00 -> c8f5745f2


[hotfix] [metrics] Refactor constructors and tests

- removed constructors taking a specific ScopeFormat as an argument, as
they were only used by tests and served no real purpose
- refactored tests accordingly
- made sure that the registry is shut down in every test

This closes #2302.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8f5745f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8f5745f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8f5745f

Branch: refs/heads/master
Commit: c8f5745f251cd8f927b2b9d93e8cb6105d3be226
Parents: 84d28ba
Author: zentol <[email protected]>
Authored: Wed Jul 27 13:21:47 2016 +0200
Committer: zentol <[email protected]>
Committed: Wed Aug 17 13:58:59 2016 +0200

----------------------------------------------------------------------
 .../groups/JobManagerJobMetricGroup.java        | 24 +++-------
 .../metrics/groups/JobManagerMetricGroup.java   | 11 +----
 .../metrics/groups/OperatorMetricGroup.java     | 14 +-----
 .../groups/TaskManagerJobMetricGroup.java       | 17 +-------
 .../metrics/groups/TaskManagerMetricGroup.java  | 11 +----
 .../runtime/metrics/groups/TaskMetricGroup.java | 24 ++--------
 .../runtime/metrics/MetricRegistryTest.java     | 12 ++++-
 .../metrics/groups/JobManagerGroupTest.java     | 26 +++++++----
 .../metrics/groups/JobManagerJobGroupTest.java  | 31 +++++++------
 .../groups/MetricGroupRegistrationTest.java     |  1 +
 .../metrics/groups/TaskManagerGroupTest.java    |  9 ++--
 .../metrics/groups/TaskManagerJobGroupTest.java | 25 +++++------
 .../metrics/groups/TaskMetricGroupTest.java     | 46 +++++++-------------
 13 files changed, 96 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
index 807ddd0..1396a5a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
@@ -19,7 +19,6 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.scope.JobManagerJobScopeFormat;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
@@ -36,24 +35,13 @@ public class JobManagerJobMetricGroup extends 
JobMetricGroup {
        private final JobManagerMetricGroup parent;
 
        public JobManagerJobMetricGroup(
-               MetricRegistry registry,
-               JobManagerMetricGroup parent,
-               JobID jobId,
-               @Nullable String jobName) {
+                       MetricRegistry registry,
+                       JobManagerMetricGroup parent,
+                       JobID jobId,
+                       @Nullable String jobName) {
+               super(registry, jobId, jobName, 
registry.getScopeFormats().getJobManagerJobFormat().formatScope(checkNotNull(parent),
 jobId, jobName));
 
-               this(registry, checkNotNull(parent), 
registry.getScopeFormats().getJobManagerJobFormat(), jobId, jobName);
-       }
-
-       public JobManagerJobMetricGroup(
-               MetricRegistry registry,
-               JobManagerMetricGroup parent,
-               JobManagerJobScopeFormat scopeFormat,
-               JobID jobId,
-               @Nullable String jobName) {
-
-               super(registry, jobId, jobName, scopeFormat.formatScope(parent, 
jobId, jobName));
-
-               this.parent = checkNotNull(parent);
+               this.parent = parent;
        }
 
        public final JobManagerMetricGroup parent() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
index 4d3dfb7..0b02c20 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.metrics.groups;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.scope.JobManagerScopeFormat;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -38,15 +37,7 @@ public class JobManagerMetricGroup extends 
ComponentMetricGroup {
        private final String hostname;
 
        public JobManagerMetricGroup(MetricRegistry registry, String hostname) {
-               this(registry, 
registry.getScopeFormats().getJobManagerFormat(), hostname);
-       }
-
-       public JobManagerMetricGroup(
-               MetricRegistry registry,
-               JobManagerScopeFormat scopeFormat,
-               String hostname) {
-
-               super(registry, scopeFormat.formatScope(hostname));
+               super(registry, 
registry.getScopeFormats().getJobManagerFormat().formatScope(hostname));
                this.hostname = hostname;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
index caee821..bb01ac7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.scope.OperatorScopeFormat;
 
 import java.util.Collections;
 
@@ -34,17 +33,8 @@ public class OperatorMetricGroup extends 
ComponentMetricGroup {
        private final TaskMetricGroup parent;
 
        public OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup 
parent, String operatorName) {
-               this(registry, parent, 
registry.getScopeFormats().getOperatorFormat(), operatorName);
-       }
-
-       public OperatorMetricGroup(
-                       MetricRegistry registry,
-                       TaskMetricGroup parent,
-                       OperatorScopeFormat scopeFormat,
-                       String operatorName) {
-
-               super(registry, scopeFormat.formatScope(parent, operatorName));
-               this.parent = checkNotNull(parent);
+               super(registry, 
registry.getScopeFormats().getOperatorFormat().formatScope(checkNotNull(parent),
 operatorName));
+               this.parent = parent;
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
index 8219ef0..931c146 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.metrics.groups;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.scope.TaskManagerJobScopeFormat;
 import org.apache.flink.util.AbstractID;
 
 import javax.annotation.Nullable;
@@ -50,20 +49,8 @@ public class TaskManagerJobMetricGroup extends 
JobMetricGroup {
                        TaskManagerMetricGroup parent,
                        JobID jobId,
                        @Nullable String jobName) {
-
-               this(registry, checkNotNull(parent), 
registry.getScopeFormats().getTaskManagerJobFormat(), jobId, jobName);
-       }
-
-       public TaskManagerJobMetricGroup(
-                       MetricRegistry registry,
-                       TaskManagerMetricGroup parent,
-                       TaskManagerJobScopeFormat scopeFormat,
-                       JobID jobId,
-                       @Nullable String jobName) {
-
-               super(registry, jobId, jobName, scopeFormat.formatScope(parent, 
jobId, jobName));
-
-               this.parent = checkNotNull(parent);
+               super(registry, jobId, jobName, 
registry.getScopeFormats().getTaskManagerJobFormat().formatScope(checkNotNull(parent),
 jobId, jobName));
+               this.parent = parent;
        }
 
        public final TaskManagerMetricGroup parent() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
index f58a95f..f4bd0c6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.metrics.groups;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -42,15 +41,7 @@ public class TaskManagerMetricGroup extends 
ComponentMetricGroup {
 
 
        public TaskManagerMetricGroup(MetricRegistry registry, String hostname, 
String taskManagerId) {
-               this(registry, 
registry.getScopeFormats().getTaskManagerFormat(), hostname, taskManagerId);
-       }
-
-       public TaskManagerMetricGroup(
-                       MetricRegistry registry,
-                       TaskManagerScopeFormat scopeFormat,
-                       String hostname, String taskManagerId) {
-
-               super(registry, scopeFormat.formatScope(hostname, 
taskManagerId));
+               super(registry, 
registry.getScopeFormats().getTaskManagerFormat().formatScope(hostname, 
taskManagerId));
                this.hostname = hostname;
                this.taskManagerId = taskManagerId;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 8e23d2f..00e680b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.scope.TaskScopeFormat;
 import org.apache.flink.util.AbstractID;
 
 import javax.annotation.Nullable;
@@ -65,26 +64,11 @@ public class TaskMetricGroup extends ComponentMetricGroup {
                        @Nullable String taskName,
                        int subtaskIndex,
                        int attemptNumber) {
-               
-               this(registry, parent, 
registry.getScopeFormats().getTaskFormat(),
-                               vertexId, executionId, taskName, subtaskIndex, 
attemptNumber);
-       }
-
-       public TaskMetricGroup(
-                       MetricRegistry registry,
-                       TaskManagerJobMetricGroup parent,
-                       TaskScopeFormat scopeFormat, 
-                       @Nullable AbstractID vertexId,
-                       AbstractID executionId,
-                       @Nullable String taskName,
-                       int subtaskIndex,
-                       int attemptNumber) {
-
-               super(registry, scopeFormat.formatScope(
-                               parent, vertexId, executionId, taskName, 
subtaskIndex, attemptNumber));
+               super(registry, 
registry.getScopeFormats().getTaskFormat().formatScope(
+                       checkNotNull(parent), vertexId, 
checkNotNull(executionId), taskName, subtaskIndex, attemptNumber));
 
-               this.parent = checkNotNull(parent);
-               this.executionId = checkNotNull(executionId);
+               this.parent = parent;
+               this.executionId = executionId;
                this.vertexId = vertexId;
                this.taskName = taskName;
                this.subtaskIndex = subtaskIndex;

http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index d75ef57..f2bb8fe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -53,6 +53,8 @@ public class MetricRegistryTest extends TestLogger {
                assertTrue(metricRegistry.getReporters().size() == 1);
 
                Assert.assertTrue(TestReporter1.wasOpened);
+
+               metricRegistry.shutdown();
        }
 
        protected static class TestReporter1 extends TestReporter {
@@ -83,6 +85,8 @@ public class MetricRegistryTest extends TestLogger {
                Assert.assertTrue(TestReporter11.wasOpened);
                Assert.assertTrue(TestReporter12.wasOpened);
                Assert.assertTrue(TestReporter13.wasOpened);
+
+               metricRegistry.shutdown();
        }
 
        protected static class TestReporter11 extends TestReporter {
@@ -124,7 +128,7 @@ public class MetricRegistryTest extends TestLogger {
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg1", "hello");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg2", "world");
 
-               new MetricRegistry(config);
+               new MetricRegistry(config).shutdown();
        }
 
        protected static class TestReporter2 extends TestReporter {
@@ -149,7 +153,7 @@ public class MetricRegistryTest extends TestLogger {
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg1", "hello");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
 
-               new MetricRegistry(config);
+               MetricRegistry registry = new MetricRegistry(config);
 
                long start = System.currentTimeMillis();
                for (int x = 0; x < 10; x++) {
@@ -168,6 +172,8 @@ public class MetricRegistryTest extends TestLogger {
                        Assert.assertTrue("Too many report were triggered.", 
maxAllowedReports >= reportCount);
                }
                Assert.assertTrue("No report was triggered.", 
TestReporter3.reportCount > 0);
+
+               registry.shutdown();
        }
 
        protected static class TestReporter3 extends TestReporter implements 
Scheduled {
@@ -199,6 +205,8 @@ public class MetricRegistryTest extends TestLogger {
                assertTrue(TestReporter6.removeCalled);
                assertTrue(TestReporter7.addCalled);
                assertTrue(TestReporter7.removeCalled);
+
+               registry.shutdown();
        }
 
        protected static class TestReporter6 extends TestReporter {

http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index ddb5dfc..faf42ea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -18,10 +18,10 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.scope.JobManagerScopeFormat;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -36,8 +36,8 @@ public class JobManagerGroupTest {
 
        @Test
        public void addAndRemoveJobs() {
-               final JobManagerMetricGroup group = new JobManagerMetricGroup(
-                       new MetricRegistry(new Configuration()), "localhost");
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+               final JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost");
 
                final JobID jid1 = new JobID();
                final JobID jid2 = new JobID();
@@ -62,12 +62,14 @@ public class JobManagerGroupTest {
 
                assertTrue(jmJobGroup21.isClosed());
                assertEquals(0, group.numRegisteredJobMetricGroups());
+
+               registry.shutdown();
        }
 
        @Test
        public void testCloseClosesAll() {
-               final JobManagerMetricGroup group = new JobManagerMetricGroup(
-                       new MetricRegistry(new Configuration()), "localhost");
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+               final JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost");
 
                final JobID jid1 = new JobID();
                final JobID jid2 = new JobID();
@@ -82,6 +84,8 @@ public class JobManagerGroupTest {
 
                assertTrue(jmJobGroup11.isClosed());
                assertTrue(jmJobGroup21.isClosed());
+
+               registry.shutdown();
        }
 
        // 
------------------------------------------------------------------------
@@ -95,15 +99,21 @@ public class JobManagerGroupTest {
 
                assertArrayEquals(new String[]{"localhost", "jobmanager"}, 
group.getScopeComponents());
                assertEquals("localhost.jobmanager.name", 
group.getMetricIdentifier("name"));
+
+               registry.shutdown();
        }
 
        @Test
        public void testGenerateScopeCustom() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               JobManagerScopeFormat format = new 
JobManagerScopeFormat("constant.<host>.foo.<host>");
-               JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, format, "host");
+               Configuration cfg = new Configuration();
+               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, 
"constant.<host>.foo.<host>");
+               MetricRegistry registry = new MetricRegistry(cfg);
+
+               JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "host");
 
                assertArrayEquals(new String[]{"constant", "host", "foo", 
"host"}, group.getScopeComponents());
                assertEquals("constant.host.foo.host.name", 
group.getMetricIdentifier("name"));
+
+               registry.shutdown();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index a13916b..45f37ac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -19,10 +19,9 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.scope.JobManagerJobScopeFormat;
-import org.apache.flink.runtime.metrics.scope.JobManagerScopeFormat;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -44,19 +43,21 @@ public class JobManagerJobGroupTest {
                assertEquals(
                                "theHostName.jobmanager.myJobName.name",
                                jmGroup.getMetricIdentifier("name"));
+
+               registry.shutdown();
        }
 
        @Test
        public void testGenerateScopeCustom() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-
-               JobManagerScopeFormat tmFormat = new 
JobManagerScopeFormat("abc");
-               JobManagerJobScopeFormat jmFormat = new 
JobManagerJobScopeFormat("some-constant.<job_name>", tmFormat);
+               Configuration cfg = new Configuration();
+               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "abc");
+               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, 
"some-constant.<job_name>");
+               MetricRegistry registry = new MetricRegistry(cfg);
 
                JobID jid = new JobID();
 
                JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName");
-               JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, jmFormat, jid, "myJobName");
+               JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, jid, "myJobName");
 
                assertArrayEquals(
                                new String[] { "some-constant", "myJobName" },
@@ -65,19 +66,21 @@ public class JobManagerJobGroupTest {
                assertEquals(
                                "some-constant.myJobName.name",
                                jmGroup.getMetricIdentifier("name"));
+
+               registry.shutdown();
        }
 
        @Test
        public void testGenerateScopeCustomWildcard() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-
-               JobManagerScopeFormat tmFormat = new 
JobManagerScopeFormat("peter");
-               JobManagerJobScopeFormat jmFormat = new 
JobManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat);
+               Configuration cfg = new Configuration();
+               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "peter");
+               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, 
"*.some-constant.<job_id>");
+               MetricRegistry registry = new MetricRegistry(cfg);
 
                JobID jid = new JobID();
 
-               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, tmFormat, "theHostName");
-               JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, jmFormat, jid, "myJobName");
+               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName");
+               JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, jid, "myJobName");
 
                assertArrayEquals(
                                new String[] { "peter", "some-constant", 
jid.toString() },
@@ -86,5 +89,7 @@ public class JobManagerJobGroupTest {
                assertEquals(
                                "peter.some-constant." + jid + ".name",
                                jmGroup.getMetricIdentifier("name"));
+
+               registry.shutdown();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index 9936631..a15b72e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -111,5 +111,6 @@ public class MetricGroupRegistrationTest {
                MetricGroup group3 = root.addGroup("group");
                Assert.assertTrue(group1 == group2 && group2 == group3);
 
+               registry.shutdown();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index 339e1d1..b80ff6a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -28,7 +29,6 @@ import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat;
 import org.apache.flink.util.AbstractID;
 
 import org.apache.flink.util.SerializedValue;
@@ -260,9 +260,10 @@ public class TaskManagerGroupTest {
 
        @Test
        public void testGenerateScopeCustom() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               TaskManagerScopeFormat format = new 
TaskManagerScopeFormat("constant.<host>.foo.<host>");
-               TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(registry, format, "host", "id");
+               Configuration cfg = new Configuration();
+               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"constant.<host>.foo.<host>");
+               MetricRegistry registry = new MetricRegistry(cfg);
+               TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(registry, "host", "id");
 
                assertArrayEquals(new String[] { "constant", "host", "foo", 
"host" }, group.getScopeComponents());
                assertEquals("constant.host.foo.host.name", 
group.getMetricIdentifier("name"));

http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
index 36a3c54..c96af45 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
@@ -19,10 +19,9 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.scope.TaskManagerJobScopeFormat;
-import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat;
 
 import org.junit.Test;
 
@@ -50,15 +49,15 @@ public class TaskManagerJobGroupTest {
 
        @Test
        public void testGenerateScopeCustom() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-
-               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("abc");
-               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("some-constant.<job_name>", tmFormat);
+               Configuration cfg = new Configuration();
+               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc");
+               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, 
"some-constant.<job_name>");
+               MetricRegistry registry = new MetricRegistry(cfg);
 
                JobID jid = new JobID();
 
                TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-               JobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
+               JobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
 
                assertArrayEquals(
                                new String[] { "some-constant", "myJobName" },
@@ -72,15 +71,15 @@ public class TaskManagerJobGroupTest {
 
        @Test
        public void testGenerateScopeCustomWildcard() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-
-               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("peter.<tm_id>");
-               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat);
+               Configuration cfg = new Configuration();
+               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"peter.<tm_id>");
+               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, 
"*.some-constant.<job_id>");
+               MetricRegistry registry = new MetricRegistry(cfg);
 
                JobID jid = new JobID();
 
-               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, tmFormat, "theHostName", "test-tm-id");
-               JobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+               JobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
 
                assertArrayEquals(
                                new String[] { "peter", "test-tm-id", 
"some-constant", jid.toString() },

http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index d3ce0d1..da07f8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -19,18 +19,13 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.scope.ScopeFormat;
-import org.apache.flink.runtime.metrics.scope.TaskManagerJobScopeFormat;
-import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat;
-import org.apache.flink.runtime.metrics.scope.TaskScopeFormat;
 import org.apache.flink.util.AbstractID;
 
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -41,22 +36,11 @@ public class TaskMetricGroupTest {
 
        // ------------------------------------------------------------------------
        //  scope tests
-       // ------------------------------------------------------------------------
-       private CountingMetricRegistry registry;
-
-       @Before
-       public void createRegistry() {
-               this.registry = new CountingMetricRegistry(new Configuration());
-       }
-
-       @After
-       public void shutdownRegistry() {
-               this.registry.shutdown();
-               this.registry = null;
-       }
+       // -----------------------------------------------------------------------
 
        @Test
        public void testGenerateScopeDefault() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
                AbstractID vertexId = new AbstractID();
                AbstractID executionId = new AbstractID();
 
@@ -76,9 +60,11 @@ public class TaskMetricGroupTest {
 
        @Test
        public void testGenerateScopeCustom() {
-               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("abc");
-               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("def", tmFormat);
-               TaskScopeFormat taskFormat = new 
TaskScopeFormat("<tm_id>.<job_id>.<task_id>.<task_attempt_id>", jmFormat);
+               Configuration cfg = new Configuration();
+               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc");
+               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, 
"def");
+               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
"<tm_id>.<job_id>.<task_id>.<task_attempt_id>");
+               MetricRegistry registry = new MetricRegistry(cfg);
 
                JobID jid = new JobID();
                AbstractID vertexId = new AbstractID();
@@ -87,7 +73,7 @@ public class TaskMetricGroupTest {
                TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
                TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
                TaskMetricGroup taskGroup = new TaskMetricGroup(
-                               registry, jmGroup, taskFormat, vertexId, 
executionId, "aTaskName", 13, 2);
+                               registry, jmGroup, vertexId, executionId, 
"aTaskName", 13, 2);
 
                assertArrayEquals(
                                new String[] { "test-tm-id", jid.toString(), 
vertexId.toString(), executionId.toString() },
@@ -101,12 +87,9 @@ public class TaskMetricGroupTest {
 
        @Test
        public void testGenerateScopeWilcard() {
-               TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat(
-                               ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
-               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat(
-                               
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, tmFormat);
-
-               TaskScopeFormat format = new 
TaskScopeFormat("*.<task_attempt_id>.<subtask_index>", jmFormat);
+               Configuration cfg = new Configuration();
+               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
"*.<task_attempt_id>.<subtask_index>");
+               MetricRegistry registry = new MetricRegistry(cfg);
 
                AbstractID executionId = new AbstractID();
 
@@ -114,7 +97,7 @@ public class TaskMetricGroupTest {
                TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
 
                TaskMetricGroup taskGroup = new TaskMetricGroup(
-                               registry, jmGroup, format, new AbstractID(), 
executionId, "aTaskName", 13, 1);
+                               registry, jmGroup, new AbstractID(), 
executionId, "aTaskName", 13, 1);
 
                assertArrayEquals(
                                new String[] { "theHostName", "taskmanager", 
"test-tm-id", "myJobName", executionId.toString(), "13" },
@@ -128,6 +111,7 @@ public class TaskMetricGroupTest {
 
        @Test
        public void testTaskMetricGroupCleanup() {
+               CountingMetricRegistry registry = new 
CountingMetricRegistry(new Configuration());
                TaskManagerMetricGroup taskManagerMetricGroup = new 
TaskManagerMetricGroup(registry, "localhost", "0");
                TaskManagerJobMetricGroup taskManagerJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, taskManagerMetricGroup, new JobID(), "job");
                TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, 
taskManagerJobMetricGroup, new AbstractID(), new AbstractID(), "task", 0, 0);
@@ -139,6 +123,8 @@ public class TaskMetricGroupTest {
 
                // now alle registered metrics should have been unregistered
                assertEquals(0, registry.getNumberRegisteredMetrics());
+
+               registry.shutdown();
        }
 
        private static class CountingMetricRegistry extends MetricRegistry {

Reply via email to