http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java index 858bc49..83c88cc 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java @@ -31,6 +31,7 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.util.DummyMetricGroup; import org.junit.Test; @@ -41,7 +42,7 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableNotFound() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(),new HashMap<String, Accumulator<?, ?>>()); + RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(),new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()); try { ctx.getBroadcastVariable("some name"); @@ -71,7 +72,7 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableSimple() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()); + RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new 
HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()); ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4)); ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0)); @@ -105,7 +106,7 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableWithInitializer() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()); + RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); @@ -130,7 +131,7 @@ public class RuntimeUDFContextTest { @Test public void testResetBroadcastVariableWithInitializer() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()); + RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); @@ -153,7 +154,7 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableWithInitializerAndMismatch() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()); + RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java index 7ea0071..554820e 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.util.DummyMetricGroup; import org.apache.flink.types.Value; import org.junit.Assert; import org.junit.Test; @@ -40,7 +41,7 @@ public class RichInputFormatTest { public void testCheckRuntimeContextAccess() { final SerializedInputFormat<Value> inputFormat = new SerializedInputFormat<Value>(); final TaskInfo taskInfo = new TaskInfo("test name", 1, 3, 0); - inputFormat.setRuntimeContext(new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>())); + inputFormat.setRuntimeContext(new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup())); Assert.assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1); Assert.assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3); http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java index 273f4f5..09db3a9 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.util.DummyMetricGroup; import org.apache.flink.types.Value; import org.junit.Assert; import org.junit.Test; @@ -41,7 +42,7 @@ public class RichOutputFormatTest { public void testCheckRuntimeContextAccess() { final SerializedOutputFormat<Value> inputFormat = new SerializedOutputFormat<Value>(); final TaskInfo taskInfo = new TaskInfo("test name", 1, 3, 0); - inputFormat.setRuntimeContext(new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>())); + inputFormat.setRuntimeContext(new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup())); Assert.assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1); Assert.assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3); http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java index 5ca4c4c..7c905c1 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat; import org.apache.flink.api.common.operators.util.TestRichOutputFormat; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.util.DummyMetricGroup; import org.apache.flink.types.Nothing; import org.junit.Test; @@ -94,13 +95,13 @@ public class GenericDataSinkBaseTest implements java.io.Serializable { final TaskInfo taskInfo = new TaskInfo("test_sink", 0, 1, 0); executionConfig.disableObjectReuse(); in.reset(); - sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig); assertEquals(out.output, asList(TestIOData.RICH_NAMES)); executionConfig.enableObjectReuse(); out.clear(); in.reset(); - sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig); assertEquals(out.output, asList(TestIOData.RICH_NAMES)); } catch(Exception e){ e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java index 083039a..c360c62 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat; import org.apache.flink.api.common.operators.util.TestRichInputFormat; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.util.DummyMetricGroup; import org.junit.Test; import java.util.HashMap; @@ -83,7 +84,7 @@ public class GenericDataSourceBaseTest implements java.io.Serializable { executionConfig.disableObjectReuse(); assertEquals(false, in.hasBeenClosed()); assertEquals(false, in.hasBeenOpened()); - List<String> resultMutableSafe = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + List<String> resultMutableSafe = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig); assertEquals(true, in.hasBeenClosed()); assertEquals(true, in.hasBeenOpened()); @@ -91,7 +92,7 @@ public class GenericDataSourceBaseTest implements java.io.Serializable { executionConfig.enableObjectReuse(); assertEquals(false, in.hasBeenClosed()); assertEquals(false, in.hasBeenOpened()); - List<String> resultRegular = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + List<String> resultRegular = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, 
accumulatorMap, new DummyMetricGroup()), executionConfig); assertEquals(true, in.hasBeenClosed()); assertEquals(true, in.hasBeenOpened()); http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java index cda3245..9447efd 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.util.DummyMetricGroup; import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; @@ -78,7 +79,7 @@ public class FlatMapOperatorCollectionTest implements Serializable { final TaskInfo taskInfo = new TaskInfo("Test UDF", 0, 4, 0); // run on collections final List<String> result = getTestFlatMapOperator(udf) - .executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig); + .executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); Assert.assertEquals(input.size(), result.size()); Assert.assertEquals(input, result); 
http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java index d119fe2..a610a4d 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.util.DummyMetricGroup; import org.apache.flink.util.Collector; import org.junit.Test; @@ -125,9 +126,9 @@ public class InnerJoinOperatorBaseTest implements Serializable { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig); executionConfig.enableObjectReuse(); - List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new 
DummyMetricGroup()), executionConfig); assertEquals(expected, resultSafe); assertEquals(expected, resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java index 6059ab1..7ecdefa 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java @@ -39,6 +39,7 @@ import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.util.DummyMetricGroup; import org.junit.Test; @SuppressWarnings("serial") @@ -112,9 +113,9 @@ public class MapOperatorTest implements java.io.Serializable { final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig); executionConfig.enableObjectReuse(); - List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, 
cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java index 71486a5..5012718 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java @@ -31,6 +31,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.util.DummyMetricGroup; import org.apache.flink.util.Collector; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.RichMapPartitionFunction; @@ -85,9 +86,9 @@ public class PartitionMapOperatorTest implements java.io.Serializable { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig); + List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); executionConfig.enableObjectReuse(); - 
List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig); + List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/MetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/MetricGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/MetricGroupTest.java new file mode 100644 index 0000000..f7502e5 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/MetricGroupTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.metrics; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.metrics.util.TestReporter; +import org.junit.Assert; +import org.junit.Test; + +public class MetricGroupTest { + /** + * Verifies that group methods instantiate the correct metric with the given name. + */ + @Test + public void testMetricInstantiation() { + Configuration config = new Configuration(); + + config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter1.class.getName()); + + MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id"); + + root.counter("counter"); + Assert.assertTrue(TestReporter1.lastPassedMetric instanceof Counter); + Assert.assertEquals("counter", TestReporter1.lastPassedName); + + root.gauge("gauge", new Gauge<Object>() { + @Override + public Object getValue() { + return null; + } + }); + Assert.assertTrue(TestReporter1.lastPassedMetric instanceof Gauge); + Assert.assertEquals("gauge", TestReporter1.lastPassedName); + } + + protected static class TestReporter1 extends TestReporter { + public static Metric lastPassedMetric; + public static String lastPassedName; + + @Override + public void notifyOfAddedMetric(Metric metric, String name) { + lastPassedMetric = metric; + lastPassedName = name; + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String name) { + } + } + + /** + * Verifies that metric names containing special characters are rejected. + */ + @Test(expected = IllegalArgumentException.class) + public void testInvalidMetricName() { + Configuration config = new Configuration(); + + MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id"); + root.counter("=)(/!"); + } + + /** + * Verifies that when attempting to create a group with the name of an existing one the existing one will be returned instead. 
+ */ + @Test + public void testDuplicateGroupName() { + Configuration config = new Configuration(); + + MetricGroup root = new TaskManagerMetricGroup(new MetricRegistry(config), "host", "id"); + + MetricGroup group1 = root.addGroup("group"); + MetricGroup group2 = root.addGroup("group"); + MetricGroup group3 = root.addGroup("group"); + Assert.assertTrue(group1 == group2 && group2 == group3); + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java new file mode 100644 index 0000000..32cc11c --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.metrics; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.JobMetricGroup; +import org.apache.flink.metrics.groups.OperatorMetricGroup; +import org.apache.flink.metrics.groups.Scope; +import org.apache.flink.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.metrics.groups.TaskMetricGroup; +import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.metrics.util.TestReporter; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class MetricRegistryTest { + /** + * Verifies that the reporter class argument is correctly used to instantiate and open the reporter. + */ + @Test + public void testReporterInstantiation() { + Configuration config = new Configuration(); + + config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter1.class.getName()); + + new MetricRegistry(config); + + Assert.assertTrue(TestReporter1.wasOpened); + } + + protected static class TestReporter1 extends TestReporter { + public static boolean wasOpened = false; + + @Override + public void open(Configuration config) { + wasOpened = true; + } + } + + /** + * Verifies that configured arguments are properly forwarded to the reporter. + */ + @Test + public void testReporterArgumentForwarding() { + Configuration config = new Configuration(); + + config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter2.class.getName()); + config.setString(MetricRegistry.KEY_METRICS_REPORTER_ARGUMENTS, "--arg1 hello --arg2 world"); + + new MetricRegistry(config); + } + + protected static class TestReporter2 extends TestReporter { + @Override + public void open(Configuration config) { + Assert.assertEquals("hello", config.getString("arg1", null)); + Assert.assertEquals("world", config.getString("arg2", null)); + } + } + + /** + * Verifies that reporters implementing the Scheduled interface are regularly called to report the metrics. 
+ * + * @throws InterruptedException + */ + @Test + public void testReporterScheduling() throws InterruptedException { + Configuration config = new Configuration(); + + config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter3.class.getName()); + config.setString(MetricRegistry.KEY_METRICS_REPORTER_INTERVAL, "50 MILLISECONDS"); + + new MetricRegistry(config); + + long start = System.currentTimeMillis(); + for (int x = 0; x < 10; x++) { + Thread.sleep(100); + int reportCount = TestReporter3.reportCount; + long curT = System.currentTimeMillis(); + /** + * Within a given time-frame T only T/50 reports may be triggered due to the interval between reports. + * This value however does not take the first triggered report into account (=> +1). + * Furthermore we have to account for the mis-alignment between reports being triggered and our time + * measurement (=> +1); for T=200 a total of 4-6 reports may have been + * triggered depending on whether the end of the interval for the first report ends before + * or after T=50. + */ + long maxAllowedReports = (curT - start) / 50 + 2; + Assert.assertTrue("Too many report were triggered.", maxAllowedReports >= reportCount); + } + Assert.assertTrue("No report was triggered.", TestReporter3.reportCount > 0); + } + + protected static class TestReporter3 extends TestReporter implements Scheduled { + public static int reportCount = 0; + + @Override + public void report() { + reportCount++; + } + } + + /** + * Verifies that groups are correctly created, nesting works, and names are properly forwarded to generate names. 
+ */ + @Test + public void testMetricGroupGeneration() { + Configuration config = new Configuration(); + + config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter4.class.getName()); + + MetricRegistry registry = new MetricRegistry(config); + + MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); + root.counter("rootCounter"); + root.addGroup("top").counter("topCounter"); + } + + protected static class TestReporter4 extends TestReporter { + @Override + public String generateName(String name, List<String> scope) { + if (name.compareTo("rootCounter") == 0) { + Assert.assertEquals("host", scope.get(0)); + return "success"; + } else if (name.compareTo("topCounter") == 0) { + Assert.assertEquals("host", scope.get(0)); + Assert.assertEquals("taskmanager", scope.get(1)); + return "success"; + } else { + Assert.fail(); + return null; + } + } + } + + /** + * Verifies that reporters implementing the Listener interface are notified when Metrics are added or removed. 
+ */ + @Test + public void testListener() { + Configuration config = new Configuration(); + + config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter6.class.getName()); + + MetricRegistry registry = new MetricRegistry(config); + + MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); + root.counter("rootCounter"); + root.close(); + + Assert.assertTrue(TestReporter6.addCalled); + Assert.assertTrue(TestReporter6.removeCalled); + } + + protected static class TestReporter6 extends TestReporter { + public static boolean addCalled = false; + public static boolean removeCalled = false; + + @Override + public void notifyOfAddedMetric(Metric metric, String name) { + addCalled = true; + Assert.assertTrue(metric instanceof Counter); + Assert.assertEquals("rootCounter", name); + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String name) { + removeCalled = true; + Assert.assertTrue(metric instanceof Counter); + Assert.assertEquals("rootCounter", name); + } + } + + /** + * Verifies that the scope configuration is properly extracted. 
+ */ + @Test + public void testScopeConfig() { + Configuration config = new Configuration(); + + config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TM, "A"); + config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_JOB, "B"); + config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TASK, "C"); + config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_OPERATOR, "D"); + + Scope.ScopeFormat scopeConfig = new MetricRegistry(config).getScopeConfig(); + + Assert.assertEquals("A", scopeConfig.getTaskManagerFormat()); + Assert.assertEquals("B", scopeConfig.getJobFormat()); + Assert.assertEquals("C", scopeConfig.getTaskFormat()); + Assert.assertEquals("D", scopeConfig.getOperatorFormat()); + + Scope.ScopeFormat emptyScopeConfig = new MetricRegistry(new Configuration()).getScopeConfig(); + + Assert.assertEquals(TaskManagerMetricGroup.DEFAULT_SCOPE_TM, emptyScopeConfig.getTaskManagerFormat()); + Assert.assertEquals(JobMetricGroup.DEFAULT_SCOPE_JOB, emptyScopeConfig.getJobFormat()); + Assert.assertEquals(TaskMetricGroup.DEFAULT_SCOPE_TASK, emptyScopeConfig.getTaskFormat()); + Assert.assertEquals(OperatorMetricGroup.DEFAULT_SCOPE_OPERATOR, emptyScopeConfig.getOperatorFormat()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java new file mode 100644 index 0000000..89483b3 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricRegistry; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class JobGroupTest { + @Test + public void testGenerateScopeDefault() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + JobMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id") + .addJob(new JobID(), "job"); + + List<String> scope = operator.generateScope(); + assertEquals(4, scope.size()); + assertEquals("job", scope.get(3)); + } + + @Test + public void testGenerateScopeWildcard() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + JobMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id") + .addJob(new JobID(), "job"); + + Scope.ScopeFormat format = new Scope.ScopeFormat(); + format.setJobFormat(Scope.concat(Scope.SCOPE_WILDCARD, "superjob", JobMetricGroup.SCOPE_JOB_NAME)); + + List<String> scope = operator.generateScope(format); + assertEquals(5, scope.size()); + assertEquals("superjob", scope.get(3)); + assertEquals("job", scope.get(4)); + } + + @Test + public void testGenerateScopeCustom() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + JobMetricGroup operator = new 
TaskManagerMetricGroup(registry, "host", "id") + .addJob(new JobID(), "job"); + + Scope.ScopeFormat format = new Scope.ScopeFormat(); + format.setJobFormat(Scope.concat(TaskManagerMetricGroup.SCOPE_TM_HOST, "superjob", JobMetricGroup.SCOPE_JOB_NAME)); + + List<String> scope = operator.generateScope(format); + assertEquals(3, scope.size()); + assertEquals("host", scope.get(0)); + assertEquals("superjob", scope.get(1)); + assertEquals("job", scope.get(2)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java new file mode 100644 index 0000000..4f33d2a --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.util.AbstractID; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class OperatorGroupTest { + @Test + public void testGenerateScopeDefault() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + OperatorMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id") + .addJob(new JobID(), "job") + .addTask(new AbstractID(), new AbstractID(), 0, "task").addOperator("operator"); + + List<String> scope = operator.generateScope(); + assertEquals(6, scope.size()); + assertEquals("host", scope.get(0)); + assertEquals("taskmanager", scope.get(1)); + assertEquals("id", scope.get(2)); + assertEquals("job", scope.get(3)); + assertEquals("operator", scope.get(4)); + assertEquals("0", scope.get(5)); + } + + @Test + public void testGenerateScopeWildcard() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + OperatorMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id") + .addJob(new JobID(), "job") + .addTask(new AbstractID(), new AbstractID(), 0, "task").addOperator("operator"); + + Scope.ScopeFormat format = new Scope.ScopeFormat(); + format.setOperatorFormat(Scope.concat(Scope.SCOPE_WILDCARD, "op", OperatorMetricGroup.SCOPE_OPERATOR_NAME)); + + List<String> scope = operator.generateScope(format); + assertEquals(7, scope.size()); + assertEquals("host", scope.get(0)); + assertEquals("taskmanager", scope.get(1)); + assertEquals("id", scope.get(2)); + assertEquals("job", scope.get(3)); + assertEquals("task", scope.get(4)); + assertEquals("op", scope.get(5)); + assertEquals("operator", scope.get(6)); + } + + @Test + public void testGenerateScopeCustom() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + 
OperatorMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id") + .addJob(new JobID(), "job") + .addTask(new AbstractID(), new AbstractID(), 0, "task").addOperator("operator"); + + Scope.ScopeFormat format = new Scope.ScopeFormat(); + format.setOperatorFormat(Scope.concat("jobs", JobMetricGroup.SCOPE_JOB_NAME, "op", OperatorMetricGroup.SCOPE_OPERATOR_NAME)); + + List<String> scope = operator.generateScope(format); + assertEquals(4, scope.size()); + assertEquals("jobs", scope.get(0)); + assertEquals("job", scope.get(1)); + assertEquals("op", scope.get(2)); + assertEquals("operator", scope.get(3)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java new file mode 100644 index 0000000..c49fdcd --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.util.AbstractID; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class TaskGroupTest { + @Test + public void testGenerateScopeDefault() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + TaskMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id") + .addJob(new JobID(), "job") + .addTask(new AbstractID(), new AbstractID(), 0, "task"); + + List<String> scope = operator.generateScope(); + assertEquals(5, scope.size()); + assertEquals("task", scope.get(4)); + } + + @Test + public void testGenerateScopeWilcard() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + TaskMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id") + .addJob(new JobID(), "job") + .addTask(new AbstractID(), new AbstractID(), 0, "task"); + + Scope.ScopeFormat format = new Scope.ScopeFormat(); + format.setTaskFormat(Scope.concat(Scope.SCOPE_WILDCARD, "supertask", TaskMetricGroup.SCOPE_TASK_NAME)); + + List<String> scope = operator.generateScope(format); + assertEquals(6, scope.size()); + assertEquals("host", scope.get(0)); + assertEquals("taskmanager", scope.get(1)); + assertEquals("id", scope.get(2)); + assertEquals("job", scope.get(3)); + assertEquals("supertask", scope.get(4)); + assertEquals("task", scope.get(5)); + } + + @Test + public void testGenerateScopeCustom() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + TaskMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id") + .addJob(new JobID(), "job") + .addTask(new AbstractID(), new AbstractID(), 0, "task"); + + Scope.ScopeFormat format = new Scope.ScopeFormat(); + format.setTaskFormat(Scope.concat(TaskManagerMetricGroup.SCOPE_TM_HOST, 
JobMetricGroup.SCOPE_JOB_NAME, "supertask", TaskMetricGroup.SCOPE_TASK_NAME)); + + List<String> scope = operator.generateScope(format); + assertEquals(4, scope.size()); + assertEquals("host", scope.get(0)); + assertEquals("job", scope.get(1)); + assertEquals("supertask", scope.get(2)); + assertEquals("task", scope.get(3)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java new file mode 100644 index 0000000..7b3286d --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricRegistry; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class TaskManagerGroupTest { + @Test + public void testGenerateScopeDefault() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + TaskManagerMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id"); + + List<String> scope = operator.generateScope(); + assertEquals(3, scope.size()); + assertEquals("host", scope.get(0)); + assertEquals("taskmanager", scope.get(1)); + assertEquals("id", scope.get(2)); + } + + @Test + public void testGenerateScopeWildcard() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + TaskManagerMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id"); + + Scope.ScopeFormat format = new Scope.ScopeFormat(); + format.setTaskManagerFormat(Scope.concat(Scope.SCOPE_WILDCARD, "superhost", TaskManagerMetricGroup.SCOPE_TM_HOST)); + + List<String> scope = operator.generateScope(format); + assertEquals(2, scope.size()); + assertEquals("superhost", scope.get(0)); + assertEquals("host", scope.get(1)); + } + + @Test + public void testGenerateScopeCustom() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + TaskManagerMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id"); + + Scope.ScopeFormat format = new Scope.ScopeFormat(); + format.setTaskManagerFormat(Scope.concat("h", TaskManagerMetricGroup.SCOPE_TM_HOST, "t", TaskManagerMetricGroup.SCOPE_TM_ID)); + + List<String> scope = operator.generateScope(format); + assertEquals(4, scope.size()); + assertEquals("h", scope.get(0)); + assertEquals("host", scope.get(1)); + assertEquals("t", scope.get(2)); + assertEquals("id", scope.get(3)); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java new file mode 100644 index 0000000..0d683c2 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.reporter; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class JMXReporterTest { + /** + * Verifies that the JMXReporter properly generates the JMX name. 
+ */ + @Test + public void testGenerateName() { + String name = "metric"; + + List<String> scope = new ArrayList<>(); + scope.add("value0"); + scope.add("value1"); + scope.add("\"value2 (test),=;:?'"); + + String jmxName = new JMXReporter().generateName(name, scope); + + Assert.assertEquals("org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=metric", jmxName); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java new file mode 100644 index 0000000..d607072 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.metrics.util; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.JobMetricGroup; +import org.apache.flink.util.AbstractID; + +public class DummyJobMetricGroup extends JobMetricGroup { + public DummyJobMetricGroup() { + super(new DummyMetricRegistry(), new DummyTaskManagerMetricGroup(), new JobID(), "job"); + } + + @Override + public DummyTaskMetricGroup addTask(AbstractID id, AbstractID attemptID, int subtaskIndex, String name) { + return new DummyTaskMetricGroup(); + } + + @Override + protected MetricGroup addMetric(String name, Metric metric) { + return this; + } + + @Override + public MetricGroup addGroup(int name) { + return addGroup("" + name); + } + + @Override + public MetricGroup addGroup(String name) { + return new DummyMetricGroup(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java new file mode 100644 index 0000000..26df874 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.util; + +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.AbstractMetricGroup; +import org.apache.flink.metrics.groups.Scope; + +import java.util.ArrayList; +import java.util.List; + +public class DummyMetricGroup extends AbstractMetricGroup { + public DummyMetricGroup() { + super(new DummyMetricRegistry()); + } + + @Override + public List<String> generateScope() { + return new ArrayList<>(); + } + + @Override + public List<String> generateScope(Scope.ScopeFormat format) { + return new ArrayList<>(); + } + + @Override + protected MetricGroup addMetric(String name, Metric metric) { + return this; + } + + @Override + public MetricGroup addGroup(int name) { + return addGroup("" + name); + } + + @Override + public MetricGroup addGroup(String name) { + return new DummyMetricGroup(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java new file mode 100644 index 0000000..f0d6d3f --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricRegistry; + +/** + * {@link MetricRegistry} for tests whose configuration names {@link DummyReporter} as the reporter class. + */ +public class DummyMetricRegistry extends MetricRegistry { + /** Shared configuration that sets the reporter class to {@link DummyReporter}. */ + private static final Configuration config; + + static { + config = new Configuration(); + config.setString(KEY_METRICS_REPORTER_CLASS, DummyReporter.class.getCanonicalName()); + } + + /** + * Creates the registry from the shared static configuration, so that the + * {@link DummyReporter} setting is actually handed to the registry instead of an empty configuration. + */ + public DummyMetricRegistry() { + super(config); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java new file mode 100644 index 0000000..eb45f6a --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.util; + +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.OperatorMetricGroup; + +public class DummyOperatorMetricGroup extends OperatorMetricGroup { + public DummyOperatorMetricGroup() { + super(new DummyMetricRegistry(), new DummyTaskMetricGroup(), "operator", 0); + } + + @Override + protected MetricGroup addMetric(String name, Metric metric) { + return this; + } + + @Override + public MetricGroup addGroup(int name) { + return addGroup("" + name); + } + + @Override + public MetricGroup addGroup(String name) { + return new DummyMetricGroup(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java new file mode 100644 index 0000000..23a7768 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.reporter.MetricReporter; + +import java.util.List; + +public class DummyReporter implements MetricReporter { + @Override + public void open(Configuration config) { + } + + @Override + public void close() { + } + + @Override + public void notifyOfAddedMetric(Metric metric, String name) { + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String name) { + } + + @Override + public String generateName(String name, List<String> scope) { + return ""; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java new file mode 100644 index 0000000..1c7d33b --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.util; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.TaskManagerMetricGroup; + +/** + * Test stand-in for {@link TaskManagerMetricGroup}: metrics passed to {@code addMetric} are ignored + * and every child group handed out is a dummy group. + */ +public class DummyTaskManagerMetricGroup extends TaskManagerMetricGroup { + public DummyTaskManagerMetricGroup() { + super(new DummyMetricRegistry(), "host", "id"); + } + + /** Returns a fresh {@link DummyJobMetricGroup}; the job id and name are ignored. */ + @Override + public DummyJobMetricGroup addJob(JobID id, String name) { + return new DummyJobMetricGroup(); + } + + @Override + protected MetricGroup addMetric(String name, Metric metric) { + return this; + } + + @Override + public MetricGroup addGroup(int name) { + return addGroup("" + name); + } + + @Override + public MetricGroup addGroup(String name) { + return new DummyMetricGroup(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java new file mode 100644 index 0000000..53683f4 --- /dev/null +++
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.util; + +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.TaskMetricGroup; +import org.apache.flink.util.AbstractID; + +/** + * Test stand-in for {@link TaskMetricGroup}: metrics passed to {@code addMetric} are ignored + * and every child group handed out is a dummy group. + */ +public class DummyTaskMetricGroup extends TaskMetricGroup { + public DummyTaskMetricGroup() { + super(new DummyMetricRegistry(), new DummyJobMetricGroup(), new AbstractID(), new AbstractID(), 0, "task"); + } + + /** Returns a fresh {@link DummyOperatorMetricGroup}; the operator name is ignored. */ + @Override + public DummyOperatorMetricGroup addOperator(String name) { + return new DummyOperatorMetricGroup(); + } + + @Override + protected MetricGroup addMetric(String name, Metric metric) { + return this; + } + + @Override + public MetricGroup addGroup(int name) { + return addGroup("" + name); + } + + @Override + public MetricGroup addGroup(String name) { + return new DummyMetricGroup(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java ---------------------------------------------------------------------- diff --git
a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java b/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java new file mode 100644 index 0000000..482d1e8 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.metrics.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.metrics.reporter.MetricReporter; + +import java.util.List; + +public class TestReporter extends AbstractReporter { + @Override + public void open(Configuration config) { + } + + @Override + public void close() { + } + + @Override + public String generateName(String name, List<String> scope) { + return name; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-dist/src/main/flink-bin/bin/config.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index ffbec07..b6bdbed 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -103,6 +103,8 @@ KEY_ENV_SSH_OPTS="env.ssh.opts" KEY_RECOVERY_MODE="recovery.mode" KEY_ZK_HEAP_MB="zookeeper.heap.mb" +KEY_METRICS_JMX_PORT="metrics.jmx.port" + ######################################################################################################################## # PATHS AND CONFIG ######################################################################################################################## @@ -240,6 +242,10 @@ if [ -z "${RECOVERY_MODE}" ]; then RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" "${YAML_CONF}") fi +if [ -z "${JMX_PORT}" ]; then + JMX_PORT=$(readFromConfig ${KEY_METRICS_JMX_PORT} 9010 "${YAML_CONF}") +fi + # Arguments for the JVM. Used for job and task manager JVMs. # DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys # KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that! 
http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-dist/src/main/flink-bin/bin/flink-daemon.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh index 1ef7439..cc7163f 100644 --- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh +++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh @@ -23,14 +23,24 @@ USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (jobmanager|taskmanager|zook STARTSTOP=$1 DAEMON=$2 ARGS=("${@:3}") # get remaining arguments as array +JMX_ARGS="" + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh case $DAEMON in (jobmanager) CLASS_TO_RUN=org.apache.flink.runtime.jobmanager.JobManager + if [ "${ARGS[3]}" == "local" ]; then + JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=${JMX_PORT} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" + fi ;; (taskmanager) CLASS_TO_RUN=org.apache.flink.runtime.taskmanager.TaskManager + JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=${JMX_PORT} -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" ;; (zookeeper) @@ -43,11 +53,6 @@ case $DAEMON in ;; esac -bin=`dirname "$0"` -bin=`cd "$bin"; pwd` - -. "$bin"/config.sh - if [ "$FLINK_IDENT_STRING" = "" ]; then FLINK_IDENT_STRING="$USER" fi @@ -96,12 +101,13 @@ case $STARTSTOP in count="${#active[@]}" if [ ${count} -gt 0 ]; then + JMX_ARGS="" echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." fi fi echo "Starting $DAEMON daemon on host $HOSTNAME." 
- $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null & + $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} ${JMX_ARGS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null & mypid=$! http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java index 8ce83f3..31b3ba2 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java @@ -31,6 +31,7 @@ import org.apache.flink.api.java.tuple.builder.Tuple2Builder; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.util.DummyMetricGroup; import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; @@ -76,7 +77,7 @@ public class CoGroupOperatorCollectionTest implements Serializable { final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>(); final HashMap<String, Future<Path>> cpTasks = new HashMap<>(); final TaskInfo taskInfo = new TaskInfo("Test UDF", 0, 4, 0); - final RuntimeContext ctx = new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulators); + final RuntimeContext ctx = new RuntimeUDFContext(taskInfo, null, 
executionConfig, cpTasks, accumulators, new DummyMetricGroup()); { SumCoGroup udf1 = new SumCoGroup(); http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java index a563281..1b627c4 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java @@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.util.DummyMetricGroup; import org.apache.flink.util.Collector; import org.junit.Test; @@ -168,9 +169,9 @@ public class GroupReduceOperatorTest implements java.io.Serializable { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig); + List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); executionConfig.enableObjectReuse(); - List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), 
new HashMap<String, Accumulator<?, ?>>()), executionConfig); + List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe); http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java index a6b3deb..1d5668b 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java @@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.util.DummyMetricGroup; import org.apache.flink.util.Collector; import org.junit.Test; @@ -107,9 +108,9 @@ public class InnerJoinOperatorBaseTest implements Serializable { final TaskInfo taskInfo = new TaskInfo("op", 0, 1, 0); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig); + List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, 
inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); executionConfig.enableObjectReuse(); - List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig); + List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); assertEquals(expected, new HashSet<>(resultSafe)); assertEquals(expected, new HashSet<>(resultRegular)); http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java index c04916d..4317c03 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java @@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.util.DummyMetricGroup; import org.junit.Test; import java.util.ArrayList; @@ -145,9 +146,9 @@ public class ReduceOperatorTest implements java.io.Serializable { ExecutionConfig executionConfig = new ExecutionConfig(); 
executionConfig.disableObjectReuse(); - List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig); + List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); executionConfig.enableObjectReuse(); - List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig); + List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe); Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-metrics/flink-metrics-dropwizard/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/pom.xml b/flink-metrics/flink-metrics-dropwizard/pom.xml new file mode 100644 index 0000000..84d9722 --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/pom.xml @@ -0,0 +1,72 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-metrics-dropwizard</artifactId> + <name>flink-metrics-dropwizard</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${metrics.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.4</version> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> 
http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java new file mode 100644 index 0000000..a7309be --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.dropwizard; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.dropwizard.metrics.CounterWrapper; +import org.apache.flink.dropwizard.metrics.GaugeWrapper; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; + +import java.util.List; + +/** + * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} that wraps a + * Dropwizard {@link com.codahale.metrics.Reporter}. + */ +@PublicEvolving +public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled { + protected MetricRegistry registry; + protected ScheduledReporter reporter; + + public static final String ARG_HOST = "host"; + public static final String ARG_PORT = "port"; + public static final String ARG_PREFIX = "prefix"; + public static final String ARG_CONVERSION_RATE = "rateConversion"; + public static final String ARG_CONVERSION_DURATION = "durationConversion"; + + protected ScheduledDropwizardReporter() { + this.registry = new MetricRegistry(); + } + + @Override + public synchronized void notifyOfAddedMetric(Metric metric, String name) { + if (metric instanceof Counter) { + registry.register(name, new CounterWrapper((Counter) metric)); + } else if (metric instanceof Gauge) { + registry.register(name, new GaugeWrapper((Gauge) metric)); + } + } + + @Override + public synchronized void notifyOfRemovedMetric(Metric metric, String name) { + registry.remove(name); + } + + public abstract ScheduledReporter getReporter(Configuration config); + + @Override + public void open(Configuration config) { + this.reporter = getReporter(config); + } + + @Override + public void close() { + 
this.reporter.stop(); + } + + @Override + public String generateName(String name, List<String> scope) { + StringBuilder sb = new StringBuilder(); + for (String s : scope) { + sb.append(s); + sb.append('.'); + } + sb.append(name); + return sb.toString(); + } + + @Override + public synchronized void report() { + this.reporter.report( + this.registry.getGauges(), + this.registry.getCounters(), + this.registry.getHistograms(), + this.registry.getMeters(), + this.registry.getTimers()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java new file mode 100644 index 0000000..f6630b9 --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.dropwizard.metrics; + +import org.apache.flink.metrics.Counter; + +public class CounterWrapper extends com.codahale.metrics.Counter { + private final Counter counter; + + public CounterWrapper(Counter counter) { + this.counter = counter; + } + + @Override + public long getCount() { + return this.counter.getCount(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java new file mode 100644 index 0000000..d47090d --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * Presents a Flink {@link Gauge} as a Dropwizard {@link com.codahale.metrics.Gauge}:
+ * {@link #getValue()} is answered by the wrapped Flink gauge.
+ *
+ * <p>NOTE(review): both gauge types are used without type arguments here, so the value is
+ * exposed as a plain {@code Object}.
+ */
+public class GaugeWrapper implements com.codahale.metrics.Gauge {
+	/** The Flink gauge whose value is reported. */
+	private final Gauge delegate;
+
+	/**
+	 * @param gauge Flink gauge to expose
+	 */
+	public GaugeWrapper(Gauge gauge) {
+		delegate = gauge;
+	}
+
+	/**
+	 * @return the current value of the wrapped Flink gauge
+	 */
+	@Override
+	public Object getValue() {
+		return delegate.getValue();
+	}
+}
