Repository: flink Updated Branches: refs/heads/master 84d28ba00 -> c8f5745f2
[hotfix] [metrics] Refactor constructors and tests - removed constructors taking a specific ScopeFormat as argument, as they were only used by tests and serve no real purpose - refactored tests accordingly - made sure that the registry is shut down in every test This closes #2302. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8f5745f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8f5745f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8f5745f Branch: refs/heads/master Commit: c8f5745f251cd8f927b2b9d93e8cb6105d3be226 Parents: 84d28ba Author: zentol <[email protected]> Authored: Wed Jul 27 13:21:47 2016 +0200 Committer: zentol <[email protected]> Committed: Wed Aug 17 13:58:59 2016 +0200 ---------------------------------------------------------------------- .../groups/JobManagerJobMetricGroup.java | 24 +++------- .../metrics/groups/JobManagerMetricGroup.java | 11 +---- .../metrics/groups/OperatorMetricGroup.java | 14 +----- .../groups/TaskManagerJobMetricGroup.java | 17 +------- .../metrics/groups/TaskManagerMetricGroup.java | 11 +---- .../runtime/metrics/groups/TaskMetricGroup.java | 24 ++-------- .../runtime/metrics/MetricRegistryTest.java | 12 ++++- .../metrics/groups/JobManagerGroupTest.java | 26 +++++++---- .../metrics/groups/JobManagerJobGroupTest.java | 31 +++++++------ .../groups/MetricGroupRegistrationTest.java | 1 + .../metrics/groups/TaskManagerGroupTest.java | 9 ++-- .../metrics/groups/TaskManagerJobGroupTest.java | 25 +++++------ .../metrics/groups/TaskMetricGroupTest.java | 46 +++++++------------- 13 files changed, 96 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java index 807ddd0..1396a5a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.scope.JobManagerJobScopeFormat; import javax.annotation.Nullable; import java.util.Collections; @@ -36,24 +35,13 @@ public class JobManagerJobMetricGroup extends JobMetricGroup { private final JobManagerMetricGroup parent; public JobManagerJobMetricGroup( - MetricRegistry registry, - JobManagerMetricGroup parent, - JobID jobId, - @Nullable String jobName) { + MetricRegistry registry, + JobManagerMetricGroup parent, + JobID jobId, + @Nullable String jobName) { + super(registry, jobId, jobName, registry.getScopeFormats().getJobManagerJobFormat().formatScope(checkNotNull(parent), jobId, jobName)); - this(registry, checkNotNull(parent), registry.getScopeFormats().getJobManagerJobFormat(), jobId, jobName); - } - - public JobManagerJobMetricGroup( - MetricRegistry registry, - JobManagerMetricGroup parent, - JobManagerJobScopeFormat scopeFormat, - JobID jobId, - @Nullable String jobName) { - - super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName)); - - this.parent = checkNotNull(parent); + this.parent = parent; } public final JobManagerMetricGroup parent() { http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java index 4d3dfb7..0b02c20 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.scope.JobManagerScopeFormat; import java.util.HashMap; import java.util.Map; @@ -38,15 +37,7 @@ public class JobManagerMetricGroup extends ComponentMetricGroup { private final String hostname; public JobManagerMetricGroup(MetricRegistry registry, String hostname) { - this(registry, registry.getScopeFormats().getJobManagerFormat(), hostname); - } - - public JobManagerMetricGroup( - MetricRegistry registry, - JobManagerScopeFormat scopeFormat, - String hostname) { - - super(registry, scopeFormat.formatScope(hostname)); + super(registry, registry.getScopeFormats().getJobManagerFormat().formatScope(hostname)); this.hostname = hostname; } http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java index caee821..bb01ac7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java @@ -19,7 +19,6 @@ 
package org.apache.flink.runtime.metrics.groups; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.scope.OperatorScopeFormat; import java.util.Collections; @@ -34,17 +33,8 @@ public class OperatorMetricGroup extends ComponentMetricGroup { private final TaskMetricGroup parent; public OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup parent, String operatorName) { - this(registry, parent, registry.getScopeFormats().getOperatorFormat(), operatorName); - } - - public OperatorMetricGroup( - MetricRegistry registry, - TaskMetricGroup parent, - OperatorScopeFormat scopeFormat, - String operatorName) { - - super(registry, scopeFormat.formatScope(parent, operatorName)); - this.parent = checkNotNull(parent); + super(registry, registry.getScopeFormats().getOperatorFormat().formatScope(checkNotNull(parent), operatorName)); + this.parent = parent; } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java index 8219ef0..931c146 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.scope.TaskManagerJobScopeFormat; import 
org.apache.flink.util.AbstractID; import javax.annotation.Nullable; @@ -50,20 +49,8 @@ public class TaskManagerJobMetricGroup extends JobMetricGroup { TaskManagerMetricGroup parent, JobID jobId, @Nullable String jobName) { - - this(registry, checkNotNull(parent), registry.getScopeFormats().getTaskManagerJobFormat(), jobId, jobName); - } - - public TaskManagerJobMetricGroup( - MetricRegistry registry, - TaskManagerMetricGroup parent, - TaskManagerJobScopeFormat scopeFormat, - JobID jobId, - @Nullable String jobName) { - - super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName)); - - this.parent = checkNotNull(parent); + super(registry, jobId, jobName, registry.getScopeFormats().getTaskManagerJobFormat().formatScope(checkNotNull(parent), jobId, jobName)); + this.parent = parent; } public final TaskManagerMetricGroup parent() { http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java index f58a95f..f4bd0c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat; import java.util.HashMap; import java.util.Map; @@ -42,15 +41,7 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup { public 
TaskManagerMetricGroup(MetricRegistry registry, String hostname, String taskManagerId) { - this(registry, registry.getScopeFormats().getTaskManagerFormat(), hostname, taskManagerId); - } - - public TaskManagerMetricGroup( - MetricRegistry registry, - TaskManagerScopeFormat scopeFormat, - String hostname, String taskManagerId) { - - super(registry, scopeFormat.formatScope(hostname, taskManagerId)); + super(registry, registry.getScopeFormats().getTaskManagerFormat().formatScope(hostname, taskManagerId)); this.hostname = hostname; this.taskManagerId = taskManagerId; } http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java index 8e23d2f..00e680b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.scope.TaskScopeFormat; import org.apache.flink.util.AbstractID; import javax.annotation.Nullable; @@ -65,26 +64,11 @@ public class TaskMetricGroup extends ComponentMetricGroup { @Nullable String taskName, int subtaskIndex, int attemptNumber) { - - this(registry, parent, registry.getScopeFormats().getTaskFormat(), - vertexId, executionId, taskName, subtaskIndex, attemptNumber); - } - - public TaskMetricGroup( - MetricRegistry registry, - TaskManagerJobMetricGroup parent, - TaskScopeFormat scopeFormat, - @Nullable AbstractID vertexId, - AbstractID executionId, - @Nullable String taskName, - int subtaskIndex, - int 
attemptNumber) { - - super(registry, scopeFormat.formatScope( - parent, vertexId, executionId, taskName, subtaskIndex, attemptNumber)); + super(registry, registry.getScopeFormats().getTaskFormat().formatScope( + checkNotNull(parent), vertexId, checkNotNull(executionId), taskName, subtaskIndex, attemptNumber)); - this.parent = checkNotNull(parent); - this.executionId = checkNotNull(executionId); + this.parent = parent; + this.executionId = executionId; this.vertexId = vertexId; this.taskName = taskName; this.subtaskIndex = subtaskIndex; http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java index d75ef57..f2bb8fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java @@ -53,6 +53,8 @@ public class MetricRegistryTest extends TestLogger { assertTrue(metricRegistry.getReporters().size() == 1); Assert.assertTrue(TestReporter1.wasOpened); + + metricRegistry.shutdown(); } protected static class TestReporter1 extends TestReporter { @@ -83,6 +85,8 @@ public class MetricRegistryTest extends TestLogger { Assert.assertTrue(TestReporter11.wasOpened); Assert.assertTrue(TestReporter12.wasOpened); Assert.assertTrue(TestReporter13.wasOpened); + + metricRegistry.shutdown(); } protected static class TestReporter11 extends TestReporter { @@ -124,7 +128,7 @@ public class MetricRegistryTest extends TestLogger { config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world"); - new MetricRegistry(config); + new 
MetricRegistry(config).shutdown(); } protected static class TestReporter2 extends TestReporter { @@ -149,7 +153,7 @@ public class MetricRegistryTest extends TestLogger { config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS"); - new MetricRegistry(config); + MetricRegistry registry = new MetricRegistry(config); long start = System.currentTimeMillis(); for (int x = 0; x < 10; x++) { @@ -168,6 +172,8 @@ public class MetricRegistryTest extends TestLogger { Assert.assertTrue("Too many report were triggered.", maxAllowedReports >= reportCount); } Assert.assertTrue("No report was triggered.", TestReporter3.reportCount > 0); + + registry.shutdown(); } protected static class TestReporter3 extends TestReporter implements Scheduled { @@ -199,6 +205,8 @@ public class MetricRegistryTest extends TestLogger { assertTrue(TestReporter6.removeCalled); assertTrue(TestReporter7.addCalled); assertTrue(TestReporter7.removeCalled); + + registry.shutdown(); } protected static class TestReporter6 extends TestReporter { http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java index ddb5dfc..faf42ea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java @@ -18,10 +18,10 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; 
import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.scope.JobManagerScopeFormat; import org.junit.Test; import static org.junit.Assert.assertArrayEquals; @@ -36,8 +36,8 @@ public class JobManagerGroupTest { @Test public void addAndRemoveJobs() { - final JobManagerMetricGroup group = new JobManagerMetricGroup( - new MetricRegistry(new Configuration()), "localhost"); + MetricRegistry registry = new MetricRegistry(new Configuration()); + final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); final JobID jid1 = new JobID(); final JobID jid2 = new JobID(); @@ -62,12 +62,14 @@ public class JobManagerGroupTest { assertTrue(jmJobGroup21.isClosed()); assertEquals(0, group.numRegisteredJobMetricGroups()); + + registry.shutdown(); } @Test public void testCloseClosesAll() { - final JobManagerMetricGroup group = new JobManagerMetricGroup( - new MetricRegistry(new Configuration()), "localhost"); + MetricRegistry registry = new MetricRegistry(new Configuration()); + final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); final JobID jid1 = new JobID(); final JobID jid2 = new JobID(); @@ -82,6 +84,8 @@ public class JobManagerGroupTest { assertTrue(jmJobGroup11.isClosed()); assertTrue(jmJobGroup21.isClosed()); + + registry.shutdown(); } // ------------------------------------------------------------------------ @@ -95,15 +99,21 @@ public class JobManagerGroupTest { assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents()); assertEquals("localhost.jobmanager.name", group.getMetricIdentifier("name")); + + registry.shutdown(); } @Test public void testGenerateScopeCustom() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - JobManagerScopeFormat format = new JobManagerScopeFormat("constant.<host>.foo.<host>"); - 
JobManagerMetricGroup group = new JobManagerMetricGroup(registry, format, "host"); + Configuration cfg = new Configuration(); + cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "constant.<host>.foo.<host>"); + MetricRegistry registry = new MetricRegistry(cfg); + + JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "host"); assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents()); assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name")); + + registry.shutdown(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java index a13916b..45f37ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java @@ -19,10 +19,9 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.scope.JobManagerJobScopeFormat; -import org.apache.flink.runtime.metrics.scope.JobManagerScopeFormat; import org.junit.Test; import static org.junit.Assert.assertArrayEquals; @@ -44,19 +43,21 @@ public class JobManagerJobGroupTest { assertEquals( "theHostName.jobmanager.myJobName.name", jmGroup.getMetricIdentifier("name")); + + registry.shutdown(); } @Test public void testGenerateScopeCustom() { - MetricRegistry registry = new MetricRegistry(new 
Configuration()); - - JobManagerScopeFormat tmFormat = new JobManagerScopeFormat("abc"); - JobManagerJobScopeFormat jmFormat = new JobManagerJobScopeFormat("some-constant.<job_name>", tmFormat); + Configuration cfg = new Configuration(); + cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "abc"); + cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "some-constant.<job_name>"); + MetricRegistry registry = new MetricRegistry(cfg); JobID jid = new JobID(); JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName"); - JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); + JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jid, "myJobName"); assertArrayEquals( new String[] { "some-constant", "myJobName" }, @@ -65,19 +66,21 @@ public class JobManagerJobGroupTest { assertEquals( "some-constant.myJobName.name", jmGroup.getMetricIdentifier("name")); + + registry.shutdown(); } @Test public void testGenerateScopeCustomWildcard() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - - JobManagerScopeFormat tmFormat = new JobManagerScopeFormat("peter"); - JobManagerJobScopeFormat jmFormat = new JobManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat); + Configuration cfg = new Configuration(); + cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "peter"); + cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "*.some-constant.<job_id>"); + MetricRegistry registry = new MetricRegistry(cfg); JobID jid = new JobID(); - JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, tmFormat, "theHostName"); - JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); + JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName"); + JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jid, "myJobName"); assertArrayEquals( new String[] { "peter", 
"some-constant", jid.toString() }, @@ -86,5 +89,7 @@ public class JobManagerJobGroupTest { assertEquals( "peter.some-constant." + jid + ".name", jmGroup.getMetricIdentifier("name")); + + registry.shutdown(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java index 9936631..a15b72e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java @@ -111,5 +111,6 @@ public class MetricGroupRegistrationTest { MetricGroup group3 = root.addGroup("group"); Assert.assertTrue(group1 == group2 && group2 == group3); + registry.shutdown(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java index 339e1d1..b80ff6a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; import 
org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -28,7 +29,6 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat; import org.apache.flink.util.AbstractID; import org.apache.flink.util.SerializedValue; @@ -260,9 +260,10 @@ public class TaskManagerGroupTest { @Test public void testGenerateScopeCustom() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - TaskManagerScopeFormat format = new TaskManagerScopeFormat("constant.<host>.foo.<host>"); - TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, format, "host", "id"); + Configuration cfg = new Configuration(); + cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "constant.<host>.foo.<host>"); + MetricRegistry registry = new MetricRegistry(cfg); + TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id"); assertArrayEquals(new String[] { "constant", "host", "foo", "host" }, group.getScopeComponents()); assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name")); http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java index 36a3c54..c96af45 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java @@ -19,10 +19,9 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.scope.TaskManagerJobScopeFormat; -import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat; import org.junit.Test; @@ -50,15 +49,15 @@ public class TaskManagerJobGroupTest { @Test public void testGenerateScopeCustom() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - - TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("abc"); - TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("some-constant.<job_name>", tmFormat); + Configuration cfg = new Configuration(); + cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc"); + cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "some-constant.<job_name>"); + MetricRegistry registry = new MetricRegistry(cfg); JobID jid = new JobID(); TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); - JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); + JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName"); assertArrayEquals( new String[] { "some-constant", "myJobName" }, @@ -72,15 +71,15 @@ public class TaskManagerJobGroupTest { @Test public void testGenerateScopeCustomWildcard() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - - TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("peter.<tm_id>"); - TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat); + Configuration cfg = new Configuration(); + 
cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "peter.<tm_id>"); + cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "*.some-constant.<job_id>"); + MetricRegistry registry = new MetricRegistry(cfg); JobID jid = new JobID(); - TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, tmFormat, "theHostName", "test-tm-id"); - JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName"); assertArrayEquals( new String[] { "peter", "test-tm-id", "some-constant", jid.toString() }, http://git-wip-us.apache.org/repos/asf/flink/blob/c8f5745f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java index d3ce0d1..da07f8f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java @@ -19,18 +19,13 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.scope.ScopeFormat; -import org.apache.flink.runtime.metrics.scope.TaskManagerJobScopeFormat; -import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat; -import 
org.apache.flink.runtime.metrics.scope.TaskScopeFormat; import org.apache.flink.util.AbstractID; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertArrayEquals; @@ -41,22 +36,11 @@ public class TaskMetricGroupTest { // ------------------------------------------------------------------------ // scope tests - // ------------------------------------------------------------------------ - private CountingMetricRegistry registry; - - @Before - public void createRegistry() { - this.registry = new CountingMetricRegistry(new Configuration()); - } - - @After - public void shutdownRegistry() { - this.registry.shutdown(); - this.registry = null; - } + // ----------------------------------------------------------------------- @Test public void testGenerateScopeDefault() { + MetricRegistry registry = new MetricRegistry(new Configuration()); AbstractID vertexId = new AbstractID(); AbstractID executionId = new AbstractID(); @@ -76,9 +60,11 @@ public class TaskMetricGroupTest { @Test public void testGenerateScopeCustom() { - TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("abc"); - TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("def", tmFormat); - TaskScopeFormat taskFormat = new TaskScopeFormat("<tm_id>.<job_id>.<task_id>.<task_attempt_id>", jmFormat); + Configuration cfg = new Configuration(); + cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc"); + cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "def"); + cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<tm_id>.<job_id>.<task_id>.<task_attempt_id>"); + MetricRegistry registry = new MetricRegistry(cfg); JobID jid = new JobID(); AbstractID vertexId = new AbstractID(); @@ -87,7 +73,7 @@ public class TaskMetricGroupTest { TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jid, 
"myJobName"); TaskMetricGroup taskGroup = new TaskMetricGroup( - registry, jmGroup, taskFormat, vertexId, executionId, "aTaskName", 13, 2); + registry, jmGroup, vertexId, executionId, "aTaskName", 13, 2); assertArrayEquals( new String[] { "test-tm-id", jid.toString(), vertexId.toString(), executionId.toString() }, @@ -101,12 +87,9 @@ public class TaskMetricGroupTest { @Test public void testGenerateScopeWilcard() { - TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat( - ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP); - TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat( - ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, tmFormat); - - TaskScopeFormat format = new TaskScopeFormat("*.<task_attempt_id>.<subtask_index>", jmFormat); + Configuration cfg = new Configuration(); + cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "*.<task_attempt_id>.<subtask_index>"); + MetricRegistry registry = new MetricRegistry(cfg); AbstractID executionId = new AbstractID(); @@ -114,7 +97,7 @@ public class TaskMetricGroupTest { TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); TaskMetricGroup taskGroup = new TaskMetricGroup( - registry, jmGroup, format, new AbstractID(), executionId, "aTaskName", 13, 1); + registry, jmGroup, new AbstractID(), executionId, "aTaskName", 13, 1); assertArrayEquals( new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName", executionId.toString(), "13" }, @@ -128,6 +111,7 @@ public class TaskMetricGroupTest { @Test public void testTaskMetricGroupCleanup() { + CountingMetricRegistry registry = new CountingMetricRegistry(new Configuration()); TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(registry, "localhost", "0"); TaskManagerJobMetricGroup taskManagerJobMetricGroup = new TaskManagerJobMetricGroup(registry, taskManagerMetricGroup, new JobID(), "job"); TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, 
taskManagerJobMetricGroup, new AbstractID(), new AbstractID(), "task", 0, 0); @@ -139,6 +123,8 @@ public class TaskMetricGroupTest { // now alle registered metrics should have been unregistered assertEquals(0, registry.getNumberRegisteredMetrics()); + + registry.shutdown(); } private static class CountingMetricRegistry extends MetricRegistry {
