[FLINK-3951] Add Histogram metric type

This closes #2112


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee3c7a88
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee3c7a88
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee3c7a88

Branch: refs/heads/master
Commit: ee3c7a88bb74232e4884899699aaa08ae2b6e038
Parents: d43bf8d
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Jun 14 19:04:43 2016 +0200
Committer: zentol <ches...@apache.org>
Committed: Mon Jun 27 15:32:03 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/metrics/Histogram.java     |  52 +++
 .../flink/metrics/HistogramStatistics.java      |  81 +++++
 .../org/apache/flink/metrics/MetricGroup.java   |  20 ++
 .../metrics/groups/AbstractMetricGroup.java     |  12 +
 .../groups/UnregisteredMetricsGroup.java        |  12 +-
 .../metrics/reporter/AbstractReporter.java      |  15 +
 .../flink/metrics/reporter/JMXReporter.java     |  98 +++++-
 .../flink/metrics/MetricRegistryTest.java       |   3 +-
 .../groups/MetricGroupRegistrationTest.java     |  21 ++
 .../flink/metrics/reporter/JMXReporterTest.java | 108 ++++++-
 .../flink-metrics-dropwizard/pom.xml            |  72 -----
 .../dropwizard/ScheduledDropwizardReporter.java | 140 --------
 .../dropwizard/metrics/CounterWrapper.java      |  33 --
 .../flink/dropwizard/metrics/GaugeWrapper.java  |  41 ---
 .../flink-metrics-ganglia/pom.xml               |  90 ------
 .../flink/metrics/ganglia/GangliaReporter.java  |  79 -----
 .../flink-metrics-graphite/pom.xml              |  84 -----
 .../metrics/graphite/GraphiteReporter.java      |  63 ----
 .../flink-metrics-statsd/pom.xml                |  43 ---
 .../flink/metrics/statsd/StatsDReporter.java    | 143 ---------
 flink-metric-reporters/pom.xml                  |  42 ---
 flink-metrics/flink-metrics-dropwizard/pom.xml  |  80 +++++
 .../dropwizard/ScheduledDropwizardReporter.java | 162 ++++++++++
 .../metrics/DropwizardHistogramStatistics.java  |  70 ++++
 .../metrics/DropwizardHistogramWrapper.java     |  53 +++
 .../dropwizard/metrics/FlinkCounterWrapper.java |  33 ++
 .../dropwizard/metrics/FlinkGaugeWrapper.java   |  41 +++
 .../metrics/FlinkHistogramWrapper.java          |  52 +++
 .../metrics/HistogramStatisticsWrapper.java     |  86 +++++
 .../DropwizardFlinkHistogramWrapperTest.java    | 319 +++++++++++++++++++
 .../src/test/resources/log4j-test.properties    |  27 ++
 .../src/test/resources/logback-test.xml         |  34 ++
 flink-metrics/flink-metrics-ganglia/pom.xml     |  90 ++++++
 .../flink/metrics/ganglia/GangliaReporter.java  |  79 +++++
 flink-metrics/flink-metrics-graphite/pom.xml    |  84 +++++
 .../metrics/graphite/GraphiteReporter.java      |  63 ++++
 flink-metrics/flink-metrics-statsd/pom.xml      |  51 +++
 .../flink/metrics/statsd/StatsDReporter.java    | 184 +++++++++++
 .../metrics/statsd/StatsDReporterTest.java      | 236 ++++++++++++++
 .../src/test/resources/log4j-test.properties    |  27 ++
 .../src/test/resources/logback-test.xml         |  34 ++
 flink-metrics/pom.xml                           |  42 +++
 pom.xml                                         |   2 +-
 43 files changed, 2264 insertions(+), 837 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java b/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
new file mode 100644
index 0000000..3fd1253
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Histogram interface to be used with Flink's metrics system.
+ *
+ * The histogram allows to record values, get the current count of recorded values and create
+ * histogram statistics for the currently seen elements.
+ */
+@PublicEvolving
+public interface Histogram extends Metric {
+
+       /**
+        * Update the histogram with the given value.
+        *
+        * @param value Value to update the histogram with
+        */
+       void update(long value);
+
+       /**
+        * Get the count of seen elements.
+        *
+        * @return Count of seen elements
+        */
+       long getCount();
+
+       /**
+        * Create statistics for the currently recorded elements.
+        *
+        * @return Statistics about the currently recorded elements
+        */
+       HistogramStatistics getStatistics();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java b/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
new file mode 100644
index 0000000..476580c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Histogram statistics represent the current snapshot of elements recorded in the histogram.
+ *
+ * The histogram statistics allow to calculate values for quantiles, the mean, the standard
+ * deviation, the minimum and the maximum.
+ */
+@PublicEvolving
+public abstract class HistogramStatistics {
+
+       /**
+        * Returns the value for the given quantile based on the represented histogram statistics.
+        *
+        * @param quantile Quantile to calculate the value for
+        * @return Value for the given quantile
+        */
+       public abstract double getQuantile(double quantile);
+
+       /**
+        * Returns the elements of the statistics' sample
+        *
+        * @return Elements of the statistics' sample
+        */
+       public abstract long[] getValues();
+
+       /**
+        * Returns the size of the statistics' sample
+        *
+        * @return Size of the statistics' sample
+        */
+       public abstract int size();
+
+       /**
+        * Returns the mean of the histogram values.
+        *
+        * @return Mean of the histogram values
+        */
+       public abstract double getMean();
+
+       /**
+        * Returns the standard deviation of the distribution reflected by the histogram statistics.
+        *
+        * @return Standard deviation of histogram distribution
+        */
+       public abstract double getStdDev();
+
+       /**
+        * Returns the maximum value of the histogram.
+        *
+        * @return Maximum value of the histogram
+        */
+       public abstract long getMax();
+
+       /**
+        * Returns the minimum value of the histogram.
+        *
+        * @return Minimum value of the histogram
+        */
+       public abstract long getMin();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
index b131949..f46d3fc 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
@@ -115,6 +115,26 @@ public interface MetricGroup {
         */
        <T, G extends Gauge<T>> G gauge(String name, G gauge);
 
+       /**
+        * Registers a new {@link Histogram} with Flink.
+        *
+        * @param name name of the histogram
+        * @param histogram histogram to register
+        * @param <H> histogram type   
+        * @return the registered histogram
+        */
+       <H extends Histogram> H histogram(String name, H histogram);
+
+       /**
+        * Registers a new {@link Histogram} with Flink.
+        *
+        * @param name name of the histogram
+        * @param histogram histogram to register
+        * @param <H> histogram type   
+        * @return the registered histogram
+        */
+       <H extends Histogram> H histogram(int name, H histogram);
+
        // ------------------------------------------------------------------------
        // Groups
        // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
index 93eb734..112957e 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
@@ -21,6 +21,7 @@ package org.apache.flink.metrics.groups;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
@@ -172,6 +173,17 @@ public abstract class AbstractMetricGroup implements MetricGroup {
                return gauge;
        }
 
+       @Override
+       public <H extends Histogram> H histogram(int name, H histogram) {
+               return histogram(String.valueOf(name), histogram);
+       }
+
+       @Override
+       public <H extends Histogram> H histogram(String name, H histogram) {
+               addMetric(name, histogram);
+               return histogram;
+       }
+
        /**
         * Adds the given metric to the group and registers it at the registry, if the group
         * is not yet closed, and if no metric with the same name has been registered before.

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
index 29d71d9..8e183df 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
@@ -21,6 +21,7 @@ package org.apache.flink.metrics.groups;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 
@@ -71,7 +72,16 @@ public class UnregisteredMetricsGroup implements MetricGroup {
                return gauge;
        }
 
-       
+       @Override
+       public <H extends Histogram> H histogram(int name, H histogram) {
+               return histogram;
+       }
+
+       @Override
+       public <H extends Histogram> H histogram(String name, H histogram) {
+               return histogram;
+       }
+
        @Override
        public MetricGroup addGroup(int name) {
                return addGroup(String.valueOf(name));

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
index f2e78bf..8dacb7c 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
@@ -21,8 +21,11 @@ package org.apache.flink.metrics.reporter;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.groups.AbstractMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -32,9 +35,11 @@ import java.util.Map;
  */
 @PublicEvolving
 public abstract class AbstractReporter implements MetricReporter {
+       protected final Logger log = LoggerFactory.getLogger(getClass());
 
        protected final Map<Gauge<?>, String> gauges = new HashMap<>();
        protected final Map<Counter, String> counters = new HashMap<>();
+       protected final Map<Histogram, String> histograms = new HashMap<>();
 
        @Override
        public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
@@ -45,6 +50,11 @@ public abstract class AbstractReporter implements MetricReporter {
                                counters.put((Counter) metric, name);
                        } else if (metric instanceof Gauge) {
                                gauges.put((Gauge<?>) metric, name);
+                       } else if (metric instanceof Histogram) {
+                               histograms.put((Histogram) metric, name);
+                       } else {
+                               log.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
+                                       "does not support this metric type.", metric.getClass().getName());
                        }
                }
        }
@@ -56,6 +66,11 @@ public abstract class AbstractReporter implements MetricReporter {
                                counters.remove(metric);
                        } else if (metric instanceof Gauge) {
                                gauges.remove(metric);
+                       } else if (metric instanceof Histogram) {
+                               histograms.remove(metric);
+                       } else {
+                               log.warn("Cannot remove unknown metric type {}. This indicates that the reporter " +
+                                       "does not support this metric type.", metric.getClass().getName());
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
index 326d6d7..eaf0ea0 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.util.NetUtils;
@@ -146,8 +147,11 @@ public class JMXReporter implements MetricReporter {
                        jmxMetric = new JmxGauge((Gauge<?>) metric);
                } else if (metric instanceof Counter) {
                        jmxMetric = new JmxCounter((Counter) metric);
+               } else if (metric instanceof Histogram) {
+                       jmxMetric = new JmxHistogram((Histogram) metric);
                } else {
-                       LOG.error("Unknown metric type: " + metric.getClass().getName());
+                       LOG.error("Cannot add unknown metric type: {}. This indicates that the metric type " +
+                               "is not supported by this reporter.", metric.getClass().getName());
                        return;
                }
 
@@ -285,7 +289,7 @@ public class JMXReporter implements MetricReporter {
        private static class JmxCounter extends AbstractBean implements JmxCounterMBean {
                private Counter counter;
 
-               public JmxCounter(Counter counter) {
+               JmxCounter(Counter counter) {
                        this.counter = counter;
                }
 
@@ -303,7 +307,7 @@ public class JMXReporter implements MetricReporter {
 
                private final Gauge<?> gauge;
 
-               public JmxGauge(Gauge<?> gauge) {
+               JmxGauge(Gauge<?> gauge) {
                        this.gauge = gauge;
                }
 
@@ -313,6 +317,94 @@ public class JMXReporter implements MetricReporter {
                }
        }
 
+       public interface JmxHistogramMBean extends MetricMBean {
+               long getCount();
+
+               double getMean();
+
+               double getStdDev();
+
+               long getMax();
+
+               long getMin();
+
+               double getMedian();
+
+               double get75thPercentile();
+
+               double get95thPercentile();
+
+               double get98thPercentile();
+
+               double get99thPercentile();
+
+               double get999thPercentile();
+       }
+
+       private static class JmxHistogram extends AbstractBean implements JmxHistogramMBean {
+
+               private final Histogram histogram;
+
+               JmxHistogram(Histogram histogram) {
+                       this.histogram = histogram;
+               }
+
+               @Override
+               public long getCount() {
+                       return histogram.getCount();
+               }
+
+               @Override
+               public double getMean() {
+                       return histogram.getStatistics().getMean();
+               }
+
+               @Override
+               public double getStdDev() {
+                       return histogram.getStatistics().getStdDev();
+               }
+
+               @Override
+               public long getMax() {
+                       return histogram.getStatistics().getMax();
+               }
+
+               @Override
+               public long getMin() {
+                       return histogram.getStatistics().getMin();
+               }
+
+               @Override
+               public double getMedian() {
+                       return histogram.getStatistics().getQuantile(0.5);
+               }
+
+               @Override
+               public double get75thPercentile() {
+                       return histogram.getStatistics().getQuantile(0.75);
+               }
+
+               @Override
+               public double get95thPercentile() {
+                       return histogram.getStatistics().getQuantile(0.95);
+               }
+
+               @Override
+               public double get98thPercentile() {
+                       return histogram.getStatistics().getQuantile(0.98);
+               }
+
+               @Override
+               public double get99thPercentile() {
+                       return histogram.getStatistics().getQuantile(0.99);
+               }
+
+               @Override
+               public double get999thPercentile() {
+                       return histogram.getStatistics().getQuantile(0.999);
+               }
+       }
+
        /**
         * JMX Server implementation that JMX clients can connect to.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
index f8e0bf5..8b71816 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
@@ -25,12 +25,13 @@ import org.apache.flink.metrics.groups.scope.ScopeFormats;
 import org.apache.flink.metrics.reporter.Scheduled;
 import org.apache.flink.metrics.util.TestReporter;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
-public class MetricRegistryTest {
+public class MetricRegistryTest extends TestLogger {
        
        /**
         * Verifies that the reporter class argument is correctly used to instantiate and open the reporter.

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
index 7b35d91..c7a112a 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.metrics.groups;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
@@ -57,6 +59,25 @@ public class MetricGroupRegistrationTest {
                Assert.assertEquals(gauge, TestReporter1.lastPassedMetric);
                assertEquals("gauge", TestReporter1.lastPassedName);
 
+               Histogram histogram = root.histogram("histogram", new Histogram() {
+                       @Override
+                       public void update(long value) {
+
+                       }
+
+                       @Override
+                       public long getCount() {
+                               return 0;
+                       }
+
+                       @Override
+                       public HistogramStatistics getStatistics() {
+                               return null;
+                       }
+               });
+
+               Assert.assertEquals(histogram, TestReporter1.lastPassedMetric);
+               assertEquals("histogram", TestReporter1.lastPassedName);
                registry.shutdown();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
index d25f744..9e638a7 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
@@ -20,11 +20,16 @@ package org.apache.flink.metrics.reporter;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.MetricRegistry;
 import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.metrics.util.TestReporter;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
 import javax.management.MBeanServer;
 import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
@@ -37,7 +42,7 @@ import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_JMX_PORT;
 import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_REPORTER_CLASS;
 import static org.junit.Assert.assertEquals;
 
-public class JMXReporterTest {
+public class JMXReporterTest extends TestLogger {
 
        @Test
        public void testReplaceInvalidChars() {
@@ -188,4 +193,105 @@ public class JMXReporterTest {
                rep2.close();
                reg.shutdown();
        }
+
+       /**
+        * Tests that histograms are properly reported via the JMXReporter.
+        */
+       @Test
+       public void testHistogramReporting() throws Exception {
+               MetricRegistry registry = null;
+               String histogramName = "histogram";
+
+               try {
+                       Configuration config = new Configuration();
+
+                       registry = new MetricRegistry(config);
+
+                       TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
+
+                       TestingHistogram histogram = new TestingHistogram();
+
+                       metricGroup.histogram(histogramName, histogram);
+
+                       MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+
+                       ObjectName objectName = new ObjectName(JMXReporter.generateJmxName(histogramName, metricGroup.getScopeComponents()));
+
+                       MBeanInfo info = mBeanServer.getMBeanInfo(objectName);
+
+                       MBeanAttributeInfo[] attributeInfos = info.getAttributes();
+
+                       assertEquals(11, attributeInfos.length);
+
+                       assertEquals(histogram.getCount(), mBeanServer.getAttribute(objectName, "Count"));
+                       assertEquals(histogram.getStatistics().getMean(), mBeanServer.getAttribute(objectName, "Mean"));
+                       assertEquals(histogram.getStatistics().getStdDev(), mBeanServer.getAttribute(objectName, "StdDev"));
+                       assertEquals(histogram.getStatistics().getMax(), mBeanServer.getAttribute(objectName, "Max"));
+                       assertEquals(histogram.getStatistics().getMin(), mBeanServer.getAttribute(objectName, "Min"));
+                       assertEquals(histogram.getStatistics().getQuantile(0.5), mBeanServer.getAttribute(objectName, "Median"));
+                       assertEquals(histogram.getStatistics().getQuantile(0.75), mBeanServer.getAttribute(objectName, "75thPercentile"));
+                       assertEquals(histogram.getStatistics().getQuantile(0.95), mBeanServer.getAttribute(objectName, "95thPercentile"));
+                       assertEquals(histogram.getStatistics().getQuantile(0.98), mBeanServer.getAttribute(objectName, "98thPercentile"));
+                       assertEquals(histogram.getStatistics().getQuantile(0.99), mBeanServer.getAttribute(objectName, "99thPercentile"));
+                       assertEquals(histogram.getStatistics().getQuantile(0.999), mBeanServer.getAttribute(objectName, "999thPercentile"));
+
+               } finally {
+                       if (registry != null) {
+                               registry.shutdown();
+                       }
+               }
+       }
+
+       static class TestingHistogram implements Histogram {
+
+               @Override
+               public void update(long value) {
+
+               }
+
+               @Override
+               public long getCount() {
+                       return 1;
+               }
+
+               @Override
+               public HistogramStatistics getStatistics() {
+                       return new HistogramStatistics() {
+                               @Override
+                               public double getQuantile(double quantile) {
+                                       return quantile;
+                               }
+
+                               @Override
+                               public long[] getValues() {
+                                       return new long[0];
+                               }
+
+                               @Override
+                               public int size() {
+                                       return 3;
+                               }
+
+                               @Override
+                               public double getMean() {
+                                       return 4;
+                               }
+
+                               @Override
+                               public double getStdDev() {
+                                       return 5;
+                               }
+
+                               @Override
+                               public long getMax() {
+                                       return 6;
+                               }
+
+                               @Override
+                               public long getMin() {
+                                       return 7;
+                               }
+                       };
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-dropwizard/pom.xml b/flink-metric-reporters/flink-metrics-dropwizard/pom.xml
deleted file mode 100644
index a386880..0000000
--- a/flink-metric-reporters/flink-metrics-dropwizard/pom.xml
+++ /dev/null
@@ -1,72 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-metric-reporters</artifactId>
-               <version>1.1-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-metrics-dropwizard</artifactId>
-       <name>flink-metrics-dropwizard</name>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>io.dropwizard.metrics</groupId>
-                       <artifactId>metrics-core</artifactId>
-                       <version>${metrics.version}</version>
-               </dependency>
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <artifactId>maven-assembly-plugin</artifactId>
-                               <version>2.4</version>
-                               <configuration>
-                                       <descriptorRefs>
-                                               
<descriptorRef>jar-with-dependencies</descriptorRef>
-                                       </descriptorRefs>
-                               </configuration>
-                               <executions>
-                                       <execution>
-                                               <id>make-assembly</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>single</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-       </build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
 
b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
deleted file mode 100644
index d67f3e3..0000000
--- 
a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.dropwizard;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Reporter;
-import com.codahale.metrics.ScheduledReporter;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.dropwizard.metrics.CounterWrapper;
-import org.apache.flink.dropwizard.metrics.GaugeWrapper;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.metrics.reporter.Scheduled;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedMap;
-
-/**
- * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} 
that wraps a
- * Dropwizard {@link com.codahale.metrics.Reporter}.
- */
-@PublicEvolving
-public abstract class ScheduledDropwizardReporter implements MetricReporter, 
Scheduled, Reporter {
-
-       public static final String ARG_HOST = "host";
-       public static final String ARG_PORT = "port";
-       public static final String ARG_PREFIX = "prefix";
-       public static final String ARG_CONVERSION_RATE = "rateConversion";
-       public static final String ARG_CONVERSION_DURATION = 
"durationConversion";
-
-       // 
------------------------------------------------------------------------
-
-       protected final MetricRegistry registry;
-
-       protected ScheduledReporter reporter;
-
-       private final Map<Gauge<?>, String> gauges = new HashMap<>();
-       private final Map<Counter, String> counters = new HashMap<>();
-
-       // 
------------------------------------------------------------------------
-
-       protected ScheduledDropwizardReporter() {
-               this.registry = new MetricRegistry();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  life cycle
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void open(Configuration config) {
-               this.reporter = getReporter(config);
-       }
-
-       @Override
-       public void close() {
-               this.reporter.stop();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  adding / removing metrics
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void notifyOfAddedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {
-               final String fullName = group.getScopeString() + '.' + 
metricName;
-
-               synchronized (this) {
-                       if (metric instanceof Counter) {
-                               counters.put((Counter) metric, fullName);
-                               registry.register(fullName, new 
CounterWrapper((Counter) metric));
-                       }
-                       else if (metric instanceof Gauge) {
-                               gauges.put((Gauge<?>) metric, fullName);
-                               registry.register(fullName, 
GaugeWrapper.fromGauge((Gauge<?>) metric));
-                       }
-               }
-       }
-
-       @Override
-       public void notifyOfRemovedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {
-               synchronized (this) {
-                       String fullName;
-                       
-                       if (metric instanceof Counter) {
-                               fullName = counters.remove(metric);
-                       } else if (metric instanceof Gauge) {
-                               fullName = gauges.remove(metric);
-                       } else {
-                               fullName = null;
-                       }
-                       
-                       if (fullName != null) {
-                               registry.remove(fullName);
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  scheduled reporting
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void report() {
-               // we do not need to lock here, because the dropwizard registry 
is
-               // internally a concurrent map
-               @SuppressWarnings("rawtypes")
-               final SortedMap<String, com.codahale.metrics.Gauge> gauges = 
registry.getGauges();
-               final SortedMap<String, com.codahale.metrics.Counter> counters 
= registry.getCounters();
-               final SortedMap<String, com.codahale.metrics.Histogram> 
histograms = registry.getHistograms();
-               final SortedMap<String, com.codahale.metrics.Meter> meters = 
registry.getMeters();
-               final SortedMap<String, com.codahale.metrics.Timer> timers = 
registry.getTimers();
-
-               this.reporter.report(gauges, counters, histograms, meters, 
timers);
-       }
-
-       public abstract ScheduledReporter getReporter(Configuration config);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
 
b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
deleted file mode 100644
index f6630b9..0000000
--- 
a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.dropwizard.metrics;
-
-import org.apache.flink.metrics.Counter;
-
-public class CounterWrapper extends com.codahale.metrics.Counter {
-       private final Counter counter;
-
-       public CounterWrapper(Counter counter) {
-               this.counter = counter;
-       }
-
-       @Override
-       public long getCount() {
-               return this.counter.getCount();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
 
b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
deleted file mode 100644
index 655cd60..0000000
--- 
a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.dropwizard.metrics;
-
-import org.apache.flink.metrics.Gauge;
-
-public class GaugeWrapper<T> implements com.codahale.metrics.Gauge<T> {
-       
-       private final Gauge<T> gauge;
-
-       public GaugeWrapper(Gauge<T> gauge) {
-               this.gauge = gauge;
-       }
-
-       @Override
-       public T getValue() {
-               return this.gauge.getValue();
-       }
-
-       public static <T> GaugeWrapper<T> fromGauge(Gauge<?> gauge) {
-               @SuppressWarnings("unchecked")
-               Gauge<T> typedGauge = (Gauge<T>) gauge;
-               return new GaugeWrapper<>(typedGauge); 
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-ganglia/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-ganglia/pom.xml 
b/flink-metric-reporters/flink-metrics-ganglia/pom.xml
deleted file mode 100644
index a457ca1..0000000
--- a/flink-metric-reporters/flink-metrics-ganglia/pom.xml
+++ /dev/null
@@ -1,90 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-metric-reporters</artifactId>
-               <version>1.1-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-metrics-ganglia</artifactId>
-       <name>flink-metrics-ganglia</name>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-metrics-dropwizard</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>info.ganglia.gmetric4j</groupId>
-                       <artifactId>gmetric4j</artifactId>
-                       <version>1.0.7</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>io.dropwizard.metrics</groupId>
-                       <artifactId>metrics-core</artifactId>
-                       <version>${metrics.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>io.dropwizard.metrics</groupId>
-                       <artifactId>metrics-ganglia</artifactId>
-                       <version>${metrics.version}</version>
-               </dependency>
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <artifactId>maven-assembly-plugin</artifactId>
-                               <version>2.4</version>
-                               <configuration>
-                                       <descriptorRefs>
-                                               
<descriptorRef>jar-with-dependencies</descriptorRef>
-                                       </descriptorRefs>
-                               </configuration>
-                               <executions>
-                                       <execution>
-                                               <id>make-assembly</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>single</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-       </build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
 
b/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
deleted file mode 100644
index adf9394..0000000
--- 
a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.ganglia;
-
-import com.codahale.metrics.ScheduledReporter;
-
-import info.ganglia.gmetric4j.gmetric.GMetric;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-@PublicEvolving
-public class GangliaReporter extends ScheduledDropwizardReporter {
-       
-       public static final String ARG_DMAX = "dmax";
-       public static final String ARG_TMAX = "tmax";
-       public static final String ARG_TTL = "ttl";
-       public static final String ARG_MODE_ADDRESSING = "addressingMode";
-
-       @Override
-       public ScheduledReporter getReporter(Configuration config) {
-
-               try {
-                       String host = config.getString(ARG_HOST, null);
-                       int port = config.getInteger(ARG_PORT, -1);
-                       if (host == null || host.length() == 0 || port < 1) {
-                               throw new IllegalArgumentException("Invalid 
host/port configuration. Host: " + host + " Port: " + port);
-                       }
-                       String addressingMode = 
config.getString(ARG_MODE_ADDRESSING, "MULTICAST");
-                       int ttl = config.getInteger(ARG_TTL, -1);
-                       GMetric gMetric = new GMetric(host, port, 
GMetric.UDPAddressingMode.valueOf(addressingMode), ttl);
-
-                       String prefix = config.getString(ARG_PREFIX, null);
-                       String conversionRate = 
config.getString(ARG_CONVERSION_RATE, null);
-                       String conversionDuration = 
config.getString(ARG_CONVERSION_DURATION, null);
-                       int dMax = config.getInteger(ARG_DMAX, 0);
-                       int tMax = config.getInteger(ARG_TMAX, 60);
-
-                       com.codahale.metrics.ganglia.GangliaReporter.Builder 
builder =
-                               
com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry);
-
-                       if (prefix != null) {
-                               builder.prefixedWith(prefix);
-                       }
-                       if (conversionRate != null) {
-                               
builder.convertRatesTo(TimeUnit.valueOf(conversionRate));
-                       }
-                       if (conversionDuration != null) {
-                               
builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration));
-                       }
-                       builder.withDMax(dMax);
-                       builder.withTMax(tMax);
-
-                       return builder.build(gMetric);
-               } catch (IOException e) {
-                       throw new RuntimeException("Error while instantiating 
GangliaReporter.", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-graphite/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-graphite/pom.xml 
b/flink-metric-reporters/flink-metrics-graphite/pom.xml
deleted file mode 100644
index 714b77f..0000000
--- a/flink-metric-reporters/flink-metrics-graphite/pom.xml
+++ /dev/null
@@ -1,84 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-metric-reporters</artifactId>
-               <version>1.1-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-metrics-graphite</artifactId>
-       <name>flink-metrics-graphite</name>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-metrics-dropwizard</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>io.dropwizard.metrics</groupId>
-                       <artifactId>metrics-core</artifactId>
-                       <version>${metrics.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>io.dropwizard.metrics</groupId>
-                       <artifactId>metrics-graphite</artifactId>
-                       <version>${metrics.version}</version>
-               </dependency>
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <artifactId>maven-assembly-plugin</artifactId>
-                               <version>2.4</version>
-                               <configuration>
-                                       <descriptorRefs>
-                                               
<descriptorRef>jar-with-dependencies</descriptorRef>
-                                       </descriptorRefs>
-                               </configuration>
-                               <executions>
-                                       <execution>
-                                               <id>make-assembly</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>single</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-       </build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
 
b/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
deleted file mode 100644
index 16be830..0000000
--- 
a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.graphite;
-
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.graphite.Graphite;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
-
-import java.util.concurrent.TimeUnit;
-
-@PublicEvolving
-public class GraphiteReporter extends ScheduledDropwizardReporter {
-
-       @Override
-       public ScheduledReporter getReporter(Configuration config) {
-               String host = config.getString(ARG_HOST, null);
-               int port = config.getInteger(ARG_PORT, -1);
-
-               if (host == null || host.length() == 0 || port < 1) {
-                       throw new IllegalArgumentException("Invalid host/port 
configuration. Host: " + host + " Port: " + port);
-               }
-
-               String prefix = config.getString(ARG_PREFIX, null);
-               String conversionRate = config.getString(ARG_CONVERSION_RATE, 
null);
-               String conversionDuration = 
config.getString(ARG_CONVERSION_DURATION, null);
-
-               com.codahale.metrics.graphite.GraphiteReporter.Builder builder =
-                       
com.codahale.metrics.graphite.GraphiteReporter.forRegistry(registry);
-
-               if (prefix != null) {
-                       builder.prefixedWith(prefix);
-               }
-
-               if (conversionRate != null) {
-                       
builder.convertRatesTo(TimeUnit.valueOf(conversionRate));
-               }
-
-               if (conversionDuration != null) {
-                       
builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration));
-               }
-
-               return builder.build(new Graphite(host, port));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-statsd/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-statsd/pom.xml 
b/flink-metric-reporters/flink-metrics-statsd/pom.xml
deleted file mode 100644
index 3052a10..0000000
--- a/flink-metric-reporters/flink-metrics-statsd/pom.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-metric-reporters</artifactId>
-               <version>1.1-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-metrics-statsd</artifactId>
-       <name>flink-metrics-statsd</name>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-       </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 
b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
deleted file mode 100644
index 087a265..0000000
--- 
a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.statsd;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.reporter.AbstractReporter;
-import org.apache.flink.metrics.reporter.Scheduled;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.util.ConcurrentModificationException;
-import java.util.Map;
-import java.util.NoSuchElementException;
-
-/**
- * Largely based on the StatsDReporter class by ReadyTalk
- * https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java
- *
- * Ported since it was not present in maven central.
- */
-@PublicEvolving
-public class StatsDReporter extends AbstractReporter implements Scheduled {
-       
-       private static final Logger LOG = LoggerFactory.getLogger(StatsDReporter.class);
-
-       public static final String ARG_HOST = "host";
-       public static final String ARG_PORT = "port";
-//     public static final String ARG_CONVERSION_RATE = "rateConversion";
-//     public static final String ARG_CONVERSION_DURATION = "durationConversion";
-
-       private boolean closed = false;
-
-       private DatagramSocket socket;
-       private InetSocketAddress address;
-
-       @Override
-       public void open(Configuration config) {
-               String host = config.getString(ARG_HOST, null);
-               int port = config.getInteger(ARG_PORT, -1);
-
-               if (host == null || host.length() == 0 || port < 1) {
-                       throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
-               }
-
-               this.address = new InetSocketAddress(host, port);
-
-//             String conversionRate = config.getString(ARG_CONVERSION_RATE, "SECONDS");
-//             String conversionDuration = config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS");
-//             this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1);
-//             this.durationFactor = 1.0 / TimeUnit.valueOf(conversionDuration).toNanos(1);
-
-               try {
-                       this.socket = new DatagramSocket(0);
-               } catch (SocketException e) {
-                       throw new RuntimeException("Could not create datagram socket. ", e);
-               }
-       }
-
-       @Override
-       public void close() {
-               closed = true;
-               if (socket != null && !socket.isClosed()) {
-                       socket.close();
-               }
-       }
-
-       // ------------------------------------------------------------------------
-
-       @Override
-       public void report() {
-               // instead of locking here, we tolerate exceptions
-               // we do this to prevent holding the lock for very long and blocking
-               // operator creation and shutdown
-               try {
-                       for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
-                               if (closed) {
-                                       return;
-                               }
-                               reportGauge(entry.getValue(), entry.getKey());
-                       }
-
-                       for (Map.Entry<Counter, String> entry : counters.entrySet()) {
-                               if (closed) {
-                                       return;
-                               }
-                               reportCounter(entry.getValue(), entry.getKey());
-                       }
-               }
-               catch (ConcurrentModificationException | NoSuchElementException e) {
-                       // ignore - may happen when metrics are concurrently added or removed
-                       // report next time
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       
-       private void reportCounter(final String name, final Counter counter) {
-               send(name, String.valueOf(counter.getCount()));
-       }
-
-       private void reportGauge(final String name, final Gauge<?> gauge) {
-               Object value = gauge.getValue();
-               if (value != null) {
-                       send(name, value.toString());
-               }
-       }
-
-       private void send(final String name, final String value) {
-               try {
-                       String formatted = String.format("%s:%s|g", name, value);
-                       byte[] data = formatted.getBytes();
-                       socket.send(new DatagramPacket(data, data.length, this.address));
-               }
-               catch (IOException e) {
-                       LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/pom.xml b/flink-metric-reporters/pom.xml
deleted file mode 100644
index 01a809c..0000000
--- a/flink-metric-reporters/pom.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-parent</artifactId>
-               <version>1.1-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-metric-reporters</artifactId>
-       <name>flink-metric-reporters</name>
-       <packaging>pom</packaging>
-
-       <modules>
-               <module>flink-metrics-dropwizard</module>
-               <module>flink-metrics-ganglia</module>
-               <module>flink-metrics-graphite</module>
-               <module>flink-metrics-statsd</module>
-       </modules>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/pom.xml b/flink-metrics/flink-metrics-dropwizard/pom.xml
new file mode 100644
index 0000000..90dbc00
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-metrics</artifactId>
+               <version>1.1-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-metrics-dropwizard</artifactId>
+       <name>flink-metrics-dropwizard</name>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>io.dropwizard.metrics</groupId>
+                       <artifactId>metrics-core</artifactId>
+                       <version>${metrics.version}</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <artifactId>maven-assembly-plugin</artifactId>
+                               <version>2.4</version>
+                               <configuration>
+                                       <descriptorRefs>
+                                               <descriptorRef>jar-with-dependencies</descriptorRef>
+                                       </descriptorRefs>
+                               </configuration>
+                               <executions>
+                                       <execution>
+                                               <id>make-assembly</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>single</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
new file mode 100644
index 0000000..062bbd8
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+import com.codahale.metrics.ScheduledReporter;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper;
+import org.apache.flink.dropwizard.metrics.FlinkHistogramWrapper;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+
+/**
+ * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} that wraps a
+ * Dropwizard {@link com.codahale.metrics.Reporter}.
+ */
+@PublicEvolving
+public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled, Reporter {
+
+       protected final Logger log = LoggerFactory.getLogger(getClass());
+
+       public static final String ARG_HOST = "host";
+       public static final String ARG_PORT = "port";
+       public static final String ARG_PREFIX = "prefix";
+       public static final String ARG_CONVERSION_RATE = "rateConversion";
+       public static final String ARG_CONVERSION_DURATION = "durationConversion";
+
+       // ------------------------------------------------------------------------
+
+       protected final MetricRegistry registry;
+
+       protected ScheduledReporter reporter;
+
+       private final Map<Gauge<?>, String> gauges = new HashMap<>();
+       private final Map<Counter, String> counters = new HashMap<>();
+       private final Map<Histogram, String> histograms = new HashMap<>();
+
+       // ------------------------------------------------------------------------
+
+       protected ScheduledDropwizardReporter() {
+               this.registry = new MetricRegistry();
+       }
+
+       // ------------------------------------------------------------------------
+       //  life cycle
+       // ------------------------------------------------------------------------
+
+       @Override
+       public void open(Configuration config) {
+               this.reporter = getReporter(config);
+       }
+
+       @Override
+       public void close() {
+               this.reporter.stop();
+       }
+
+       // ------------------------------------------------------------------------
+       //  adding / removing metrics
+       // ------------------------------------------------------------------------
+
+       @Override
+       public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
+               final String fullName = group.getScopeString() + '.' + metricName;
+
+               synchronized (this) {
+                       if (metric instanceof Counter) {
+                               counters.put((Counter) metric, fullName);
+                               registry.register(fullName, new FlinkCounterWrapper((Counter) metric));
+                       }
+                       else if (metric instanceof Gauge) {
+                               gauges.put((Gauge<?>) metric, fullName);
+                               registry.register(fullName, FlinkGaugeWrapper.fromGauge((Gauge<?>) metric));
+                       } else if (metric instanceof Histogram) {
+                               Histogram histogram = (Histogram) metric;
+                               histograms.put(histogram, fullName);
+
+                               if (histogram instanceof DropwizardHistogramWrapper) {
+                                       registry.register(fullName, ((DropwizardHistogramWrapper) histogram).getDropwizarHistogram());
+                               } else {
+                                       registry.register(fullName, new FlinkHistogramWrapper(histogram));
+                               }
+                       } else {
+                               log.warn("Cannot add metric of type {}. This indicates that the reporter " +
+                                       "does not support this metric type.", metric.getClass().getName());
+                       }
+               }
+       }
+
+       @Override
+       public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
+               synchronized (this) {
+                       String fullName;
+                       
+                       if (metric instanceof Counter) {
+                               fullName = counters.remove(metric);
+                       } else if (metric instanceof Gauge) {
+                               fullName = gauges.remove(metric);
+                       } else if (metric instanceof Histogram) {
+                               fullName = histograms.remove(metric);
+                       } else {
+                               fullName = null;
+                       }
+                       
+                       if (fullName != null) {
+                               registry.remove(fullName);
+                       }
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  scheduled reporting
+       // ------------------------------------------------------------------------
+
+       @Override
+       public void report() {
+               // we do not need to lock here, because the dropwizard registry is
+               // internally a concurrent map
+               @SuppressWarnings("rawtypes")
+               final SortedMap<String, com.codahale.metrics.Gauge> gauges = registry.getGauges();
+               final SortedMap<String, com.codahale.metrics.Counter> counters = registry.getCounters();
+               final SortedMap<String, com.codahale.metrics.Histogram> histograms = registry.getHistograms();
+               final SortedMap<String, com.codahale.metrics.Meter> meters = registry.getMeters();
+               final SortedMap<String, com.codahale.metrics.Timer> timers = registry.getTimers();
+
+               this.reporter.report(gauges, counters, histograms, meters, timers);
+       }
+
+       public abstract ScheduledReporter getReporter(Configuration config);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
new file mode 100644
index 0000000..6f4eab2
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Snapshot;
+import org.apache.flink.metrics.HistogramStatistics;
+
+/**
+ * Dropwizard histogram statistics implementation returned by {@link DropwizardHistogramWrapper}.
+ * The statistics class wraps a {@link Snapshot} instance and forwards the method calls accordingly.
+ */
+class DropwizardHistogramStatistics extends HistogramStatistics {
+
+       private final com.codahale.metrics.Snapshot snapshot;
+
+       DropwizardHistogramStatistics(com.codahale.metrics.Snapshot snapshot) {
+               this.snapshot = snapshot;
+       }
+
+       @Override
+       public double getQuantile(double quantile) {
+               return snapshot.getValue(quantile);
+       }
+
+       @Override
+       public long[] getValues() {
+               return snapshot.getValues();
+       }
+
+       @Override
+       public int size() {
+               return snapshot.size();
+       }
+
+       @Override
+       public double getMean() {
+               return snapshot.getMean();
+       }
+
+       @Override
+       public double getStdDev() {
+               return snapshot.getStdDev();
+       }
+
+       @Override
+       public long getMax() {
+               return snapshot.getMax();
+       }
+
+       @Override
+       public long getMin() {
+               return snapshot.getMin();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
new file mode 100644
index 0000000..79a6a56
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+/**
+ * Wrapper to use a Dropwizard {@link com.codahale.metrics.Histogram} as a Flink {@link Histogram}.
+ */
+public class DropwizardHistogramWrapper implements Histogram {
+
+       private final com.codahale.metrics.Histogram dropwizarHistogram;
+
+       public DropwizardHistogramWrapper(com.codahale.metrics.Histogram dropwizardHistogram) {
+               this.dropwizarHistogram = dropwizardHistogram;
+       }
+
+       public com.codahale.metrics.Histogram getDropwizarHistogram() {
+               return dropwizarHistogram;
+       }
+
+       @Override
+       public void update(long value) {
+               dropwizarHistogram.update(value);
+       }
+
+       @Override
+       public long getCount() {
+               return dropwizarHistogram.getCount();
+       }
+
+       @Override
+       public HistogramStatistics getStatistics() {
+               return new DropwizardHistogramStatistics(dropwizarHistogram.getSnapshot());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
new file mode 100644
index 0000000..a44c3f5
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Counter;
+
+public class FlinkCounterWrapper extends com.codahale.metrics.Counter {
+       private final Counter counter;
+
+       public FlinkCounterWrapper(Counter counter) {
+               this.counter = counter;
+       }
+
+       @Override
+       public long getCount() {
+               return this.counter.getCount();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
new file mode 100644
index 0000000..058ecad
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+public class FlinkGaugeWrapper<T> implements com.codahale.metrics.Gauge<T> {
+       
+       private final Gauge<T> gauge;
+
+       public FlinkGaugeWrapper(Gauge<T> gauge) {
+               this.gauge = gauge;
+       }
+
+       @Override
+       public T getValue() {
+               return this.gauge.getValue();
+       }
+
+       public static <T> FlinkGaugeWrapper<T> fromGauge(Gauge<?> gauge) {
+               @SuppressWarnings("unchecked")
+               Gauge<T> typedGauge = (Gauge<T>) gauge;
+               return new FlinkGaugeWrapper<>(typedGauge);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
new file mode 100644
index 0000000..8bd8078
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Snapshot;
+import org.apache.flink.metrics.Histogram;
+
+/**
+ * Wrapper to use a Flink {@link Histogram} as a Dropwizard {@link com.codahale.metrics.Histogram}.
+ * This is necessary to report Flink's histograms via the Dropwizard
+ * {@link com.codahale.metrics.Reporter}.
+ */
+public class FlinkHistogramWrapper extends com.codahale.metrics.Histogram {
+
+       private final Histogram histogram;
+
+       public FlinkHistogramWrapper(Histogram histogram) {
+               super(null);
+               this.histogram = histogram;
+       }
+
+       @Override
+       public void update(long value) {
+               histogram.update(value);
+       }
+
+       @Override
+       public long getCount() {
+               return histogram.getCount();
+       }
+
+       @Override
+       public Snapshot getSnapshot() {
+               return new HistogramStatisticsWrapper(histogram.getStatistics());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
new file mode 100644
index 0000000..6d3a69b
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Snapshot;
+import org.apache.flink.metrics.HistogramStatistics;
+
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+
+/**
+ * Wrapper to use Flink's {@link HistogramStatistics} as a Dropwizard {@link Snapshot}. This is
+ * necessary to report Flink's histograms via the Dropwizard {@link com.codahale.metrics.Reporter}.
+ *
+ * <p>Every accessor forwards to the wrapped {@link HistogramStatistics} instance.
+ */
+class HistogramStatisticsWrapper extends Snapshot {
+
+       /** Charset used to encode the text written by {@link #dump(OutputStream)}. */
+       private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+       /** The Flink statistics object that all accessors delegate to. */
+       private final HistogramStatistics histogramStatistics;
+
+       /**
+        * Creates a snapshot view over the given Flink histogram statistics.
+        *
+        * @param histogramStatistics statistics to expose through the Dropwizard snapshot API
+        */
+       HistogramStatisticsWrapper(HistogramStatistics histogramStatistics) {
+               this.histogramStatistics = histogramStatistics;
+       }
+
+       /**
+        * @param quantile quantile handed unchanged to {@link HistogramStatistics#getQuantile}
+        * @return the value the wrapped statistics report for that quantile
+        */
+       @Override
+       public double getValue(double quantile) {
+               return histogramStatistics.getQuantile(quantile);
+       }
+
+       /**
+        * @return the values array returned by the wrapped statistics
+        */
+       @Override
+       public long[] getValues() {
+               return histogramStatistics.getValues();
+       }
+
+       /**
+        * @return the size reported by the wrapped statistics
+        */
+       @Override
+       public int size() {
+               return histogramStatistics.size();
+       }
+
+       /**
+        * @return the maximum reported by the wrapped statistics
+        */
+       @Override
+       public long getMax() {
+               return histogramStatistics.getMax();
+       }
+
+       /**
+        * @return the mean reported by the wrapped statistics
+        */
+       @Override
+       public double getMean() {
+               return histogramStatistics.getMean();
+       }
+
+       /**
+        * @return the minimum reported by the wrapped statistics
+        */
+       @Override
+       public long getMin() {
+               return histogramStatistics.getMin();
+       }
+
+       /**
+        * @return the standard deviation reported by the wrapped statistics
+        */
+       @Override
+       public double getStdDev() {
+               return histogramStatistics.getStdDev();
+       }
+
+       /**
+        * Writes every value of the wrapped statistics to the given stream as decimal text, one
+        * value per line, encoded as UTF-8.
+        *
+        * <p>The writer is managed by try-with-resources, so it is closed when this method returns;
+        * closing the writer also closes {@code output}.
+        *
+        * @param output stream that receives the values; it is closed by this method
+        */
+       @Override
+       public void dump(OutputStream output) {
+               try (PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(output, UTF_8))) {
+                       // getValues() yields a long[]; iterate with a primitive long rather than a
+                       // boxed Long loop variable.
+                       for (long value : histogramStatistics.getValues()) {
+                               printWriter.printf("%d%n", value);
+                       }
+               }
+       }
+}

Reply via email to