http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskMetricGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskMetricGroupTest.java deleted file mode 100644 index f62f51f..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskMetricGroupTest.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.scope.ScopeFormat; -import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat; -import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat; -import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat; -import org.apache.flink.util.AbstractID; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TaskMetricGroupTest { - - // ------------------------------------------------------------------------ - // scope tests - // ------------------------------------------------------------------------ - private CountingMetricRegistry registry; - - @Before - public void createRegistry() { - this.registry = new CountingMetricRegistry(new Configuration()); - } - - @After - public void shutdownRegistry() { - this.registry.shutdown(); - this.registry = null; - } - - @Test - public void testGenerateScopeDefault() { - AbstractID vertexId = new AbstractID(); - AbstractID executionId = new AbstractID(); - - TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); - TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); - TaskMetricGroup taskGroup = new TaskMetricGroup(registry, jmGroup, vertexId, executionId, "aTaskName", 13, 2); - - assertArrayEquals( - new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName", "aTaskName", "13"}, - taskGroup.getScopeComponents()); - - assertEquals( - "theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13.name", - 
taskGroup.getMetricIdentifier("name")); - registry.shutdown(); - } - - @Test - public void testGenerateScopeCustom() { - TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("abc"); - TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("def", tmFormat); - TaskScopeFormat taskFormat = new TaskScopeFormat("<tm_id>.<job_id>.<task_id>.<task_attempt_id>", jmFormat); - - JobID jid = new JobID(); - AbstractID vertexId = new AbstractID(); - AbstractID executionId = new AbstractID(); - - TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); - TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName"); - TaskMetricGroup taskGroup = new TaskMetricGroup( - registry, jmGroup, taskFormat, vertexId, executionId, "aTaskName", 13, 2); - - assertArrayEquals( - new String[] { "test-tm-id", jid.toString(), vertexId.toString(), executionId.toString() }, - taskGroup.getScopeComponents()); - - assertEquals( - String.format("test-tm-id.%s.%s.%s.name", jid, vertexId, executionId), - taskGroup.getMetricIdentifier("name")); - registry.shutdown(); - } - - @Test - public void testGenerateScopeWilcard() { - TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat( - ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP); - TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat( - ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, tmFormat); - - TaskScopeFormat format = new TaskScopeFormat("*.<task_attempt_id>.<subtask_index>", jmFormat); - - AbstractID executionId = new AbstractID(); - - TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); - TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); - - TaskMetricGroup taskGroup = new TaskMetricGroup( - registry, jmGroup, format, new AbstractID(), executionId, "aTaskName", 13, 1); - - assertArrayEquals( - new String[] { 
"theHostName", "taskmanager", "test-tm-id", "myJobName", executionId.toString(), "13" }, - taskGroup.getScopeComponents()); - - assertEquals( - "theHostName.taskmanager.test-tm-id.myJobName." + executionId + ".13.name", - taskGroup.getMetricIdentifier("name")); - registry.shutdown(); - } - - @Test - public void testTaskMetricGroupCleanup() { - TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(registry, "localhost", "0"); - TaskManagerJobMetricGroup taskManagerJobMetricGroup = new TaskManagerJobMetricGroup(registry, taskManagerMetricGroup, new JobID(), "job"); - TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, taskManagerJobMetricGroup, new AbstractID(), new AbstractID(), "task", 0, 0); - - // the io metric should have registered predefined metrics - assertTrue(registry.getNumberRegisteredMetrics() > 0); - - taskMetricGroup.close(); - - // now alle registered metrics should have been unregistered - assertEquals(0, registry.getNumberRegisteredMetrics()); - } - - private static class CountingMetricRegistry extends MetricRegistry { - - private int counter = 0; - - CountingMetricRegistry(Configuration config) { - super(config); - } - - @Override - public void register(Metric metric, String metricName, AbstractMetricGroup group) { - super.register(metric, metricName, group); - counter++; - } - - @Override - public void unregister(Metric metric, String metricName, AbstractMetricGroup group) { - super.unregister(metric, metricName, group); - counter--; - } - - int getNumberRegisteredMetrics() { - return counter; - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java deleted file mode 100644 index 06bcdf3..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.metrics.reporter; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Histogram; -import org.apache.flink.metrics.HistogramStatistics; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.TaskManagerMetricGroup; -import org.apache.flink.metrics.util.TestReporter; -import org.apache.flink.util.TestLogger; -import org.junit.Test; - -import javax.management.MBeanAttributeInfo; -import javax.management.MBeanInfo; -import javax.management.MBeanServer; -import javax.management.MBeanServerConnection; -import javax.management.ObjectName; -import javax.management.remote.JMXConnector; -import javax.management.remote.JMXConnectorFactory; -import javax.management.remote.JMXServiceURL; -import java.lang.management.ManagementFactory; - -import static org.junit.Assert.assertEquals; - -public class JMXReporterTest extends TestLogger { - - @Test - public void testReplaceInvalidChars() { - assertEquals("", JMXReporter.replaceInvalidChars("")); - assertEquals("abc", JMXReporter.replaceInvalidChars("abc")); - assertEquals("abc", JMXReporter.replaceInvalidChars("abc\"")); - assertEquals("abc", JMXReporter.replaceInvalidChars("\"abc")); - assertEquals("abc", JMXReporter.replaceInvalidChars("\"abc\"")); - assertEquals("abc", JMXReporter.replaceInvalidChars("\"a\"b\"c\"")); - assertEquals("", JMXReporter.replaceInvalidChars("\"\"\"\"")); - assertEquals("____", JMXReporter.replaceInvalidChars(" ")); - assertEquals("ab_-(c)-", JMXReporter.replaceInvalidChars("\"ab ;(c)'")); - assertEquals("a_b_c", JMXReporter.replaceInvalidChars("a b c")); - assertEquals("a_b_c_", JMXReporter.replaceInvalidChars("a b c ")); - assertEquals("a-b-c-", JMXReporter.replaceInvalidChars("a;b'c*")); - assertEquals("a------b------c", JMXReporter.replaceInvalidChars("a,=;:?'b,=;:?'c")); - } - - /** - * Verifies that 
the JMXReporter properly generates the JMX name. - */ - @Test - public void testGenerateName() { - String[] scope = {"value0", "value1", "\"value2 (test),=;:?'"}; - String jmxName = JMXReporter.generateJmxName("TestMetric", scope); - - assertEquals("org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=TestMetric", jmxName); - } - - /** - * Verifies that multiple JMXReporters can be started on the same machine and register metrics at the MBeanServer. - * - * @throws Exception if the attribute/mbean could not be found or the test is broken - */ - @Test - public void testPortConflictHandling() throws Exception { - Configuration cfg = new Configuration(); - cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter.class.getName()); - MetricRegistry reg = new MetricRegistry(cfg); - - TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm"); - - JMXReporter rep1 = new JMXReporter(); - JMXReporter rep2 = new JMXReporter(); - - Configuration cfg1 = new Configuration(); - cfg1.setString("port", "9020-9035"); - - rep1.open(cfg1); - rep2.open(cfg1); - - rep1.notifyOfAddedMetric(new Gauge<Integer>() { - @Override - public Integer getValue() { - return 1; - } - }, "rep1", new TaskManagerMetricGroup(reg, "host", "tm")); - - rep2.notifyOfAddedMetric(new Gauge<Integer>() { - @Override - public Integer getValue() { - return 2; - } - }, "rep2", new TaskManagerMetricGroup(reg, "host", "tm")); - - MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - - ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents())); - ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents())); - - assertEquals(1, mBeanServer.getAttribute(objectName1, "Value")); - assertEquals(2, mBeanServer.getAttribute(objectName2, "Value")); - - rep1.close(); - rep2.close(); - reg.shutdown(); - } - - /** - * Verifies that we can connect to multiple JMXReporters 
running on the same machine. - * - * @throws Exception - */ - @Test - public void testJMXAvailability() throws Exception { - Configuration cfg = new Configuration(); - cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter.class.getName()); - MetricRegistry reg = new MetricRegistry(cfg); - - TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm"); - - JMXReporter rep1 = new JMXReporter(); - JMXReporter rep2 = new JMXReporter(); - - Configuration cfg1 = new Configuration(); - cfg1.setString("port", "9040-9055"); - rep1.open(cfg1); - rep2.open(cfg1); - - rep1.notifyOfAddedMetric(new Gauge<Integer>() { - @Override - public Integer getValue() { - return 1; - } - }, "rep1", new TaskManagerMetricGroup(reg, "host", "tm")); - - rep2.notifyOfAddedMetric(new Gauge<Integer>() { - @Override - public Integer getValue() { - return 2; - } - }, "rep2", new TaskManagerMetricGroup(reg, "host", "tm")); - - ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents())); - ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents())); - - JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + rep1.getPort() + "/jndi/rmi://localhost:" + rep1.getPort() + "/jmxrmi"); - JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1); - MBeanServerConnection mCon1 = jmxCon1.getMBeanServerConnection(); - - assertEquals(1, mCon1.getAttribute(objectName1, "Value")); - assertEquals(2, mCon1.getAttribute(objectName2, "Value")); - - url1 = null; - jmxCon1.close(); - jmxCon1 = null; - mCon1 = null; - - JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + rep2.getPort() + "/jndi/rmi://localhost:" + rep2.getPort() + "/jmxrmi"); - JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2); - MBeanServerConnection mCon2 = jmxCon2.getMBeanServerConnection(); - - assertEquals(1, mCon2.getAttribute(objectName1, "Value")); - assertEquals(2, mCon2.getAttribute(objectName2, 
"Value")); - - url2 = null; - jmxCon2.close(); - jmxCon2 = null; - mCon2 = null; - - rep1.close(); - rep2.close(); - reg.shutdown(); - } - - /** - * Tests that histograms are properly reported via the JMXReporter. - */ - @Test - public void testHistogramReporting() throws Exception { - MetricRegistry registry = null; - String histogramName = "histogram"; - - try { - Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTER_CLASS, "org.apache.flink.metrics.reporter.JMXReporter"); - - registry = new MetricRegistry(config); - - TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId"); - - TestingHistogram histogram = new TestingHistogram(); - - metricGroup.histogram(histogramName, histogram); - - MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - - ObjectName objectName = new ObjectName(JMXReporter.generateJmxName(histogramName, metricGroup.getScopeComponents())); - - MBeanInfo info = mBeanServer.getMBeanInfo(objectName); - - MBeanAttributeInfo[] attributeInfos = info.getAttributes(); - - assertEquals(11, attributeInfos.length); - - assertEquals(histogram.getCount(), mBeanServer.getAttribute(objectName, "Count")); - assertEquals(histogram.getStatistics().getMean(), mBeanServer.getAttribute(objectName, "Mean")); - assertEquals(histogram.getStatistics().getStdDev(), mBeanServer.getAttribute(objectName, "StdDev")); - assertEquals(histogram.getStatistics().getMax(), mBeanServer.getAttribute(objectName, "Max")); - assertEquals(histogram.getStatistics().getMin(), mBeanServer.getAttribute(objectName, "Min")); - assertEquals(histogram.getStatistics().getQuantile(0.5), mBeanServer.getAttribute(objectName, "Median")); - assertEquals(histogram.getStatistics().getQuantile(0.75), mBeanServer.getAttribute(objectName, "75thPercentile")); - assertEquals(histogram.getStatistics().getQuantile(0.95), mBeanServer.getAttribute(objectName, "95thPercentile")); - 
assertEquals(histogram.getStatistics().getQuantile(0.98), mBeanServer.getAttribute(objectName, "98thPercentile")); - assertEquals(histogram.getStatistics().getQuantile(0.99), mBeanServer.getAttribute(objectName, "99thPercentile")); - assertEquals(histogram.getStatistics().getQuantile(0.999), mBeanServer.getAttribute(objectName, "999thPercentile")); - - } finally { - if (registry != null) { - registry.shutdown(); - } - } - } - - static class TestingHistogram implements Histogram { - - @Override - public void update(long value) { - - } - - @Override - public long getCount() { - return 1; - } - - @Override - public HistogramStatistics getStatistics() { - return new HistogramStatistics() { - @Override - public double getQuantile(double quantile) { - return quantile; - } - - @Override - public long[] getValues() { - return new long[0]; - } - - @Override - public int size() { - return 3; - } - - @Override - public double getMean() { - return 4; - } - - @Override - public double getStdDev() { - return 5; - } - - @Override - public long getMax() { - return 6; - } - - @Override - public long getMin() { - return 7; - } - }; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java b/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java deleted file mode 100644 index 3f8909d..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.util; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.groups.AbstractMetricGroup; -import org.apache.flink.metrics.reporter.AbstractReporter; - -public class TestReporter extends AbstractReporter { - - @Override - public void open(Configuration config) {} - - @Override - public void close() {} - - @Override - public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {} - - @Override - public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {} - - @Override - public String filterCharacters(String input) { - return input; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index 2bc26a7..1a0e8d8 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -108,6 +108,13 @@ under the License. 
<version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-jmx</artifactId> + <version>${project.version}</version> + </dependency> + + </dependencies> <!-- See main pom.xml for explanation of profiles --> http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/pom.xml b/flink-metrics/flink-metrics-core/pom.xml new file mode 100644 index 0000000..ab5eb56 --- /dev/null +++ b/flink-metrics/flink-metrics-core/pom.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-metrics-core</artifactId> + <name>flink-metrics-core</name> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java new file mode 100644 index 0000000..1e9fbc4 --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.metrics; + +/** + * Interface for a character filter function. The filter function is given a string which the filter + * can transform. The returned string is the transformation result. + */ +public interface CharacterFilter { + + /** + * Filter the given string and generate a resulting string from it. + * + * For example, one implementation could filter out invalid characters from the input string. + * + * @param input Input string + * @return Filtered result string + */ + String filterCharacters(String input); +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Counter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Counter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Counter.java new file mode 100644 index 0000000..7f63cf9 --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Counter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.metrics; + +/** + * A Counter is a {@link Metric} that measures a count. + */ +public interface Counter extends Metric { + + /** + * Increment the current count by 1. + */ + void inc(); + + /** + * Increment the current count by the given value. + * + * @param n value to increment the current count by + */ + void inc(long n); + + /** + * Decrement the current count by 1. + */ + void dec(); + + /** + * Decrement the current count by the given value. + * + * @param n value to decrement the current count by + */ + void dec(long n); + + /** + * Returns the current count. + * + * @return current count + */ + long getCount(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Gauge.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Gauge.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Gauge.java new file mode 100644 index 0000000..eb7c40f --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Gauge.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics; + +/** + * A Gauge is a {@link Metric} that calculates a specific value at a point in time. + */ +public interface Gauge<T> extends Metric { + + /** + * Calculates and returns the measured value. + * + * @return calculated value + */ + T getValue(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java new file mode 100644 index 0000000..af5c9b0 --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics; + +/** + * Histogram interface to be used with Flink's metrics system. 
+ * + * The histogram allows recording values, getting the current count of recorded values, and creating + * histogram statistics for the currently seen elements. + */ +public interface Histogram extends Metric { + + /** + * Update the histogram with the given value. + * + * @param value Value to update the histogram with + */ + void update(long value); + + /** + * Get the count of seen elements. + * + * @return Count of seen elements + */ + long getCount(); + + /** + * Create statistics for the currently recorded elements. + * + * @return Statistics about the currently recorded elements + */ + HistogramStatistics getStatistics(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java new file mode 100644 index 0000000..b2e4507 --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics; + +/** + * Histogram statistics represent the current snapshot of elements recorded in the histogram. + * + * The histogram statistics allow calculating values for quantiles, the mean, the standard + * deviation, the minimum, and the maximum. + */ +public abstract class HistogramStatistics { + + /** + * Returns the value for the given quantile based on the represented histogram statistics. + * + * @param quantile Quantile to calculate the value for + * @return Value for the given quantile + */ + public abstract double getQuantile(double quantile); + + /** + * Returns the elements of the statistics' sample. + * + * @return Elements of the statistics' sample + */ + public abstract long[] getValues(); + + /** + * Returns the size of the statistics' sample. + * + * @return Size of the statistics' sample + */ + public abstract int size(); + + /** + * Returns the mean of the histogram values. + * + * @return Mean of the histogram values + */ + public abstract double getMean(); + + /** + * Returns the standard deviation of the distribution reflected by the histogram statistics. + * + * @return Standard deviation of histogram distribution + */ + public abstract double getStdDev(); + + /** + * Returns the maximum value of the histogram. + * + * @return Maximum value of the histogram + */ + public abstract long getMax(); + + /** + * Returns the minimum value of the histogram.
+ * + * @return Minimum value of the histogram + */ + public abstract long getMin(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java new file mode 100644 index 0000000..5597d91 --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics; + +/** + * Common super interface for all metrics. 
+ */ +public interface Metric { +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java new file mode 100644 index 0000000..a611ae8 --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics; + +import java.util.Properties; + +public class MetricConfig extends Properties { + + public String getString(String key, String defaultValue) { + return getProperty(key, defaultValue); + } + + public int getInteger(String key, int defaultValue) { + String argument = getProperty(key, null); + return argument == null + ? defaultValue + : Integer.parseInt(argument); + } + + public long getLong(String key, long defaultValue) { + String argument = getProperty(key, null); + return argument == null + ? 
defaultValue + : Long.parseLong(argument); + } + + public float getFloat(String key, float defaultValue) { + String argument = getProperty(key, null); + return argument == null + ? defaultValue + : Float.parseFloat(argument); + } + + public double getDouble(String key, double defaultValue) { + String argument = getProperty(key, null); + return argument == null + ? defaultValue + : Double.parseDouble(argument); + } + + public boolean getBoolean(String key, boolean defaultValue) { + String argument = getProperty(key, null); + return argument == null + ? defaultValue + : Boolean.parseBoolean(argument); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java new file mode 100644 index 0000000..15f25b9 --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics; + +/** + * A MetricGroup is a named container for {@link Metric Metrics} and further metric subgroups. + * + * <p>Instances of this class can be used to register new metrics with Flink and to create a nested + * hierarchy based on the group names. + * + * <p>A MetricGroup is uniquely identified by its place in the hierarchy and name. + */ +public interface MetricGroup { + + // ------------------------------------------------------------------------ + // Metrics + // ------------------------------------------------------------------------ + + /** + * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink. + * + * @param name name of the counter + * @return the created counter + */ + Counter counter(int name); + + /** + * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink. + * + * @param name name of the counter + * @return the created counter + */ + Counter counter(String name); + + /** + * Registers a {@link org.apache.flink.metrics.Counter} with Flink. + * + * @param name name of the counter + * @param counter counter to register + * @param <C> counter type + * @return the given counter + */ + <C extends Counter> C counter(int name, C counter); + + /** + * Registers a {@link org.apache.flink.metrics.Counter} with Flink. + * + * @param name name of the counter + * @param counter counter to register + * @param <C> counter type + * @return the given counter + */ + <C extends Counter> C counter(String name, C counter); + + /** + * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink. 
+ * + * @param name name of the gauge + * @param gauge gauge to register + * @param <T> return type of the gauge + * @return the given gauge + */ + <T, G extends Gauge<T>> G gauge(int name, G gauge); + + /** + * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink. + * + * @param name name of the gauge + * @param gauge gauge to register + * @param <T> return type of the gauge + * @return the given gauge + */ + <T, G extends Gauge<T>> G gauge(String name, G gauge); + + /** + * Registers a new {@link Histogram} with Flink. + * + * @param name name of the histogram + * @param histogram histogram to register + * @param <H> histogram type + * @return the registered histogram + */ + <H extends Histogram> H histogram(String name, H histogram); + + /** + * Registers a new {@link Histogram} with Flink. + * + * @param name name of the histogram + * @param histogram histogram to register + * @param <H> histogram type + * @return the registered histogram + */ + <H extends Histogram> H histogram(int name, H histogram); + + // ------------------------------------------------------------------------ + // Groups + // ------------------------------------------------------------------------ + + /** + * Creates a new MetricGroup and adds it to this group's sub-groups. + * + * @param name name of the group + * @return the created group + */ + MetricGroup addGroup(int name); + + /** + * Creates a new MetricGroup and adds it to this group's sub-groups. 
+ * + * @param name name of the group + * @return the created group + */ + MetricGroup addGroup(String name); + + // ------------------------------------------------------------------------ + // Scope + // ------------------------------------------------------------------------ + + /** + * Gets the scope as an array of the scope components, for example + * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]} + * + * @see #getMetricIdentifier(String) + * @see #getMetricIdentifier(String, CharacterFilter) + */ + String[] getScopeComponents(); + + /** + * Returns the fully qualified metric name, for example + * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"} + * + * @param metricName metric name + * @return fully qualified metric name + */ + String getMetricIdentifier(String metricName); + + /** + * Returns the fully qualified metric name, for example + * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"} + * + * @param metricName metric name + * @param filter character filter which is applied to the scope components if not null. + * @return fully qualified metric name + */ + String getMetricIdentifier(String metricName, CharacterFilter filter); +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java new file mode 100644 index 0000000..6ec3b28 --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics; + +/** + * A simple low-overhead {@link org.apache.flink.metrics.Counter} that is not thread-safe. + */ +public class SimpleCounter implements Counter { + + /** the current count */ + private long count; + + /** + * Increment the current count by 1. + */ + @Override + public void inc() { + count++; + } + + /** + * Increment the current count by the given value. + * + * @param n value to increment the current count by + */ + @Override + public void inc(long n) { + count += n; + } + + /** + * Decrement the current count by 1. + */ + @Override + public void dec() { + count--; + } + + /** + * Decrement the current count by the given value. + * + * @param n value to decrement the current count by + */ + @Override + public void dec(long n) { + count -= n; + } + + /** + * Returns the current count. 
+ * + * @return current count + */ + @Override + public long getCount() { + return count; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java new file mode 100644 index 0000000..1c7f0ce --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.metrics.groups; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; + +/** + * A special {@link MetricGroup} that does not register any metrics at the metrics registry + * and any reporters. + */ +public class UnregisteredMetricsGroup implements MetricGroup { + + @Override + public Counter counter(int name) { + return new SimpleCounter(); + } + + @Override + public Counter counter(String name) { + return new SimpleCounter(); + } + + @Override + public <C extends Counter> C counter(int name, C counter) { + return counter; + } + + @Override + public <C extends Counter> C counter(String name, C counter) { + return counter; + } + + @Override + public <T, G extends Gauge<T>> G gauge(int name, G gauge) { + return gauge; + } + + @Override + public <T, G extends Gauge<T>> G gauge(String name, G gauge) { + return gauge; + } + + @Override + public <H extends Histogram> H histogram(int name, H histogram) { + return histogram; + } + + @Override + public <H extends Histogram> H histogram(String name, H histogram) { + return histogram; + } + + @Override + public MetricGroup addGroup(int name) { + return addGroup(String.valueOf(name)); + } + + @Override + public MetricGroup addGroup(String name) { + return new UnregisteredMetricsGroup(); + } + + @Override + public String[] getScopeComponents() { + return new String[0]; + } + + @Override + public String getMetricIdentifier(String metricName) { + return metricName; + } + + @Override + public String getMetricIdentifier(String metricName, CharacterFilter filter) { + return metricName; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java 
---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java new file mode 100644 index 0000000..7ab8c73 --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.reporter; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Base class for custom metric reporters. 
+ */ +public abstract class AbstractReporter implements MetricReporter, CharacterFilter { + protected final Logger log = LoggerFactory.getLogger(getClass()); + + protected final Map<Gauge<?>, String> gauges = new HashMap<>(); + protected final Map<Counter, String> counters = new HashMap<>(); + protected final Map<Histogram, String> histograms = new HashMap<>(); + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName, this); + + synchronized (this) { + if (metric instanceof Counter) { + counters.put((Counter) metric, name); + } else if (metric instanceof Gauge) { + gauges.put((Gauge<?>) metric, name); + } else if (metric instanceof Histogram) { + histograms.put((Histogram) metric, name); + } else { + log.warn("Cannot add unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); + } + } + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + synchronized (this) { + if (metric instanceof Counter) { + counters.remove(metric); + } else if (metric instanceof Gauge) { + gauges.remove(metric); + } else if (metric instanceof Histogram) { + histograms.remove(metric); + } else { + log.warn("Cannot remove unknown metric type {}. 
This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java new file mode 100644 index 0000000..bd13be7 --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.reporter; + +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; + +/** + * Reporters are used to export {@link Metric Metrics} to an external backend. + * + * <p>Reporters are instantiated via reflection and must be public, non-abstract, and have a + * public no-argument constructor. 
+ */ +public interface MetricReporter { + + // ------------------------------------------------------------------------ + // life cycle + // ------------------------------------------------------------------------ + + /** + * Configures this reporter. Since reporters are instantiated generically and hence parameter-less, + * this method is the place where the reporters set their basic fields based on configuration values. + * + * <p>This method is always called first on a newly instantiated reporter. + * + * @param config The configuration with all parameters. + */ + void open(MetricConfig config); + + /** + * Closes this reporter. Should be used to close channels, streams and release resources. + */ + void close(); + + // ------------------------------------------------------------------------ + // adding / removing metrics + // ------------------------------------------------------------------------ + + /** + * Called when a new {@link Metric} was added. + * + * @param metric the metric that was added + * @param metricName the name of the metric + * @param group the group that contains the metric + */ + void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group); + + /** + * Called when a {@link Metric} should be removed. 
+ * + * @param metric the metric that should be removed + * @param metricName the name of the metric + * @param group the group that contains the metric + */ + void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group); +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java new file mode 100644 index 0000000..f82f5a6 --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.reporter; + +/** + * Interface for reporters that actively send out data periodically. + */ +public interface Scheduled { + + /** + * Report the current measurements. This method is called periodically by the + * metrics registry that uses the reporter. 
+ */ + void report(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-dropwizard/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/pom.xml b/flink-metrics/flink-metrics-dropwizard/pom.xml index 3d3ecb7..e63af7d 100644 --- a/flink-metrics/flink-metrics-dropwizard/pom.xml +++ b/flink-metrics/flink-metrics-dropwizard/pom.xml @@ -35,7 +35,14 @@ under the License. <dependencies> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> + <artifactId>flink-annotations</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-core</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> @@ -51,6 +58,13 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils-junit</artifactId> <version>${project.version}</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java index 568bda2..ce0299b 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java +++ 
b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java @@ -23,7 +23,6 @@ import com.codahale.metrics.Reporter; import com.codahale.metrics.ScheduledReporter; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper; import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper; import org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper; @@ -33,7 +32,8 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.groups.AbstractMetricGroup; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; import org.slf4j.Logger; @@ -88,7 +88,7 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch // ------------------------------------------------------------------------ @Override - public void open(Configuration config) { + public void open(MetricConfig config) { this.reporter = getReporter(config); } @@ -102,7 +102,7 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch // ------------------------------------------------------------------------ @Override - public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { final String fullName = group.getMetricIdentifier(metricName, this); synchronized (this) { @@ -130,7 +130,7 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch } @Override - public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) { + public void 
notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { synchronized (this) { String fullName; @@ -200,5 +200,5 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch this.reporter.report(gauges, counters, histograms, meters, timers); } - public abstract ScheduledReporter getReporter(Configuration config); + public abstract ScheduledReporter getReporter(MetricConfig config); } http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java index 4b19b50..9b04c3b 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java @@ -23,17 +23,18 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.SimpleCounter; -import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup; -import org.apache.flink.metrics.groups.TaskManagerMetricGroup; -import org.apache.flink.metrics.groups.TaskMetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.util.AbstractID; + import org.junit.Test; import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -42,10 +43,10 @@ import static org.junit.Assert.assertTrue; public class ScheduledDropwizardReporterTest { @Test - public void testInvalidCharacterReplacement() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + public void testInvalidCharacterReplacement() { ScheduledDropwizardReporter reporter = new ScheduledDropwizardReporter() { @Override - public ScheduledReporter getReporter(Configuration config) { + public ScheduledReporter getReporter(MetricConfig config) { return null; } }; @@ -112,7 +113,7 @@ public class ScheduledDropwizardReporterTest { public static class TestingScheduledDropwizardReporter extends ScheduledDropwizardReporter { @Override - public ScheduledReporter getReporter(Configuration config) { + public ScheduledReporter getReporter(MetricConfig config) { return null; } http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java index c1913d7..7b27867 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java +++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java @@ -29,9 +29,10 @@ import com.codahale.metrics.Timer; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.dropwizard.ScheduledDropwizardReporter; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -153,7 +154,7 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger { TestingScheduledReporter scheduledReporter = null; @Override - public ScheduledReporter getReporter(Configuration config) { + public ScheduledReporter getReporter(MetricConfig config) { scheduledReporter = new TestingScheduledReporter( registry, getClass().getName(), http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-ganglia/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-ganglia/pom.xml b/flink-metrics/flink-metrics-ganglia/pom.xml index e4993ad..1a94851 100644 --- a/flink-metrics/flink-metrics-ganglia/pom.xml +++ b/flink-metrics/flink-metrics-ganglia/pom.xml @@ -42,7 +42,7 @@ under the License. 
<dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> + <artifactId>flink-metrics-core</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java index f30c9f4..15176a3 100644 --- a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java +++ b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java @@ -23,8 +23,8 @@ import com.codahale.metrics.ScheduledReporter; import info.ganglia.gmetric4j.gmetric.GMetric; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; import org.apache.flink.dropwizard.ScheduledDropwizardReporter; +import org.apache.flink.metrics.MetricConfig; import java.io.IOException; import java.util.concurrent.TimeUnit; @@ -38,7 +38,7 @@ public class GangliaReporter extends ScheduledDropwizardReporter { public static final String ARG_MODE_ADDRESSING = "addressingMode"; @Override - public ScheduledReporter getReporter(Configuration config) { + public ScheduledReporter getReporter(MetricConfig config) { try { String host = config.getString(ARG_HOST, null); http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-graphite/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-graphite/pom.xml b/flink-metrics/flink-metrics-graphite/pom.xml index 44dc619..b75fbae 100644 --- 
a/flink-metrics/flink-metrics-graphite/pom.xml +++ b/flink-metrics/flink-metrics-graphite/pom.xml @@ -42,7 +42,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> + <artifactId>flink-metrics-core</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java index 16be830..ca301aa 100644 --- a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java +++ b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java @@ -22,8 +22,8 @@ import com.codahale.metrics.ScheduledReporter; import com.codahale.metrics.graphite.Graphite; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; import org.apache.flink.dropwizard.ScheduledDropwizardReporter; +import org.apache.flink.metrics.MetricConfig; import java.util.concurrent.TimeUnit; @@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit; public class GraphiteReporter extends ScheduledDropwizardReporter { @Override - public ScheduledReporter getReporter(Configuration config) { + public ScheduledReporter getReporter(MetricConfig config) { String host = config.getString(ARG_HOST, null); int port = config.getInteger(ARG_PORT, -1); http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-jmx/pom.xml ---------------------------------------------------------------------- diff --git 
a/flink-metrics/flink-metrics-jmx/pom.xml b/flink-metrics/flink-metrics-jmx/pom.xml new file mode 100644 index 0000000..45ffbf8 --- /dev/null +++ b/flink-metrics/flink-metrics-jmx/pom.xml @@ -0,0 +1,81 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-metrics-jmx</artifactId> + <name>flink-metrics-jmx</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-annotations</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils-junit</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java ---------------------------------------------------------------------- diff --git 
a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java new file mode 100644 index 0000000..1a283d9 --- /dev/null +++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.metrics.jmx; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; + +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.InstanceAlreadyExistsException; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.remote.JMXConnectorServer; +import javax.management.remote.JMXConnectorServerFactory; +import javax.management.remote.JMXServiceURL; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.net.MalformedURLException; +import java.rmi.NoSuchObjectException; +import java.rmi.registry.LocateRegistry; +import java.rmi.registry.Registry; +import java.rmi.server.UnicastRemoteObject; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * {@link MetricReporter} that exports {@link Metric Metrics} via JMX. 
+ * + * Largely based on the JmxReporter class of the dropwizard metrics library + * https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics/JmxReporter.java + */ +public class JMXReporter implements MetricReporter { + + private static final String PREFIX = "org.apache.flink.metrics:"; + private static final String KEY_PREFIX = "key"; + + public static final String ARG_PORT = "port"; + + private static final Logger LOG = LoggerFactory.getLogger(JMXReporter.class); + + // ------------------------------------------------------------------------ + + /** The server where the management beans are registered and deregistered */ + private final MBeanServer mBeanServer; + + /** The names under which the registered metrics have been added to the MBeanServer */ + private final Map<Metric, ObjectName> registeredMetrics; + + /** The server to which JMX clients connect to. ALlows for better control over port usage. */ + private JMXServer jmxServer; + + /** + * Creates a new JMXReporter + */ + public JMXReporter() { + this.mBeanServer = ManagementFactory.getPlatformMBeanServer(); + this.registeredMetrics = new HashMap<>(); + } + + // ------------------------------------------------------------------------ + // life cycle + // ------------------------------------------------------------------------ + + @Override + public void open(MetricConfig config) { + String portsConfig = config.getString(ARG_PORT, null); + + if (portsConfig != null) { + Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig); + + JMXServer server = new JMXServer(); + while (ports.hasNext()) { + int port = ports.next(); + try { + server.start(port); + LOG.info("Started JMX server on port " + port + "."); + // only set our field if the server was actually started + jmxServer = server; + break; + } catch (IOException ioe) { //assume port conflict + LOG.debug("Could not start JMX server on port " + port + ".", ioe); + try { + server.stop(); + } catch 
(Exception e) { + LOG.debug("Could not stop JMX server.", e); + } + } + } + if (jmxServer == null) { + throw new RuntimeException("Could not start JMX server on any configured port. Ports: " + portsConfig); + } + } + } + + @Override + public void close() { + if (jmxServer != null) { + try { + jmxServer.stop(); + } catch (IOException e) { + LOG.error("Failed to stop JMX server.", e); + } + } + } + + public int getPort() { + if (jmxServer == null) { + throw new NullPointerException("No server was opened. Did you specify a port?"); + } + return jmxServer.port; + } + + // ------------------------------------------------------------------------ + // adding / removing metrics + // ------------------------------------------------------------------------ + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = generateJmxName(metricName, group.getScopeComponents()); + + AbstractBean jmxMetric; + ObjectName jmxName; + try { + jmxName = new ObjectName(name); + } catch (MalformedObjectNameException e) { + LOG.error("Metric name did not conform to JMX ObjectName rules: " + name, e); + return; + } + + if (metric instanceof Gauge) { + jmxMetric = new JmxGauge((Gauge<?>) metric); + } else if (metric instanceof Counter) { + jmxMetric = new JmxCounter((Counter) metric); + } else if (metric instanceof Histogram) { + jmxMetric = new JmxHistogram((Histogram) metric); + } else { + LOG.error("Cannot add unknown metric type: {}. 
This indicates that the metric type " + + "is not supported by this reporter.", metric.getClass().getName()); + return; + } + + try { + synchronized (this) { + mBeanServer.registerMBean(jmxMetric, jmxName); + registeredMetrics.put(metric, jmxName); + } + } catch (NotCompliantMBeanException e) { + // implementation error on our side + LOG.error("Metric did not comply with JMX MBean naming rules.", e); + } catch (InstanceAlreadyExistsException e) { + LOG.debug("A metric with the name " + jmxName + " was already registered.", e); + LOG.error("A metric with the name " + jmxName + " was already registered."); + } catch (Throwable t) { + LOG.error("Failed to register metric", t); + } + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + try { + synchronized (this) { + final ObjectName jmxName = registeredMetrics.remove(metric); + + // remove the metric if it is known. if it is not known, ignore the request + if (jmxName != null) { + mBeanServer.unregisterMBean(jmxName); + } + } + } catch (InstanceNotFoundException e) { + // alright then + } catch (Throwable t) { + // never propagate exceptions - the metrics reporter should not affect the stability + // of the running system + LOG.error("Un-registering metric failed", t); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + static String generateJmxName(String metricName, String[] scopeComponents) { + final StringBuilder nameBuilder = new StringBuilder(128); + nameBuilder.append(PREFIX); + + for (int x = 0; x < scopeComponents.length; x++) { + // write keyX= + nameBuilder.append(KEY_PREFIX); + nameBuilder.append(x); + nameBuilder.append("="); + + // write scope component + nameBuilder.append(replaceInvalidChars(scopeComponents[x])); + nameBuilder.append(","); + } + + // write the name + 
nameBuilder.append("name=").append(replaceInvalidChars(metricName)); + + return nameBuilder.toString(); + } + + /** + * Lightweight method to replace unsupported characters. + * If the string does not contain any unsupported characters, this method creates no + * new string (and in fact no new objects at all). + * + * <p>Replacements: + * + * <ul> + * <li>{@code "} is removed</li> + * <li>{@code space} is replaced by {@code _} (underscore)</li> + * <li>{@code , = ; : ? ' *} are replaced by {@code -} (hyphen)</li> + * </ul> + */ + static String replaceInvalidChars(String str) { + char[] chars = null; + final int strLen = str.length(); + int pos = 0; + + for (int i = 0; i < strLen; i++) { + final char c = str.charAt(i); + switch (c) { + case '"': + // remove character by not moving cursor + if (chars == null) { + chars = str.toCharArray(); + } + break; + + case ' ': + if (chars == null) { + chars = str.toCharArray(); + } + chars[pos++] = '_'; + break; + + case ',': + case '=': + case ';': + case ':': + case '?': + case '\'': + case '*': + if (chars == null) { + chars = str.toCharArray(); + } + chars[pos++] = '-'; + break; + + default: + if (chars != null) { + chars[pos] = c; + } + pos++; + } + } + + return chars == null ? 
str : new String(chars, 0, pos); + } + + // ------------------------------------------------------------------------ + // Interfaces and base classes for JMX beans + // ------------------------------------------------------------------------ + + public interface MetricMBean {} + + private abstract static class AbstractBean implements MetricMBean {} + + public interface JmxCounterMBean extends MetricMBean { + long getCount(); + } + + private static class JmxCounter extends AbstractBean implements JmxCounterMBean { + private Counter counter; + + JmxCounter(Counter counter) { + this.counter = counter; + } + + @Override + public long getCount() { + return counter.getCount(); + } + } + + public interface JmxGaugeMBean extends MetricMBean { + Object getValue(); + } + + private static class JmxGauge extends AbstractBean implements JmxGaugeMBean { + + private final Gauge<?> gauge; + + JmxGauge(Gauge<?> gauge) { + this.gauge = gauge; + } + + @Override + public Object getValue() { + return gauge.getValue(); + } + } + + public interface JmxHistogramMBean extends MetricMBean { + long getCount(); + + double getMean(); + + double getStdDev(); + + long getMax(); + + long getMin(); + + double getMedian(); + + double get75thPercentile(); + + double get95thPercentile(); + + double get98thPercentile(); + + double get99thPercentile(); + + double get999thPercentile(); + } + + private static class JmxHistogram extends AbstractBean implements JmxHistogramMBean { + + private final Histogram histogram; + + JmxHistogram(Histogram histogram) { + this.histogram = histogram; + } + + @Override + public long getCount() { + return histogram.getCount(); + } + + @Override + public double getMean() { + return histogram.getStatistics().getMean(); + } + + @Override + public double getStdDev() { + return histogram.getStatistics().getStdDev(); + } + + @Override + public long getMax() { + return histogram.getStatistics().getMax(); + } + + @Override + public long getMin() { + return 
histogram.getStatistics().getMin(); + } + + @Override + public double getMedian() { + return histogram.getStatistics().getQuantile(0.5); + } + + @Override + public double get75thPercentile() { + return histogram.getStatistics().getQuantile(0.75); + } + + @Override + public double get95thPercentile() { + return histogram.getStatistics().getQuantile(0.95); + } + + @Override + public double get98thPercentile() { + return histogram.getStatistics().getQuantile(0.98); + } + + @Override + public double get99thPercentile() { + return histogram.getStatistics().getQuantile(0.99); + } + + @Override + public double get999thPercentile() { + return histogram.getStatistics().getQuantile(0.999); + } + } + + /** + * JMX Server implementation that JMX clients can connect to. + * + * Heavily based on j256 simplejmx project + * + * https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java + */ + private static class JMXServer { + private Registry rmiRegistry; + private JMXConnectorServer connector; + private int port; + + public void start(int port) throws IOException { + if (rmiRegistry != null && connector != null) { + LOG.debug("JMXServer is already running."); + return; + } + startRmiRegistry(port); + startJmxService(port); + this.port = port; + } + + /** + * Starts an RMI Registry that allows clients to lookup the JMX IP/port. + * + * @param port rmi port to use + * @throws IOException + */ + private void startRmiRegistry(int port) throws IOException { + rmiRegistry = LocateRegistry.createRegistry(port); + } + + /** + * Starts a JMX connector that allows (un)registering MBeans with the MBean server and RMI invocations. 
+ * + * @param port jmx port to use + * @throws IOException + */ + private void startJmxService(int port) throws IOException { + String serviceUrl = "service:jmx:rmi://localhost:" + port + "/jndi/rmi://localhost:" + port + "/jmxrmi"; + JMXServiceURL url; + try { + url = new JMXServiceURL(serviceUrl); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e); + } + + connector = JMXConnectorServerFactory.newJMXConnectorServer(url, null, ManagementFactory.getPlatformMBeanServer()); + + connector.start(); + } + + public void stop() throws IOException { + if (connector != null) { + try { + connector.stop(); + } finally { + connector = null; + } + } + if (rmiRegistry != null) { + try { + UnicastRemoteObject.unexportObject(rmiRegistry, true); + } catch (NoSuchObjectException e) { + throw new IOException("Could not un-export our RMI registry", e); + } finally { + rmiRegistry = null; + } + } + } + } +}
