Repository: flink Updated Branches: refs/heads/master e4fe89d6e -> e3fec1f9a
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java new file mode 100644 index 0000000..339e1d1 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat; +import org.apache.flink.util.AbstractID; + +import org.apache.flink.util.SerializedValue; +import org.junit.Test; + +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; + +import static org.junit.Assert.*; + +public class TaskManagerGroupTest { + + // ------------------------------------------------------------------------ + // adding and removing jobs + // ------------------------------------------------------------------------ + + @Test + public void addAndRemoveJobs() throws IOException { + MetricRegistry registry = new MetricRegistry(new Configuration()); + + final TaskManagerMetricGroup group = new TaskManagerMetricGroup( + registry, "localhost", new AbstractID().toString()); + + + final JobID jid1 = new JobID(); + final JobID jid2 = new JobID(); + + final String jobName1 = "testjob"; + final String jobName2 = "anotherJob"; + + final JobVertexID vertex11 = new JobVertexID(); + final JobVertexID vertex12 = new JobVertexID(); + final JobVertexID vertex13 = new JobVertexID(); + final JobVertexID vertex21 = new JobVertexID(); + + final ExecutionAttemptID execution11 = new ExecutionAttemptID(); + final ExecutionAttemptID execution12 = new ExecutionAttemptID(); + final ExecutionAttemptID execution13 = new 
ExecutionAttemptID(); + final ExecutionAttemptID execution21 = new ExecutionAttemptID(); + + TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( + jid1, + jobName1, + vertex11, + execution11, + new SerializedValue<>(new ExecutionConfig()), + "test", + 17, 18, 0, + new Configuration(), new Configuration(), + "", + new ArrayList<ResultPartitionDeploymentDescriptor>(), + new ArrayList<InputGateDeploymentDescriptor>(), + new ArrayList<BlobKey>(), + new ArrayList<URL>(), 0); + + TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( + jid1, + jobName1, + vertex12, + execution12, + new SerializedValue<>(new ExecutionConfig()), + "test", + 13, 18, 1, + new Configuration(), new Configuration(), + "", + new ArrayList<ResultPartitionDeploymentDescriptor>(), + new ArrayList<InputGateDeploymentDescriptor>(), + new ArrayList<BlobKey>(), + new ArrayList<URL>(), 0); + + TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor( + jid2, + jobName2, + vertex21, + execution21, + new SerializedValue<>(new ExecutionConfig()), + "test", + 7, 18, 2, + new Configuration(), new Configuration(), + "", + new ArrayList<ResultPartitionDeploymentDescriptor>(), + new ArrayList<InputGateDeploymentDescriptor>(), + new ArrayList<BlobKey>(), + new ArrayList<URL>(), 0); + + TaskDeploymentDescriptor tdd4 = new TaskDeploymentDescriptor( + jid1, + jobName1, + vertex13, + execution13, + new SerializedValue<>(new ExecutionConfig()), + "test", + 0, 18, 0, + new Configuration(), new Configuration(), + "", + new ArrayList<ResultPartitionDeploymentDescriptor>(), + new ArrayList<InputGateDeploymentDescriptor>(), + new ArrayList<BlobKey>(), + new ArrayList<URL>(), 0); + + TaskMetricGroup tmGroup11 = group.addTaskForJob(tdd1); + TaskMetricGroup tmGroup12 = group.addTaskForJob(tdd2); + TaskMetricGroup tmGroup21 = group.addTaskForJob(tdd3); + + assertEquals(2, group.numRegisteredJobMetricGroups()); + assertFalse(tmGroup11.parent().isClosed()); + assertFalse(tmGroup12.parent().isClosed()); + 
assertFalse(tmGroup21.parent().isClosed()); + + // close all for job 2 and one from job 1 + tmGroup11.close(); + tmGroup21.close(); + assertTrue(tmGroup11.isClosed()); + assertTrue(tmGroup21.isClosed()); + + // job 2 should be removed, job 1 should still be there + assertFalse(tmGroup11.parent().isClosed()); + assertFalse(tmGroup12.parent().isClosed()); + assertTrue(tmGroup21.parent().isClosed()); + assertEquals(1, group.numRegisteredJobMetricGroups()); + + // add one more to job one + TaskMetricGroup tmGroup13 = group.addTaskForJob(tdd4); + tmGroup12.close(); + tmGroup13.close(); + + assertTrue(tmGroup11.parent().isClosed()); + assertTrue(tmGroup12.parent().isClosed()); + assertTrue(tmGroup13.parent().isClosed()); + + assertEquals(0, group.numRegisteredJobMetricGroups()); + + registry.shutdown(); + } + + @Test + public void testCloseClosesAll() throws IOException { + MetricRegistry registry = new MetricRegistry(new Configuration()); + final TaskManagerMetricGroup group = new TaskManagerMetricGroup( + registry, "localhost", new AbstractID().toString()); + + + final JobID jid1 = new JobID(); + final JobID jid2 = new JobID(); + + final String jobName1 = "testjob"; + final String jobName2 = "anotherJob"; + + final JobVertexID vertex11 = new JobVertexID(); + final JobVertexID vertex12 = new JobVertexID(); + final JobVertexID vertex21 = new JobVertexID(); + + final ExecutionAttemptID execution11 = new ExecutionAttemptID(); + final ExecutionAttemptID execution12 = new ExecutionAttemptID(); + final ExecutionAttemptID execution21 = new ExecutionAttemptID(); + + TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( + jid1, + jobName1, + vertex11, + execution11, + new SerializedValue<>(new ExecutionConfig()), + "test", + 17, 18, 0, + new Configuration(), new Configuration(), + "", + new ArrayList<ResultPartitionDeploymentDescriptor>(), + new ArrayList<InputGateDeploymentDescriptor>(), + new ArrayList<BlobKey>(), + new ArrayList<URL>(), 0); + + TaskDeploymentDescriptor 
tdd2 = new TaskDeploymentDescriptor( + jid1, + jobName1, + vertex12, + execution12, + new SerializedValue<>(new ExecutionConfig()), + "test", + 13, 18, 1, + new Configuration(), new Configuration(), + "", + new ArrayList<ResultPartitionDeploymentDescriptor>(), + new ArrayList<InputGateDeploymentDescriptor>(), + new ArrayList<BlobKey>(), + new ArrayList<URL>(), 0); + + TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor( + jid2, + jobName2, + vertex21, + execution21, + new SerializedValue<>(new ExecutionConfig()), + "test", + 7, 18, 1, + new Configuration(), new Configuration(), + "", + new ArrayList<ResultPartitionDeploymentDescriptor>(), + new ArrayList<InputGateDeploymentDescriptor>(), + new ArrayList<BlobKey>(), + new ArrayList<URL>(), 0); + + TaskMetricGroup tmGroup11 = group.addTaskForJob(tdd1); + TaskMetricGroup tmGroup12 = group.addTaskForJob(tdd2); + TaskMetricGroup tmGroup21 = group.addTaskForJob(tdd3); + + group.close(); + + assertTrue(tmGroup11.isClosed()); + assertTrue(tmGroup12.isClosed()); + assertTrue(tmGroup21.isClosed()); + + registry.shutdown(); + } + + // ------------------------------------------------------------------------ + // scope name tests + // ------------------------------------------------------------------------ + + @Test + public void testGenerateScopeDefault() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "localhost", "id"); + + assertArrayEquals(new String[] { "localhost", "taskmanager", "id" }, group.getScopeComponents()); + assertEquals("localhost.taskmanager.id.name", group.getMetricIdentifier("name")); + registry.shutdown(); + } + + @Test + public void testGenerateScopeCustom() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + TaskManagerScopeFormat format = new TaskManagerScopeFormat("constant.<host>.foo.<host>"); + TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, format, "host", 
"id"); + + assertArrayEquals(new String[] { "constant", "host", "foo", "host" }, group.getScopeComponents()); + assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name")); + registry.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java new file mode 100644 index 0000000..36a3c54 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.scope.TaskManagerJobScopeFormat; +import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat; + +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class TaskManagerJobGroupTest { + + @Test + public void testGenerateScopeDefault() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); + + assertArrayEquals( + new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName"}, + jmGroup.getScopeComponents()); + + assertEquals( + "theHostName.taskmanager.test-tm-id.myJobName.name", + jmGroup.getMetricIdentifier("name")); + registry.shutdown(); + } + + @Test + public void testGenerateScopeCustom() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + + TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("abc"); + TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("some-constant.<job_name>", tmFormat); + + JobID jid = new JobID(); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); + + assertArrayEquals( + new String[] { "some-constant", "myJobName" }, + jmGroup.getScopeComponents()); + + assertEquals( + "some-constant.myJobName.name", + jmGroup.getMetricIdentifier("name")); + registry.shutdown(); + } + + @Test + public void testGenerateScopeCustomWildcard() { + MetricRegistry registry = new MetricRegistry(new 
Configuration()); + + TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("peter.<tm_id>"); + TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat); + + JobID jid = new JobID(); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, tmFormat, "theHostName", "test-tm-id"); + JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); + + assertArrayEquals( + new String[] { "peter", "test-tm-id", "some-constant", jid.toString() }, + jmGroup.getScopeComponents()); + + assertEquals( + "peter.test-tm-id.some-constant." + jid + ".name", + jmGroup.getMetricIdentifier("name")); + registry.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java new file mode 100644 index 0000000..d3ce0d1 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.scope.ScopeFormat; +import org.apache.flink.runtime.metrics.scope.TaskManagerJobScopeFormat; +import org.apache.flink.runtime.metrics.scope.TaskManagerScopeFormat; +import org.apache.flink.runtime.metrics.scope.TaskScopeFormat; +import org.apache.flink.util.AbstractID; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TaskMetricGroupTest { + + // ------------------------------------------------------------------------ + // scope tests + // ------------------------------------------------------------------------ + private CountingMetricRegistry registry; + + @Before + public void createRegistry() { + this.registry = new CountingMetricRegistry(new Configuration()); + } + + @After + public void shutdownRegistry() { + this.registry.shutdown(); + this.registry = null; + } + + @Test + public void testGenerateScopeDefault() { + AbstractID vertexId = new AbstractID(); + AbstractID executionId = new AbstractID(); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + TaskManagerJobMetricGroup 
jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); + TaskMetricGroup taskGroup = new TaskMetricGroup(registry, jmGroup, vertexId, executionId, "aTaskName", 13, 2); + + assertArrayEquals( + new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName", "aTaskName", "13"}, + taskGroup.getScopeComponents()); + + assertEquals( + "theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13.name", + taskGroup.getMetricIdentifier("name")); + registry.shutdown(); + } + + @Test + public void testGenerateScopeCustom() { + TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("abc"); + TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("def", tmFormat); + TaskScopeFormat taskFormat = new TaskScopeFormat("<tm_id>.<job_id>.<task_id>.<task_attempt_id>", jmFormat); + + JobID jid = new JobID(); + AbstractID vertexId = new AbstractID(); + AbstractID executionId = new AbstractID(); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName"); + TaskMetricGroup taskGroup = new TaskMetricGroup( + registry, jmGroup, taskFormat, vertexId, executionId, "aTaskName", 13, 2); + + assertArrayEquals( + new String[] { "test-tm-id", jid.toString(), vertexId.toString(), executionId.toString() }, + taskGroup.getScopeComponents()); + + assertEquals( + String.format("test-tm-id.%s.%s.%s.name", jid, vertexId, executionId), + taskGroup.getMetricIdentifier("name")); + registry.shutdown(); + } + + @Test + public void testGenerateScopeWilcard() { + TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat( + ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP); + TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat( + ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, tmFormat); + + TaskScopeFormat format = new TaskScopeFormat("*.<task_attempt_id>.<subtask_index>", jmFormat); + + 
AbstractID executionId = new AbstractID(); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); + + TaskMetricGroup taskGroup = new TaskMetricGroup( + registry, jmGroup, format, new AbstractID(), executionId, "aTaskName", 13, 1); + + assertArrayEquals( + new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName", executionId.toString(), "13" }, + taskGroup.getScopeComponents()); + + assertEquals( + "theHostName.taskmanager.test-tm-id.myJobName." + executionId + ".13.name", + taskGroup.getMetricIdentifier("name")); + registry.shutdown(); + } + + @Test + public void testTaskMetricGroupCleanup() { + TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(registry, "localhost", "0"); + TaskManagerJobMetricGroup taskManagerJobMetricGroup = new TaskManagerJobMetricGroup(registry, taskManagerMetricGroup, new JobID(), "job"); + TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, taskManagerJobMetricGroup, new AbstractID(), new AbstractID(), "task", 0, 0); + + // the io metric should have registered predefined metrics + assertTrue(registry.getNumberRegisteredMetrics() > 0); + + taskMetricGroup.close(); + + // now all registered metrics should have been unregistered + assertEquals(0, registry.getNumberRegisteredMetrics()); + } + + private static class CountingMetricRegistry extends MetricRegistry { + + private int counter = 0; + + CountingMetricRegistry(Configuration config) { + super(config); + } + + @Override + public void register(Metric metric, String metricName, MetricGroup group) { + super.register(metric, metricName, group); + counter++; + } + + @Override + public void unregister(Metric metric, String metricName, MetricGroup group) { + super.unregister(metric, metricName, group); + counter--; + } + + int getNumberRegisteredMetrics() { + return counter; + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java new file mode 100644 index 0000000..cd96c60 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics.util; + +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.AbstractReporter; + +public class TestReporter extends AbstractReporter { + + @Override + public void open(MetricConfig config) {} + + @Override + public void close() {} + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {} + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {} + + @Override + public String filterCharacters(String input) { + return input; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index 063e295..5af34fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.execution.Environment; 
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 78e4cce..9dea324 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -19,12 +19,12 @@ package org.apache.flink.runtime.operators.testutils; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.execution.Environment; @@ -39,10 +39,10 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.state.StateHandle; import 
org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.types.Record; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java index 96922c6..ff0c480 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java @@ -22,12 +22,12 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.IOMetricGroup; -import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup; -import org.apache.flink.metrics.groups.TaskManagerMetricGroup; -import org.apache.flink.metrics.groups.TaskMetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 4b90b88..0c0d064 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java index 7660893..fea21be 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java @@ -17,11 +17,10 @@ */ package org.apache.flink.runtime.taskmanager; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; 
import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 5394fd1..fec9ef3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml index 6f6b96e..af60f3d 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml @@ -129,6 +129,13 @@ under the License. 
<dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-jmx</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-base_2.10</artifactId> <version>${project.version}</version> <type>test-jar</type> http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml index fe4db7c..c355638 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml @@ -103,6 +103,13 @@ under the License. <version>${kafka.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-jmx</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.flink</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-connectors/flink-connector-kafka-base/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml index 258a2cf..a36d222 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml +++ b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml @@ -126,6 +126,13 @@ under the License. 
<dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-jmx</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.10</artifactId> <version>${project.version}</version> <type>test-jar</type> http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index b8815da..4450e94 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.jmx.JMXReporter; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.ForkableFlinkMiniCluster; @@ -101,7 +102,7 @@ public abstract class KafkaTestBase extends TestLogger { flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); - 
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_CLASS, "org.apache.flink.metrics.reporter.JMXReporter"); + flinkConfig.setString(ConfigConstants.METRICS_REPORTER_CLASS, JMXReporter.class.getName()); flink = new ForkableFlinkMiniCluster(flinkConfig, false); flink.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 5578365..d11990e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -24,7 +24,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 1a66934..ce764b7 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -21,7 +21,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.groups.IOMetricGroup; +import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index a8dd49b..7084208 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.event.AbstractEvent; 
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 76f22be..e9d583c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -23,7 +23,7 @@ import akka.actor.ActorRef; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala index 5512ad7..7ca9c3e 100644 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala @@ -24,13 +24,13 @@ import akka.actor.ActorRef import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import 
org.apache.flink.runtime.checkpoint.savepoint.SavepointStore -import org.apache.flink.metrics.MetricRegistry import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.leaderelection.LeaderElectionService +import org.apache.flink.runtime.metrics.MetricRegistry import org.apache.flink.runtime.testingUtils.TestingJobManagerLike import scala.concurrent.duration.FiniteDuration http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index c772a25..94ad9f2 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -24,7 +24,6 @@ import akka.actor.ActorRef import org.apache.flink.api.common.JobID import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} -import org.apache.flink.metrics.MetricRegistry import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore import org.apache.flink.runtime.clusterframework.ApplicationStatus @@ -35,6 +34,7 @@ import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager} import org.apache.flink.runtime.leaderelection.LeaderElectionService import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound} import org.apache.flink.runtime.messages.Messages.Acknowledge 
+import org.apache.flink.runtime.metrics.MetricRegistry import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
