[FLINK-3951] Add Histogram metric type. This closes #2112.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee3c7a88 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee3c7a88 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee3c7a88 Branch: refs/heads/master Commit: ee3c7a88bb74232e4884899699aaa08ae2b6e038 Parents: d43bf8d Author: Till Rohrmann <trohrm...@apache.org> Authored: Tue Jun 14 19:04:43 2016 +0200 Committer: zentol <ches...@apache.org> Committed: Mon Jun 27 15:32:03 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/metrics/Histogram.java | 52 +++ .../flink/metrics/HistogramStatistics.java | 81 +++++ .../org/apache/flink/metrics/MetricGroup.java | 20 ++ .../metrics/groups/AbstractMetricGroup.java | 12 + .../groups/UnregisteredMetricsGroup.java | 12 +- .../metrics/reporter/AbstractReporter.java | 15 + .../flink/metrics/reporter/JMXReporter.java | 98 +++++- .../flink/metrics/MetricRegistryTest.java | 3 +- .../groups/MetricGroupRegistrationTest.java | 21 ++ .../flink/metrics/reporter/JMXReporterTest.java | 108 ++++++- .../flink-metrics-dropwizard/pom.xml | 72 ----- .../dropwizard/ScheduledDropwizardReporter.java | 140 -------- .../dropwizard/metrics/CounterWrapper.java | 33 -- .../flink/dropwizard/metrics/GaugeWrapper.java | 41 --- .../flink-metrics-ganglia/pom.xml | 90 ------ .../flink/metrics/ganglia/GangliaReporter.java | 79 ----- .../flink-metrics-graphite/pom.xml | 84 ----- .../metrics/graphite/GraphiteReporter.java | 63 ---- .../flink-metrics-statsd/pom.xml | 43 --- .../flink/metrics/statsd/StatsDReporter.java | 143 --------- flink-metric-reporters/pom.xml | 42 --- flink-metrics/flink-metrics-dropwizard/pom.xml | 80 +++++ .../dropwizard/ScheduledDropwizardReporter.java | 162 ++++++++++ .../metrics/DropwizardHistogramStatistics.java | 70 ++++ .../metrics/DropwizardHistogramWrapper.java | 53 +++ .../dropwizard/metrics/FlinkCounterWrapper.java | 33 ++ 
.../dropwizard/metrics/FlinkGaugeWrapper.java | 41 +++ .../metrics/FlinkHistogramWrapper.java | 52 +++ .../metrics/HistogramStatisticsWrapper.java | 86 +++++ .../DropwizardFlinkHistogramWrapperTest.java | 319 +++++++++++++++++++ .../src/test/resources/log4j-test.properties | 27 ++ .../src/test/resources/logback-test.xml | 34 ++ flink-metrics/flink-metrics-ganglia/pom.xml | 90 ++++++ .../flink/metrics/ganglia/GangliaReporter.java | 79 +++++ flink-metrics/flink-metrics-graphite/pom.xml | 84 +++++ .../metrics/graphite/GraphiteReporter.java | 63 ++++ flink-metrics/flink-metrics-statsd/pom.xml | 51 +++ .../flink/metrics/statsd/StatsDReporter.java | 184 +++++++++++ .../metrics/statsd/StatsDReporterTest.java | 236 ++++++++++++++ .../src/test/resources/log4j-test.properties | 27 ++ .../src/test/resources/logback-test.xml | 34 ++ flink-metrics/pom.xml | 42 +++ pom.xml | 2 +- 43 files changed, 2264 insertions(+), 837 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java b/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java new file mode 100644 index 0000000..3fd1253 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Histogram interface to be used with Flink's metrics system. + * + * The histogram allows to record values, get the current count of recorded values and create + * histogram statistics for the currently seen elements. + */ +@PublicEvolving +public interface Histogram extends Metric { + + /** + * Update the histogram with the given value. + * + * @param value Value to update the histogram with + */ + void update(long value); + + /** + * Get the count of seen elements. + * + * @return Count of seen elements + */ + long getCount(); + + /** + * Create statistics for the currently recorded elements. + * + * @return Statistics about the currently recorded elements + */ + HistogramStatistics getStatistics(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java b/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java new file mode 100644 index 0000000..476580c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Histogram statistics represent the current snapshot of elements recorded in the histogram. + * + * The histogram statistics allow to calculate values for quantiles, the mean, the standard + * deviation, the minimum and the maximum. + */ +@PublicEvolving +public abstract class HistogramStatistics { + + /** + * Returns the value for the given quantile based on the represented histogram statistics. + * + * @param quantile Quantile to calculate the value for + * @return Value for the given quantile + */ + public abstract double getQuantile(double quantile); + + /** + * Returns the elements of the statistics' sample + * + * @return Elements of the statistics' sample + */ + public abstract long[] getValues(); + + /** + * Returns the size of the statistics' sample + * + * @return Size of the statistics' sample + */ + public abstract int size(); + + /** + * Returns the mean of the histogram values. + * + * @return Mean of the histogram values + */ + public abstract double getMean(); + + /** + * Returns the standard deviation of the distribution reflected by the histogram statistics. + * + * @return Standard deviation of histogram distribution + */ + public abstract double getStdDev(); + + /** + * Returns the maximum value of the histogram. 
+ * + * @return Maximum value of the histogram + */ + public abstract long getMax(); + + /** + * Returns the minimum value of the histogram. + * + * @return Minimum value of the histogram + */ + public abstract long getMin(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java index b131949..f46d3fc 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java @@ -115,6 +115,26 @@ public interface MetricGroup { */ <T, G extends Gauge<T>> G gauge(String name, G gauge); + /** + * Registers a new {@link Histogram} with Flink. + * + * @param name name of the histogram + * @param histogram histogram to register + * @param <H> histogram type + * @return the registered histogram + */ + <H extends Histogram> H histogram(String name, H histogram); + + /** + * Registers a new {@link Histogram} with Flink. 
+ * + * @param name name of the histogram + * @param histogram histogram to register + * @param <H> histogram type + * @return the registered histogram + */ + <H extends Histogram> H histogram(int name, H histogram); + // ------------------------------------------------------------------------ // Groups // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java index 93eb734..112957e 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java @@ -21,6 +21,7 @@ package org.apache.flink.metrics.groups; import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricRegistry; @@ -172,6 +173,17 @@ public abstract class AbstractMetricGroup implements MetricGroup { return gauge; } + @Override + public <H extends Histogram> H histogram(int name, H histogram) { + return histogram(String.valueOf(name), histogram); + } + + @Override + public <H extends Histogram> H histogram(String name, H histogram) { + addMetric(name, histogram); + return histogram; + } + /** * Adds the given metric to the group and registers it at the registry, if the group * is not yet closed, and if no metric with the same name has been registered before. 
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java index 29d71d9..8e183df 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java @@ -21,6 +21,7 @@ package org.apache.flink.metrics.groups; import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; @@ -71,7 +72,16 @@ public class UnregisteredMetricsGroup implements MetricGroup { return gauge; } - + @Override + public <H extends Histogram> H histogram(int name, H histogram) { + return histogram; + } + + @Override + public <H extends Histogram> H histogram(String name, H histogram) { + return histogram; + } + @Override public MetricGroup addGroup(int name) { return addGroup(String.valueOf(name)); http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java index f2e78bf..8dacb7c 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java @@ -21,8 +21,11 @@ package org.apache.flink.metrics.reporter; 
import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.groups.AbstractMetricGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; @@ -32,9 +35,11 @@ import java.util.Map; */ @PublicEvolving public abstract class AbstractReporter implements MetricReporter { + protected final Logger log = LoggerFactory.getLogger(getClass()); protected final Map<Gauge<?>, String> gauges = new HashMap<>(); protected final Map<Counter, String> counters = new HashMap<>(); + protected final Map<Histogram, String> histograms = new HashMap<>(); @Override public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { @@ -45,6 +50,11 @@ public abstract class AbstractReporter implements MetricReporter { counters.put((Counter) metric, name); } else if (metric instanceof Gauge) { gauges.put((Gauge<?>) metric, name); + } else if (metric instanceof Histogram) { + histograms.put((Histogram) metric, name); + } else { + log.warn("Cannot add unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); } } } @@ -56,6 +66,11 @@ public abstract class AbstractReporter implements MetricReporter { counters.remove(metric); } else if (metric instanceof Gauge) { gauges.remove(metric); + } else if (metric instanceof Histogram) { + histograms.remove(metric); + } else { + log.warn("Cannot remove unknown metric type {}. 
This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java index 326d6d7..eaf0ea0 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.groups.AbstractMetricGroup; import org.apache.flink.util.NetUtils; @@ -146,8 +147,11 @@ public class JMXReporter implements MetricReporter { jmxMetric = new JmxGauge((Gauge<?>) metric); } else if (metric instanceof Counter) { jmxMetric = new JmxCounter((Counter) metric); + } else if (metric instanceof Histogram) { + jmxMetric = new JmxHistogram((Histogram) metric); } else { - LOG.error("Unknown metric type: " + metric.getClass().getName()); + LOG.error("Cannot add unknown metric type: {}. 
This indicates that the metric type " + + "is not supported by this reporter.", metric.getClass().getName()); return; } @@ -285,7 +289,7 @@ public class JMXReporter implements MetricReporter { private static class JmxCounter extends AbstractBean implements JmxCounterMBean { private Counter counter; - public JmxCounter(Counter counter) { + JmxCounter(Counter counter) { this.counter = counter; } @@ -303,7 +307,7 @@ public class JMXReporter implements MetricReporter { private final Gauge<?> gauge; - public JmxGauge(Gauge<?> gauge) { + JmxGauge(Gauge<?> gauge) { this.gauge = gauge; } @@ -313,6 +317,94 @@ public class JMXReporter implements MetricReporter { } } + public interface JmxHistogramMBean extends MetricMBean { + long getCount(); + + double getMean(); + + double getStdDev(); + + long getMax(); + + long getMin(); + + double getMedian(); + + double get75thPercentile(); + + double get95thPercentile(); + + double get98thPercentile(); + + double get99thPercentile(); + + double get999thPercentile(); + } + + private static class JmxHistogram extends AbstractBean implements JmxHistogramMBean { + + private final Histogram histogram; + + JmxHistogram(Histogram histogram) { + this.histogram = histogram; + } + + @Override + public long getCount() { + return histogram.getCount(); + } + + @Override + public double getMean() { + return histogram.getStatistics().getMean(); + } + + @Override + public double getStdDev() { + return histogram.getStatistics().getStdDev(); + } + + @Override + public long getMax() { + return histogram.getStatistics().getMax(); + } + + @Override + public long getMin() { + return histogram.getStatistics().getMin(); + } + + @Override + public double getMedian() { + return histogram.getStatistics().getQuantile(0.5); + } + + @Override + public double get75thPercentile() { + return histogram.getStatistics().getQuantile(0.75); + } + + @Override + public double get95thPercentile() { + return histogram.getStatistics().getQuantile(0.95); + } + + @Override + 
public double get98thPercentile() { + return histogram.getStatistics().getQuantile(0.98); + } + + @Override + public double get99thPercentile() { + return histogram.getStatistics().getQuantile(0.99); + } + + @Override + public double get999thPercentile() { + return histogram.getStatistics().getQuantile(0.999); + } + } + /** * JMX Server implementation that JMX clients can connect to. * http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java index f8e0bf5..8b71816 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java @@ -25,12 +25,13 @@ import org.apache.flink.metrics.groups.scope.ScopeFormats; import org.apache.flink.metrics.reporter.Scheduled; import org.apache.flink.metrics.util.TestReporter; +import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; import static org.junit.Assert.*; -public class MetricRegistryTest { +public class MetricRegistryTest extends TestLogger { /** * Verifies that the reporter class argument is correctly used to instantiate and open the reporter. 
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java index 7b35d91..c7a112a 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java @@ -20,6 +20,8 @@ package org.apache.flink.metrics.groups; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.HistogramStatistics; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricRegistry; @@ -57,6 +59,25 @@ public class MetricGroupRegistrationTest { Assert.assertEquals(gauge, TestReporter1.lastPassedMetric); assertEquals("gauge", TestReporter1.lastPassedName); + Histogram histogram = root.histogram("histogram", new Histogram() { + @Override + public void update(long value) { + + } + + @Override + public long getCount() { + return 0; + } + + @Override + public HistogramStatistics getStatistics() { + return null; + } + }); + + Assert.assertEquals(histogram, TestReporter1.lastPassedMetric); + assertEquals("histogram", TestReporter1.lastPassedName); registry.shutdown(); } http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java 
b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java index d25f744..9e638a7 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java @@ -20,11 +20,16 @@ package org.apache.flink.metrics.reporter; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.HistogramStatistics; import org.apache.flink.metrics.MetricRegistry; import org.apache.flink.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.metrics.util.TestReporter; +import org.apache.flink.util.TestLogger; import org.junit.Test; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; import javax.management.MBeanServer; import javax.management.MBeanServerConnection; import javax.management.ObjectName; @@ -37,7 +42,7 @@ import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_JMX_PORT; import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_REPORTER_CLASS; import static org.junit.Assert.assertEquals; -public class JMXReporterTest { +public class JMXReporterTest extends TestLogger { @Test public void testReplaceInvalidChars() { @@ -188,4 +193,105 @@ public class JMXReporterTest { rep2.close(); reg.shutdown(); } + + /** + * Tests that histograms are properly reported via the JMXReporter. 
+ */ + @Test + public void testHistogramReporting() throws Exception { + MetricRegistry registry = null; + String histogramName = "histogram"; + + try { + Configuration config = new Configuration(); + + registry = new MetricRegistry(config); + + TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId"); + + TestingHistogram histogram = new TestingHistogram(); + + metricGroup.histogram(histogramName, histogram); + + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + + ObjectName objectName = new ObjectName(JMXReporter.generateJmxName(histogramName, metricGroup.getScopeComponents())); + + MBeanInfo info = mBeanServer.getMBeanInfo(objectName); + + MBeanAttributeInfo[] attributeInfos = info.getAttributes(); + + assertEquals(11, attributeInfos.length); + + assertEquals(histogram.getCount(), mBeanServer.getAttribute(objectName, "Count")); + assertEquals(histogram.getStatistics().getMean(), mBeanServer.getAttribute(objectName, "Mean")); + assertEquals(histogram.getStatistics().getStdDev(), mBeanServer.getAttribute(objectName, "StdDev")); + assertEquals(histogram.getStatistics().getMax(), mBeanServer.getAttribute(objectName, "Max")); + assertEquals(histogram.getStatistics().getMin(), mBeanServer.getAttribute(objectName, "Min")); + assertEquals(histogram.getStatistics().getQuantile(0.5), mBeanServer.getAttribute(objectName, "Median")); + assertEquals(histogram.getStatistics().getQuantile(0.75), mBeanServer.getAttribute(objectName, "75thPercentile")); + assertEquals(histogram.getStatistics().getQuantile(0.95), mBeanServer.getAttribute(objectName, "95thPercentile")); + assertEquals(histogram.getStatistics().getQuantile(0.98), mBeanServer.getAttribute(objectName, "98thPercentile")); + assertEquals(histogram.getStatistics().getQuantile(0.99), mBeanServer.getAttribute(objectName, "99thPercentile")); + assertEquals(histogram.getStatistics().getQuantile(0.999), mBeanServer.getAttribute(objectName, "999thPercentile")); + + } 
finally { + if (registry != null) { + registry.shutdown(); + } + } + } + + static class TestingHistogram implements Histogram { + + @Override + public void update(long value) { + + } + + @Override + public long getCount() { + return 1; + } + + @Override + public HistogramStatistics getStatistics() { + return new HistogramStatistics() { + @Override + public double getQuantile(double quantile) { + return quantile; + } + + @Override + public long[] getValues() { + return new long[0]; + } + + @Override + public int size() { + return 3; + } + + @Override + public double getMean() { + return 4; + } + + @Override + public double getStdDev() { + return 5; + } + + @Override + public long getMax() { + return 6; + } + + @Override + public long getMin() { + return 7; + } + }; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-dropwizard/pom.xml b/flink-metric-reporters/flink-metrics-dropwizard/pom.xml deleted file mode 100644 index a386880..0000000 --- a/flink-metric-reporters/flink-metrics-dropwizard/pom.xml +++ /dev/null @@ -1,72 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. 
See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metric-reporters</artifactId> - <version>1.1-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-metrics-dropwizard</artifactId> - <name>flink-metrics-dropwizard</name> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>${metrics.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>2.4</version> - <configuration> - <descriptorRefs> - <descriptorRef>jar-with-dependencies</descriptorRef> - </descriptorRefs> - </configuration> - <executions> - <execution> - <id>make-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java deleted file mode 100644 index d67f3e3..0000000 --- 
a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.dropwizard; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Reporter; -import com.codahale.metrics.ScheduledReporter; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.dropwizard.metrics.CounterWrapper; -import org.apache.flink.dropwizard.metrics.GaugeWrapper; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.groups.AbstractMetricGroup; -import org.apache.flink.metrics.reporter.MetricReporter; -import org.apache.flink.metrics.reporter.Scheduled; - -import java.util.HashMap; -import java.util.Map; -import java.util.SortedMap; - -/** - * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} that wraps a - * Dropwizard {@link com.codahale.metrics.Reporter}. 
- */ -@PublicEvolving -public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled, Reporter { - - public static final String ARG_HOST = "host"; - public static final String ARG_PORT = "port"; - public static final String ARG_PREFIX = "prefix"; - public static final String ARG_CONVERSION_RATE = "rateConversion"; - public static final String ARG_CONVERSION_DURATION = "durationConversion"; - - // ------------------------------------------------------------------------ - - protected final MetricRegistry registry; - - protected ScheduledReporter reporter; - - private final Map<Gauge<?>, String> gauges = new HashMap<>(); - private final Map<Counter, String> counters = new HashMap<>(); - - // ------------------------------------------------------------------------ - - protected ScheduledDropwizardReporter() { - this.registry = new MetricRegistry(); - } - - // ------------------------------------------------------------------------ - // life cycle - // ------------------------------------------------------------------------ - - @Override - public void open(Configuration config) { - this.reporter = getReporter(config); - } - - @Override - public void close() { - this.reporter.stop(); - } - - // ------------------------------------------------------------------------ - // adding / removing metrics - // ------------------------------------------------------------------------ - - @Override - public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { - final String fullName = group.getScopeString() + '.' 
+ metricName; - - synchronized (this) { - if (metric instanceof Counter) { - counters.put((Counter) metric, fullName); - registry.register(fullName, new CounterWrapper((Counter) metric)); - } - else if (metric instanceof Gauge) { - gauges.put((Gauge<?>) metric, fullName); - registry.register(fullName, GaugeWrapper.fromGauge((Gauge<?>) metric)); - } - } - } - - @Override - public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) { - synchronized (this) { - String fullName; - - if (metric instanceof Counter) { - fullName = counters.remove(metric); - } else if (metric instanceof Gauge) { - fullName = gauges.remove(metric); - } else { - fullName = null; - } - - if (fullName != null) { - registry.remove(fullName); - } - } - } - - // ------------------------------------------------------------------------ - // scheduled reporting - // ------------------------------------------------------------------------ - - @Override - public void report() { - // we do not need to lock here, because the dropwizard registry is - // internally a concurrent map - @SuppressWarnings("rawtypes") - final SortedMap<String, com.codahale.metrics.Gauge> gauges = registry.getGauges(); - final SortedMap<String, com.codahale.metrics.Counter> counters = registry.getCounters(); - final SortedMap<String, com.codahale.metrics.Histogram> histograms = registry.getHistograms(); - final SortedMap<String, com.codahale.metrics.Meter> meters = registry.getMeters(); - final SortedMap<String, com.codahale.metrics.Timer> timers = registry.getTimers(); - - this.reporter.report(gauges, counters, histograms, meters, timers); - } - - public abstract ScheduledReporter getReporter(Configuration config); -} http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java ---------------------------------------------------------------------- diff --git 
a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java deleted file mode 100644 index f6630b9..0000000 --- a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.dropwizard.metrics; - -import org.apache.flink.metrics.Counter; - -public class CounterWrapper extends com.codahale.metrics.Counter { - private final Counter counter; - - public CounterWrapper(Counter counter) { - this.counter = counter; - } - - @Override - public long getCount() { - return this.counter.getCount(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java deleted file mode 100644 index 655cd60..0000000 --- a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.dropwizard.metrics; - -import org.apache.flink.metrics.Gauge; - -public class GaugeWrapper<T> implements com.codahale.metrics.Gauge<T> { - - private final Gauge<T> gauge; - - public GaugeWrapper(Gauge<T> gauge) { - this.gauge = gauge; - } - - @Override - public T getValue() { - return this.gauge.getValue(); - } - - public static <T> GaugeWrapper<T> fromGauge(Gauge<?> gauge) { - @SuppressWarnings("unchecked") - Gauge<T> typedGauge = (Gauge<T>) gauge; - return new GaugeWrapper<>(typedGauge); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-ganglia/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-ganglia/pom.xml b/flink-metric-reporters/flink-metrics-ganglia/pom.xml deleted file mode 100644 index a457ca1..0000000 --- a/flink-metric-reporters/flink-metrics-ganglia/pom.xml +++ /dev/null @@ -1,90 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metric-reporters</artifactId> - <version>1.1-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-metrics-ganglia</artifactId> - <name>flink-metrics-ganglia</name> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics-dropwizard</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>info.ganglia.gmetric4j</groupId> - <artifactId>gmetric4j</artifactId> - <version>1.0.7</version> - </dependency> - - <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>${metrics.version}</version> - </dependency> - - <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-ganglia</artifactId> - <version>${metrics.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>2.4</version> - <configuration> - <descriptorRefs> - <descriptorRef>jar-with-dependencies</descriptorRef> - </descriptorRefs> - </configuration> - <executions> - <execution> - <id>make-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java 
---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java b/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java deleted file mode 100644 index adf9394..0000000 --- a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.metrics.ganglia; - -import com.codahale.metrics.ScheduledReporter; - -import info.ganglia.gmetric4j.gmetric.GMetric; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.dropwizard.ScheduledDropwizardReporter; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -@PublicEvolving -public class GangliaReporter extends ScheduledDropwizardReporter { - - public static final String ARG_DMAX = "dmax"; - public static final String ARG_TMAX = "tmax"; - public static final String ARG_TTL = "ttl"; - public static final String ARG_MODE_ADDRESSING = "addressingMode"; - - @Override - public ScheduledReporter getReporter(Configuration config) { - - try { - String host = config.getString(ARG_HOST, null); - int port = config.getInteger(ARG_PORT, -1); - if (host == null || host.length() == 0 || port < 1) { - throw new IllegalArgumentException("Invalid host/port configuration. 
Host: " + host + " Port: " + port); - } - String addressingMode = config.getString(ARG_MODE_ADDRESSING, "MULTICAST"); - int ttl = config.getInteger(ARG_TTL, -1); - GMetric gMetric = new GMetric(host, port, GMetric.UDPAddressingMode.valueOf(addressingMode), ttl); - - String prefix = config.getString(ARG_PREFIX, null); - String conversionRate = config.getString(ARG_CONVERSION_RATE, null); - String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null); - int dMax = config.getInteger(ARG_DMAX, 0); - int tMax = config.getInteger(ARG_TMAX, 60); - - com.codahale.metrics.ganglia.GangliaReporter.Builder builder = - com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry); - - if (prefix != null) { - builder.prefixedWith(prefix); - } - if (conversionRate != null) { - builder.convertRatesTo(TimeUnit.valueOf(conversionRate)); - } - if (conversionDuration != null) { - builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration)); - } - builder.withDMax(dMax); - builder.withTMax(tMax); - - return builder.build(gMetric); - } catch (IOException e) { - throw new RuntimeException("Error while instantiating GangliaReporter.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-graphite/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-graphite/pom.xml b/flink-metric-reporters/flink-metrics-graphite/pom.xml deleted file mode 100644 index 714b77f..0000000 --- a/flink-metric-reporters/flink-metrics-graphite/pom.xml +++ /dev/null @@ -1,84 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metric-reporters</artifactId> - <version>1.1-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-metrics-graphite</artifactId> - <name>flink-metrics-graphite</name> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics-dropwizard</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>${metrics.version}</version> - </dependency> - - <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-graphite</artifactId> - <version>${metrics.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>2.4</version> - <configuration> - <descriptorRefs> - <descriptorRef>jar-with-dependencies</descriptorRef> - </descriptorRefs> - 
</configuration> - <executions> - <execution> - <id>make-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java deleted file mode 100644 index 16be830..0000000 --- a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.metrics.graphite; - -import com.codahale.metrics.ScheduledReporter; -import com.codahale.metrics.graphite.Graphite; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.dropwizard.ScheduledDropwizardReporter; - -import java.util.concurrent.TimeUnit; - -@PublicEvolving -public class GraphiteReporter extends ScheduledDropwizardReporter { - - @Override - public ScheduledReporter getReporter(Configuration config) { - String host = config.getString(ARG_HOST, null); - int port = config.getInteger(ARG_PORT, -1); - - if (host == null || host.length() == 0 || port < 1) { - throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port); - } - - String prefix = config.getString(ARG_PREFIX, null); - String conversionRate = config.getString(ARG_CONVERSION_RATE, null); - String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null); - - com.codahale.metrics.graphite.GraphiteReporter.Builder builder = - com.codahale.metrics.graphite.GraphiteReporter.forRegistry(registry); - - if (prefix != null) { - builder.prefixedWith(prefix); - } - - if (conversionRate != null) { - builder.convertRatesTo(TimeUnit.valueOf(conversionRate)); - } - - if (conversionDuration != null) { - builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration)); - } - - return builder.build(new Graphite(host, port)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-statsd/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-statsd/pom.xml b/flink-metric-reporters/flink-metrics-statsd/pom.xml deleted file mode 100644 index 3052a10..0000000 --- a/flink-metric-reporters/flink-metrics-statsd/pom.xml +++ /dev/null @@ -1,43 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation 
(ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metric-reporters</artifactId> - <version>1.1-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-metrics-statsd</artifactId> - <name>flink-metrics-statsd</name> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java deleted file mode 100644 index 087a265..0000000 --- 
a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.statsd; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.reporter.AbstractReporter; -import org.apache.flink.metrics.reporter.Scheduled; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetSocketAddress; -import java.net.SocketException; -import java.util.ConcurrentModificationException; -import java.util.Map; -import java.util.NoSuchElementException; - -/** - * Largely based on the StatsDReporter class by ReadyTalk - * https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java - * - * Ported since it was not present in maven central. 
- */ -@PublicEvolving -public class StatsDReporter extends AbstractReporter implements Scheduled { - - private static final Logger LOG = LoggerFactory.getLogger(StatsDReporter.class); - - public static final String ARG_HOST = "host"; - public static final String ARG_PORT = "port"; -// public static final String ARG_CONVERSION_RATE = "rateConversion"; -// public static final String ARG_CONVERSION_DURATION = "durationConversion"; - - private boolean closed = false; - - private DatagramSocket socket; - private InetSocketAddress address; - - @Override - public void open(Configuration config) { - String host = config.getString(ARG_HOST, null); - int port = config.getInteger(ARG_PORT, -1); - - if (host == null || host.length() == 0 || port < 1) { - throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port); - } - - this.address = new InetSocketAddress(host, port); - -// String conversionRate = config.getString(ARG_CONVERSION_RATE, "SECONDS"); -// String conversionDuration = config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS"); -// this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1); -// this.durationFactor = 1.0 / TimeUnit.valueOf(conversionDuration).toNanos(1); - - try { - this.socket = new DatagramSocket(0); - } catch (SocketException e) { - throw new RuntimeException("Could not create datagram socket. 
", e); - } - } - - @Override - public void close() { - closed = true; - if (socket != null && !socket.isClosed()) { - socket.close(); - } - } - - // ------------------------------------------------------------------------ - - @Override - public void report() { - // instead of locking here, we tolerate exceptions - // we do this to prevent holding the lock for very long and blocking - // operator creation and shutdown - try { - for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) { - if (closed) { - return; - } - reportGauge(entry.getValue(), entry.getKey()); - } - - for (Map.Entry<Counter, String> entry : counters.entrySet()) { - if (closed) { - return; - } - reportCounter(entry.getValue(), entry.getKey()); - } - } - catch (ConcurrentModificationException | NoSuchElementException e) { - // ignore - may happen when metrics are concurrently added or removed - // report next time - } - } - - // ------------------------------------------------------------------------ - - private void reportCounter(final String name, final Counter counter) { - send(name, String.valueOf(counter.getCount())); - } - - private void reportGauge(final String name, final Gauge<?> gauge) { - Object value = gauge.getValue(); - if (value != null) { - send(name, value.toString()); - } - } - - private void send(final String name, final String value) { - try { - String formatted = String.format("%s:%s|g", name, value); - byte[] data = formatted.getBytes(); - socket.send(new DatagramPacket(data, data.length, this.address)); - } - catch (IOException e) { - LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/pom.xml b/flink-metric-reporters/pom.xml deleted file mode 100644 index 01a809c..0000000 --- a/flink-metric-reporters/pom.xml +++ /dev/null 
@@ -1,42 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-parent</artifactId> - <version>1.1-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-metric-reporters</artifactId> - <name>flink-metric-reporters</name> - <packaging>pom</packaging> - - <modules> - <module>flink-metrics-dropwizard</module> - <module>flink-metrics-ganglia</module> - <module>flink-metrics-graphite</module> - <module>flink-metrics-statsd</module> - </modules> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/pom.xml b/flink-metrics/flink-metrics-dropwizard/pom.xml new file mode 100644 index 0000000..90dbc00 --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/pom.xml @@ -0,0 +1,80 @@ +<?xml 
version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-metrics-dropwizard</artifactId> + <name>flink-metrics-dropwizard</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${metrics.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.4</version> + 
<configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java new file mode 100644 index 0000000..062bbd8 --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.dropwizard; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Reporter; +import com.codahale.metrics.ScheduledReporter; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper; +import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper; +import org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper; +import org.apache.flink.dropwizard.metrics.FlinkHistogramWrapper; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.groups.AbstractMetricGroup; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; + +/** + * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} that wraps a + * Dropwizard {@link com.codahale.metrics.Reporter}. 
+ */
+@PublicEvolving
+public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled, Reporter {
+
+	protected final Logger log = LoggerFactory.getLogger(getClass());
+
+	public static final String ARG_HOST = "host";
+	public static final String ARG_PORT = "port";
+	public static final String ARG_PREFIX = "prefix";
+	public static final String ARG_CONVERSION_RATE = "rateConversion";
+	public static final String ARG_CONVERSION_DURATION = "durationConversion";
+
+	// ------------------------------------------------------------------------
+
+	/** Dropwizard registry holding a Dropwizard view of every metric added to this reporter. */
+	protected final MetricRegistry registry;
+
+	/** The wrapped Dropwizard reporter; assigned in {@link #open(Configuration)}. */
+	protected ScheduledReporter reporter;
+
+	// Bookkeeping: Flink metric -> name under which it was registered in the Dropwizard
+	// registry. These maps are only touched inside synchronized (this) blocks.
+	private final Map<Gauge<?>, String> gauges = new HashMap<>();
+	private final Map<Counter, String> counters = new HashMap<>();
+	private final Map<Histogram, String> histograms = new HashMap<>();
+
+	// ------------------------------------------------------------------------
+
+	protected ScheduledDropwizardReporter() {
+		this.registry = new MetricRegistry();
+	}
+
+	// ------------------------------------------------------------------------
+	//  life cycle
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the wrapped Dropwizard reporter via {@link #getReporter(Configuration)}.
+	 *
+	 * @param config configuration handed to {@link #getReporter(Configuration)}
+	 */
+	@Override
+	public void open(Configuration config) {
+		this.reporter = getReporter(config);
+	}
+
+	/**
+	 * Stops the wrapped Dropwizard reporter. Does nothing if no reporter has been created.
+	 */
+	@Override
+	public void close() {
+		// reporter is only assigned in open(); guard against close() without a successful open()
+		if (this.reporter != null) {
+			this.reporter.stop();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  adding / removing metrics
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Registers a Dropwizard view of the given Flink metric under
+	 * {@code <group scope string>.<metricName>} and remembers that name for later removal.
+	 * Metrics that are neither counter, gauge nor histogram are skipped with a warning.
+	 *
+	 * @param metric the metric that was added
+	 * @param metricName the name of the metric within its group
+	 * @param group the group the metric belongs to; provides the scope prefix
+	 */
+	@Override
+	public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
+		final String fullName = group.getScopeString() + '.' + metricName;
+
+		synchronized (this) {
+			// Register with Dropwizard first and record the name only afterwards. If
+			// register() fails (Dropwizard rejects a name that is already taken), no
+			// bookkeeping entry may be left behind: a later removal of this metric would
+			// otherwise unregister the metric that actually owns the name.
+			if (metric instanceof Counter) {
+				registry.register(fullName, new FlinkCounterWrapper((Counter) metric));
+				counters.put((Counter) metric, fullName);
+			} else if (metric instanceof Gauge) {
+				registry.register(fullName, FlinkGaugeWrapper.fromGauge((Gauge<?>) metric));
+				gauges.put((Gauge<?>) metric, fullName);
+			} else if (metric instanceof Histogram) {
+				Histogram histogram = (Histogram) metric;
+
+				if (histogram instanceof DropwizardHistogramWrapper) {
+					// already backed by a Dropwizard histogram: register that one directly
+					registry.register(fullName, ((DropwizardHistogramWrapper) histogram).getDropwizarHistogram());
+				} else {
+					registry.register(fullName, new FlinkHistogramWrapper(histogram));
+				}
+				histograms.put(histogram, fullName);
+			} else {
+				log.warn("Cannot add metric of type {}. This indicates that the reporter " +
+					"does not support this metric type.", metric.getClass().getName());
+			}
+		}
+	}
+
+	/**
+	 * Removes the Dropwizard view of the given Flink metric, if one was recorded for it.
+	 * The registry name is looked up from the bookkeeping maps; {@code metricName} and
+	 * {@code group} are not consulted.
+	 *
+	 * @param metric the metric that was removed
+	 * @param metricName the name of the metric within its group
+	 * @param group the group the metric belonged to
+	 */
+	@Override
+	public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
+		synchronized (this) {
+			String fullName;
+
+			if (metric instanceof Counter) {
+				fullName = counters.remove(metric);
+			} else if (metric instanceof Gauge) {
+				fullName = gauges.remove(metric);
+			} else if (metric instanceof Histogram) {
+				fullName = histograms.remove(metric);
+			} else {
+				fullName = null;
+			}
+
+			// null means the metric was never recorded here (unsupported type or unknown metric)
+			if (fullName != null) {
+				registry.remove(fullName);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  scheduled reporting
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Hands the current content of the Dropwizard registry to the wrapped reporter.
+	 */
+	@Override
+	public void report() {
+		// we do not need to lock here, because the dropwizard registry is
+		// internally a concurrent map
+		@SuppressWarnings("rawtypes")
+		final SortedMap<String, com.codahale.metrics.Gauge> gauges = registry.getGauges();
+		final SortedMap<String, com.codahale.metrics.Counter> counters = registry.getCounters();
+		final SortedMap<String, com.codahale.metrics.Histogram> histograms = registry.getHistograms();
+		final SortedMap<String, com.codahale.metrics.Meter> meters = registry.getMeters();
+		final SortedMap<String, com.codahale.metrics.Timer> timers = registry.getTimers();
+
+		this.reporter.report(gauges, counters, histograms, meters, timers);
+	}
+
+	/**
+	 * Creates the Dropwizard reporter that this class delegates to.
+	 * Called from {@link #open(Configuration)}.
+	 *
+	 * @param config the reporter configuration
+	 * @return the Dropwizard reporter to wrap
+	 */
+	public abstract ScheduledReporter getReporter(Configuration config);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
new file mode 100644
index 0000000..6f4eab2
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Snapshot;
+import org.apache.flink.metrics.HistogramStatistics;
+
+/**
+ * {@link HistogramStatistics} view over a Dropwizard {@link Snapshot}, as handed out by
+ * {@link DropwizardHistogramWrapper}. Every query is answered by delegating to the snapshot.
+ */
+class DropwizardHistogramStatistics extends HistogramStatistics {
+
+	/** The Dropwizard snapshot all calls are forwarded to. */
+	private final Snapshot delegate;
+
+	/**
+	 * @param dropwizardSnapshot the snapshot to expose as Flink histogram statistics
+	 */
+	DropwizardHistogramStatistics(Snapshot dropwizardSnapshot) {
+		this.delegate = dropwizardSnapshot;
+	}
+
+	// --- sample access ---------------------------------------------------
+
+	@Override
+	public int size() {
+		return delegate.size();
+	}
+
+	@Override
+	public long[] getValues() {
+		return delegate.getValues();
+	}
+
+	@Override
+	public double getQuantile(double quantile) {
+		// Dropwizard names the quantile lookup getValue(double)
+		return delegate.getValue(quantile);
+	}
+
+	// --- summary statistics ----------------------------------------------
+
+	@Override
+	public long getMin() {
+		return delegate.getMin();
+	}
+
+	@Override
+	public long getMax() {
+		return delegate.getMax();
+	}
+
+	@Override
+	public double getMean() {
+		return delegate.getMean();
+	}
+
+	@Override
+	public double getStdDev() {
+		return delegate.getStdDev();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
new file mode 100644
index 0000000..79a6a56
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+/**
+ * Wrapper to use a Dropwizard {@link com.codahale.metrics.Histogram} as a Flink {@link Histogram}.
+ *
+ * <p>Updates and count queries are forwarded to the wrapped histogram; statistics are served
+ * from a snapshot of it.
+ */
+public class DropwizardHistogramWrapper implements Histogram {
+
+	/** The wrapped Dropwizard histogram that holds the actual data. */
+	private final com.codahale.metrics.Histogram dropwizardHistogram;
+
+	/**
+	 * @param dropwizardHistogram the Dropwizard histogram to expose as a Flink histogram
+	 */
+	public DropwizardHistogramWrapper(com.codahale.metrics.Histogram dropwizardHistogram) {
+		this.dropwizardHistogram = dropwizardHistogram;
+	}
+
+	/**
+	 * Returns the wrapped Dropwizard histogram.
+	 *
+	 * @return the Dropwizard histogram passed to the constructor
+	 */
+	public com.codahale.metrics.Histogram getDropwizardHistogram() {
+		return dropwizardHistogram;
+	}
+
+	/**
+	 * Returns the wrapped Dropwizard histogram.
+	 *
+	 * @return the Dropwizard histogram passed to the constructor
+	 * @deprecated misspelled accessor kept so existing callers keep compiling;
+	 *             use {@link #getDropwizardHistogram()} instead.
+	 */
+	@Deprecated
+	public com.codahale.metrics.Histogram getDropwizarHistogram() {
+		return getDropwizardHistogram();
+	}
+
+	@Override
+	public void update(long value) {
+		dropwizardHistogram.update(value);
+	}
+
+	@Override
+	public long getCount() {
+		return dropwizardHistogram.getCount();
+	}
+
+	/**
+	 * Takes a snapshot of the wrapped histogram and exposes it as Flink statistics.
+	 * A new statistics object is created on every call.
+	 */
+	@Override
+	public HistogramStatistics getStatistics() {
+		return new DropwizardHistogramStatistics(dropwizardHistogram.getSnapshot());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
----------------------------------------------------------------------
diff --git
a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java new file mode 100644 index 0000000..a44c3f5 --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Counter;
+
+/**
+ * Wrapper to use a Flink {@link Counter} as a Dropwizard {@link com.codahale.metrics.Counter}.
+ *
+ * <p>The wrapped Flink counter is the single source of truth: reads and all increments and
+ * decrements are forwarded to it, so modifications made through the Dropwizard API are
+ * reflected by {@link #getCount()} instead of being tracked separately by the base class.
+ */
+public class FlinkCounterWrapper extends com.codahale.metrics.Counter {
+
+	/** The Flink counter that holds the actual count. */
+	private final Counter counter;
+
+	/**
+	 * @param counter the Flink counter to expose as a Dropwizard counter
+	 */
+	public FlinkCounterWrapper(Counter counter) {
+		this.counter = counter;
+	}
+
+	@Override
+	public long getCount() {
+		return this.counter.getCount();
+	}
+
+	@Override
+	public void inc() {
+		this.counter.inc();
+	}
+
+	@Override
+	public void inc(long n) {
+		this.counter.inc(n);
+	}
+
+	@Override
+	public void dec() {
+		this.counter.dec();
+	}
+
+	@Override
+	public void dec(long n) {
+		this.counter.dec(n);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
new file mode 100644
index 0000000..058ecad
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * Wrapper to use a Flink {@link Gauge} as a Dropwizard {@link com.codahale.metrics.Gauge}.
+ * The value is read from the Flink gauge each time {@link #getValue()} is called.
+ *
+ * @param <T> type of the value produced by the gauge
+ */
+public class FlinkGaugeWrapper<T> implements com.codahale.metrics.Gauge<T> {
+
+	/** The Flink gauge the value is read from. */
+	private final Gauge<T> flinkGauge;
+
+	/**
+	 * Creates a wrapper for a gauge whose value type is not known statically.
+	 *
+	 * @param gauge the Flink gauge to wrap
+	 * @param <T> value type chosen by the caller; the cast is unchecked
+	 * @return a Dropwizard view of the given gauge
+	 */
+	public static <T> FlinkGaugeWrapper<T> fromGauge(Gauge<?> gauge) {
+		// unchecked: T is erased at runtime, the wrapper only passes the value through
+		@SuppressWarnings("unchecked")
+		final Gauge<T> typed = (Gauge<T>) gauge;
+		return new FlinkGaugeWrapper<T>(typed);
+	}
+
+	/**
+	 * @param gauge the Flink gauge to wrap
+	 */
+	public FlinkGaugeWrapper(Gauge<T> gauge) {
+		this.flinkGauge = gauge;
+	}
+
+	@Override
+	public T getValue() {
+		return flinkGauge.getValue();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
new file mode 100644
index 0000000..8bd8078
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Snapshot;
+import org.apache.flink.metrics.Histogram;
+
+/**
+ * Adapter that presents a Flink {@link Histogram} as a Dropwizard
+ * {@link com.codahale.metrics.Histogram}, so that Flink histograms can be handed to a
+ * Dropwizard {@link com.codahale.metrics.Reporter}.
+ */
+public class FlinkHistogramWrapper extends com.codahale.metrics.Histogram {
+
+	/** The Flink histogram that holds the actual data. */
+	private final Histogram flinkHistogram;
+
+	/**
+	 * @param histogram the Flink histogram to expose through the Dropwizard API
+	 */
+	public FlinkHistogramWrapper(Histogram histogram) {
+		// No reservoir is given to the base class; update, getCount and getSnapshot are
+		// all overridden below and answer from the Flink histogram instead.
+		// NOTE(review): presumably no other base-class method touches the reservoir - confirm.
+		super(null);
+		this.flinkHistogram = histogram;
+	}
+
+	@Override
+	public long getCount() {
+		return flinkHistogram.getCount();
+	}
+
+	@Override
+	public void update(long value) {
+		flinkHistogram.update(value);
+	}
+
+	/**
+	 * Wraps the Flink histogram's current statistics into a Dropwizard snapshot.
+	 * A new snapshot object is created on every call.
+	 */
+	@Override
+	public Snapshot getSnapshot() {
+		return new HistogramStatisticsWrapper(flinkHistogram.getStatistics());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
new file mode 100644
index 0000000..6d3a69b
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.dropwizard.metrics; + +import com.codahale.metrics.Snapshot; +import org.apache.flink.metrics.HistogramStatistics; + +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.Charset; + +/** + * Wrapper to use Flink's {@link HistogramStatistics} as a Dropwizard {@link Snapshot}. This is + * necessary to report Flink's histograms via the Dropwizard {@link com.codahale.metrics.Reporter}. 
+ */
+class HistogramStatisticsWrapper extends Snapshot {
+
+	/** Character set used when dumping the values as text. */
+	private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+	/** The Flink statistics every query is forwarded to. */
+	private final HistogramStatistics histogramStatistics;
+
+	/**
+	 * @param histogramStatistics the Flink statistics to expose as a Dropwizard snapshot
+	 */
+	HistogramStatisticsWrapper(HistogramStatistics histogramStatistics) {
+		this.histogramStatistics = histogramStatistics;
+	}
+
+	// --- sample access ---------------------------------------------------
+
+	@Override
+	public int size() {
+		return histogramStatistics.size();
+	}
+
+	@Override
+	public long[] getValues() {
+		return histogramStatistics.getValues();
+	}
+
+	@Override
+	public double getValue(double quantile) {
+		// Flink names the quantile lookup getQuantile(double)
+		return histogramStatistics.getQuantile(quantile);
+	}
+
+	// --- summary statistics ----------------------------------------------
+
+	@Override
+	public long getMin() {
+		return histogramStatistics.getMin();
+	}
+
+	@Override
+	public long getMax() {
+		return histogramStatistics.getMax();
+	}
+
+	@Override
+	public double getMean() {
+		return histogramStatistics.getMean();
+	}
+
+	@Override
+	public double getStdDev() {
+		return histogramStatistics.getStdDev();
+	}
+
+	// --- text dump ---------------------------------------------------------
+
+	/**
+	 * Writes every value of the statistics to the given stream, one value per line,
+	 * encoded as UTF-8. The writer wrapping {@code output} is closed when done.
+	 *
+	 * @param output the stream to write the values to
+	 */
+	@Override
+	public void dump(OutputStream output) {
+		final long[] samples = histogramStatistics.getValues();
+
+		try (PrintWriter out = new PrintWriter(new OutputStreamWriter(output, UTF_8))) {
+			for (long sample : samples) {
+				out.printf("%d%n", sample);
+			}
+		}
+	}
+}