Repository: flink
Updated Branches:
  refs/heads/master 8ed368582 -> 707606ac4


http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
index 7b3286d..6b9a5fc 100644
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
@@ -17,15 +17,112 @@
  */
 package org.apache.flink.metrics.groups;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.util.AbstractID;
 import org.junit.Test;
 
 import java.util.List;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 public class TaskManagerGroupTest {
+
+       // ------------------------------------------------------------------------
+       //  adding and removing jobs
+       // ------------------------------------------------------------------------
+
+       @Test
+       public void addAndRemoveJobs() {
+               final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
+                               new MetricRegistry(new Configuration()), 
"localhost", new AbstractID().toString());
+               
+               
+               final JobID jid1 = new JobID();
+               final JobID jid2 = new JobID();
+               
+               final String jobName1 = "testjob";
+               final String jobName2 = "anotherJob";
+               
+               final AbstractID vertex11 = new AbstractID();
+               final AbstractID vertex12 = new AbstractID();
+               final AbstractID vertex13 = new AbstractID();
+               final AbstractID vertex21 = new AbstractID();
+
+               final AbstractID execution11 = new AbstractID();
+               final AbstractID execution12 = new AbstractID();
+               final AbstractID execution13 = new AbstractID();
+               final AbstractID execution21 = new AbstractID();
+               
+               TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, 
vertex11, execution11, 17, "test");
+               TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, 
vertex12, execution12, 13, "test");
+               TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, 
vertex21, execution21, 7, "test");
+               
+               assertEquals(2, group.numRegisteredJobMetricGroups());
+               assertFalse(tmGroup11.parent().isClosed());
+               assertFalse(tmGroup12.parent().isClosed());
+               assertFalse(tmGroup21.parent().isClosed());
+               
+               // close all for job 2 and one from job 1
+               tmGroup11.close();
+               tmGroup21.close();
+               assertTrue(tmGroup11.isClosed());
+               assertTrue(tmGroup21.isClosed());
+               
+               // job 2 should be removed, job 1 should still be there
+               assertFalse(tmGroup11.parent().isClosed());
+               assertFalse(tmGroup12.parent().isClosed());
+               assertTrue(tmGroup21.parent().isClosed());
+               assertEquals(1, group.numRegisteredJobMetricGroups());
+               
+               // add one more to job one
+               TaskMetricGroup tmGroup13 = group.addTaskForJob(jid1, jobName1, 
vertex13, execution13, 0, "test");
+               tmGroup12.close();
+               tmGroup13.close();
+
+               assertTrue(tmGroup11.parent().isClosed());
+               assertTrue(tmGroup12.parent().isClosed());
+               assertTrue(tmGroup13.parent().isClosed());
+               
+               assertEquals(0, group.numRegisteredJobMetricGroups());
+       }
+
+       @Test
+       public void testCloseClosesAll() {
+               final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
+                               new MetricRegistry(new Configuration()), 
"localhost", new AbstractID().toString());
+
+
+               final JobID jid1 = new JobID();
+               final JobID jid2 = new JobID();
+
+               final String jobName1 = "testjob";
+               final String jobName2 = "anotherJob";
+
+               final AbstractID vertex11 = new AbstractID();
+               final AbstractID vertex12 = new AbstractID();
+               final AbstractID vertex21 = new AbstractID();
+
+               final AbstractID execution11 = new AbstractID();
+               final AbstractID execution12 = new AbstractID();
+               final AbstractID execution21 = new AbstractID();
+
+               TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, 
vertex11, execution11, 17, "test");
+               TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, 
vertex12, execution12, 13, "test");
+               TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, 
vertex21, execution21, 7, "test");
+               
+               group.close();
+               
+               assertTrue(tmGroup11.isClosed());
+               assertTrue(tmGroup12.isClosed());
+               assertTrue(tmGroup21.isClosed());
+       }
+       
+       // ------------------------------------------------------------------------
+       //  scope tests
+       // ------------------------------------------------------------------------
+
        @Test
        public void testGenerateScopeDefault() {
                MetricRegistry registry = new MetricRegistry(new 
Configuration());

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java
index d607072..ab78288 100644
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java
@@ -24,6 +24,7 @@ import org.apache.flink.metrics.groups.JobMetricGroup;
 import org.apache.flink.util.AbstractID;
 
 public class DummyJobMetricGroup extends JobMetricGroup {
+       
        public DummyJobMetricGroup() {
                super(new DummyMetricRegistry(), new 
DummyTaskManagerMetricGroup(), new JobID(), "job");
        }
@@ -34,14 +35,10 @@ public class DummyJobMetricGroup extends JobMetricGroup {
        }
 
        @Override
-       protected MetricGroup addMetric(String name, Metric metric) {
-               return this;
-       }
+       public void removeTaskMetricGroup(AbstractID executionId) {}
 
        @Override
-       public MetricGroup addGroup(int name) {
-               return addGroup("" + name);
-       }
+       protected void addMetric(String name, Metric metric) {}
 
        @Override
        public MetricGroup addGroup(String name) {

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java
index 26df874..77ddd17 100644
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java
@@ -19,6 +19,7 @@ package org.apache.flink.metrics.util;
 
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.MetricRegistry;
 import org.apache.flink.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.metrics.groups.Scope;
 
@@ -26,8 +27,13 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class DummyMetricGroup extends AbstractMetricGroup {
+
        public DummyMetricGroup() {
-               super(new DummyMetricRegistry());
+               this(new DummyMetricRegistry());
+       }
+       
+       public DummyMetricGroup(MetricRegistry registry) {
+               super(registry);
        }
 
        @Override
@@ -41,14 +47,8 @@ public class DummyMetricGroup extends AbstractMetricGroup {
        }
 
        @Override
-       protected MetricGroup addMetric(String name, Metric metric) {
-               return this;
-       }
-
-       @Override
-       public MetricGroup addGroup(int name) {
-               return addGroup("" + name);
-       }
+       protected void addMetric(String name, Metric metric) {}
+       
 
        @Override
        public MetricGroup addGroup(String name) {

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java
index f0d6d3f..f8b73a9 100644
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java
@@ -15,18 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.util;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricRegistry;
 
 public class DummyMetricRegistry extends MetricRegistry {
-       private static final Configuration config;
-
-       static {
-               config = new Configuration();
-               config.setString(KEY_METRICS_REPORTER_CLASS, 
DummyReporter.class.getCanonicalName());
-       }
 
        public DummyMetricRegistry() {
                super(new Configuration());

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java
index eb45f6a..e271d6e 100644
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java
@@ -22,19 +22,13 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 
 public class DummyOperatorMetricGroup extends OperatorMetricGroup {
+       
        public DummyOperatorMetricGroup() {
                super(new DummyMetricRegistry(), new DummyTaskMetricGroup(), 
"operator", 0);
        }
 
        @Override
-       protected MetricGroup addMetric(String name, Metric metric) {
-               return this;
-       }
-
-       @Override
-       public MetricGroup addGroup(int name) {
-               return addGroup("" + name);
-       }
+       protected void addMetric(String name, Metric metric) {}
 
        @Override
        public MetricGroup addGroup(String name) {

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java
index 1c7d33b..8b7714f 100644
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java
@@ -23,6 +23,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
 
 public class DummyTaskManagerMetricGroup extends TaskManagerMetricGroup {
+       
        public DummyTaskManagerMetricGroup() {
                super(new DummyMetricRegistry(), "host", "id");
        }
@@ -32,14 +33,7 @@ public class DummyTaskManagerMetricGroup extends 
TaskManagerMetricGroup {
        }
 
        @Override
-       protected MetricGroup addMetric(String name, Metric metric) {
-               return this;
-       }
-
-       @Override
-       public MetricGroup addGroup(int name) {
-               return addGroup("" + name);
-       }
+       protected void addMetric(String name, Metric metric) {}
 
        @Override
        public MetricGroup addGroup(String name) {

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java
index 53683f4..db2c557 100644
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java
@@ -23,6 +23,7 @@ import org.apache.flink.metrics.groups.TaskMetricGroup;
 import org.apache.flink.util.AbstractID;
 
 public class DummyTaskMetricGroup extends TaskMetricGroup {
+       
        public DummyTaskMetricGroup() {
                super(new DummyMetricRegistry(), new DummyJobMetricGroup(), new 
AbstractID(), new AbstractID(), 0, "task");
        }
@@ -32,14 +33,7 @@ public class DummyTaskMetricGroup extends TaskMetricGroup {
        }
 
        @Override
-       protected MetricGroup addMetric(String name, Metric metric) {
-               return this;
-       }
-
-       @Override
-       public MetricGroup addGroup(int name) {
-               return addGroup("" + name);
-       }
+       protected void addMetric(String name, Metric metric) {}
 
        @Override
        public MetricGroup addGroup(String name) {

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java 
b/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java
index 482d1e8..5d8a8e0 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java
@@ -15,23 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.util;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.reporter.AbstractReporter;
-import org.apache.flink.metrics.reporter.MetricReporter;
 
 import java.util.List;
 
 public class TestReporter extends AbstractReporter {
+
        @Override
-       public void open(Configuration config) {
-       }
+       public void open(Configuration config) {}
 
        @Override
-       public void close() {
-       }
+       public void close() {}
 
        @Override
        public String generateName(String name, List<String> scope) {

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-dropwizard/pom.xml 
b/flink-metric-reporters/flink-metrics-dropwizard/pom.xml
new file mode 100644
index 0000000..a386880
--- /dev/null
+++ b/flink-metric-reporters/flink-metrics-dropwizard/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-metric-reporters</artifactId>
+               <version>1.1-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-metrics-dropwizard</artifactId>
+       <name>flink-metrics-dropwizard</name>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>io.dropwizard.metrics</groupId>
+                       <artifactId>metrics-core</artifactId>
+                       <version>${metrics.version}</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <artifactId>maven-assembly-plugin</artifactId>
+                               <version>2.4</version>
+                               <configuration>
+                                       <descriptorRefs>
+                                               <descriptorRef>jar-with-dependencies</descriptorRef>
+                                       </descriptorRefs>
+                               </configuration>
+                               <executions>
+                                       <execution>
+                                               <id>make-assembly</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>single</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
 
b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
new file mode 100644
index 0000000..059704d
--- /dev/null
+++ 
b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.dropwizard;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.metrics.CounterWrapper;
+import org.apache.flink.dropwizard.metrics.GaugeWrapper;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+
+import java.util.List;
+
+/**
+ * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} that wraps a
+ * Dropwizard {@link com.codahale.metrics.Reporter}.
+ */
+@PublicEvolving
+public abstract class ScheduledDropwizardReporter implements MetricReporter, 
Scheduled {
+       protected MetricRegistry registry;
+       protected ScheduledReporter reporter;
+
+       public static final String ARG_HOST = "host";
+       public static final String ARG_PORT = "port";
+       public static final String ARG_PREFIX = "prefix";
+       public static final String ARG_CONVERSION_RATE = "rateConversion";
+       public static final String ARG_CONVERSION_DURATION = 
"durationConversion";
+
+       protected ScheduledDropwizardReporter() {
+               this.registry = new MetricRegistry();
+       }
+
+       @Override
+       public synchronized void notifyOfAddedMetric(Metric metric, String 
name) {
+               if (metric instanceof Counter) {
+                       registry.register(name, new CounterWrapper((Counter) 
metric));
+               } else if (metric instanceof Gauge) {
+                       registry.register(name, new GaugeWrapper((Gauge<?>) 
metric));
+               }
+       }
+
+       @Override
+       public synchronized void notifyOfRemovedMetric(Metric metric, String 
name) {
+               registry.remove(name);
+       }
+
+       public abstract ScheduledReporter getReporter(Configuration config);
+
+       @Override
+       public void open(Configuration config) {
+               this.reporter = getReporter(config);
+       }
+
+       @Override
+       public void close() {
+               this.reporter.stop();
+       }
+
+       @Override
+       public String generateName(String name, List<String> scope) {
+               StringBuilder sb = new StringBuilder();
+               for (String s : scope) {
+                       sb.append(s);
+                       sb.append('.');
+               }
+               sb.append(name);
+               return sb.toString();
+       }
+
+       @Override
+       public synchronized void report() {
+               this.reporter.report(
+                       this.registry.getGauges(),
+                       this.registry.getCounters(),
+                       this.registry.getHistograms(),
+                       this.registry.getMeters(),
+                       this.registry.getTimers());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
 
b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
new file mode 100644
index 0000000..f6630b9
--- /dev/null
+++ 
b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Counter;
+
+public class CounterWrapper extends com.codahale.metrics.Counter {
+       private final Counter counter;
+
+       public CounterWrapper(Counter counter) {
+               this.counter = counter;
+       }
+
+       @Override
+       public long getCount() {
+               return this.counter.getCount();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
 
b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
new file mode 100644
index 0000000..fcb629a
--- /dev/null
+++ 
b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+public class GaugeWrapper<T> implements com.codahale.metrics.Gauge<T> {
+       private final Gauge<T> gauge;
+
+       public GaugeWrapper(Gauge<T> gauge) {
+               this.gauge = gauge;
+       }
+
+       @Override
+       public T getValue() {
+               return this.gauge.getValue();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-ganglia/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-ganglia/pom.xml 
b/flink-metric-reporters/flink-metrics-ganglia/pom.xml
new file mode 100644
index 0000000..a457ca1
--- /dev/null
+++ b/flink-metric-reporters/flink-metrics-ganglia/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-metric-reporters</artifactId>
+               <version>1.1-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-metrics-ganglia</artifactId>
+       <name>flink-metrics-ganglia</name>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-dropwizard</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>info.ganglia.gmetric4j</groupId>
+                       <artifactId>gmetric4j</artifactId>
+                       <version>1.0.7</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>io.dropwizard.metrics</groupId>
+                       <artifactId>metrics-core</artifactId>
+                       <version>${metrics.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>io.dropwizard.metrics</groupId>
+                       <artifactId>metrics-ganglia</artifactId>
+                       <version>${metrics.version}</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <artifactId>maven-assembly-plugin</artifactId>
+                               <version>2.4</version>
+                               <configuration>
+                                       <descriptorRefs>
+                                               <descriptorRef>jar-with-dependencies</descriptorRef>
+                                       </descriptorRefs>
+                               </configuration>
+                               <executions>
+                                       <execution>
+                                               <id>make-assembly</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>single</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java
 
b/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java
new file mode 100644
index 0000000..a1dafc9
--- /dev/null
+++ 
b/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.graphite;
+
+import com.codahale.metrics.ScheduledReporter;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class GangliaReporter extends ScheduledDropwizardReporter {
+       public static final String ARG_DMAX = "dmax";
+       public static final String ARG_TMAX = "tmax";
+       public static final String ARG_TTL = "ttl";
+       public static final String ARG_MODE_ADDRESSING = "addressingMode";
+
+       @Override
+       public ScheduledReporter getReporter(Configuration config) {
+
+               try {
+                       String host = config.getString(ARG_HOST, null);
+                       int port = config.getInteger(ARG_PORT, -1);
+                       if (host == null || host.length() == 0 || port < 1) {
+                               throw new IllegalArgumentException("Invalid 
host/port configuration. Host: " + host + " Port: " + port);
+                       }
+                       String addressingMode = 
config.getString(ARG_MODE_ADDRESSING, "MULTICAST");
+                       int ttl = config.getInteger(ARG_TTL, -1);
+                       GMetric gMetric = new GMetric(host, port, 
GMetric.UDPAddressingMode.valueOf(addressingMode), ttl);
+
+                       String prefix = config.getString(ARG_PREFIX, null);
+                       String conversionRate = 
config.getString(ARG_CONVERSION_RATE, null);
+                       String conversionDuration = 
config.getString(ARG_CONVERSION_DURATION, null);
+                       int dMax = config.getInteger(ARG_DMAX, 0);
+                       int tMax = config.getInteger(ARG_TMAX, 60);
+
+                       com.codahale.metrics.ganglia.GangliaReporter.Builder 
builder =
+                               
com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry);
+
+                       if (prefix != null) {
+                               builder.prefixedWith(prefix);
+                       }
+                       if (conversionRate != null) {
+                               
builder.convertRatesTo(TimeUnit.valueOf(conversionRate));
+                       }
+                       if (conversionDuration != null) {
+                               
builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration));
+                       }
+                       builder.withDMax(dMax);
+                       builder.withTMax(tMax);
+
+                       return builder.build(gMetric);
+               } catch (IOException e) {
+                       throw new RuntimeException("Error while instantiating 
GangliaReporter.", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-graphite/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-graphite/pom.xml 
b/flink-metric-reporters/flink-metrics-graphite/pom.xml
new file mode 100644
index 0000000..714b77f
--- /dev/null
+++ b/flink-metric-reporters/flink-metrics-graphite/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-metric-reporters</artifactId>
+               <version>1.1-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-metrics-graphite</artifactId>
+       <name>flink-metrics-graphite</name>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-dropwizard</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>io.dropwizard.metrics</groupId>
+                       <artifactId>metrics-core</artifactId>
+                       <version>${metrics.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>io.dropwizard.metrics</groupId>
+                       <artifactId>metrics-graphite</artifactId>
+                       <version>${metrics.version}</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <artifactId>maven-assembly-plugin</artifactId>
+                               <version>2.4</version>
+                               <configuration>
+                                       <descriptorRefs>
+                                               
<descriptorRef>jar-with-dependencies</descriptorRef>
+                                       </descriptorRefs>
+                               </configuration>
+                               <executions>
+                                       <execution>
+                                               <id>make-assembly</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>single</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
 
b/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
new file mode 100644
index 0000000..b28d7a4
--- /dev/null
+++ 
b/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.graphite;
+
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.graphite.Graphite;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reporter that forwards the metrics held in the inherited Dropwizard registry
+ * to a Graphite server.
+ */
+public class GraphiteReporter extends ScheduledDropwizardReporter {
+
+       /**
+        * Creates the Dropwizard Graphite reporter described by the configuration.
+        *
+        * @param config reporter configuration; host and port are mandatory, while
+        *               prefix and the two conversion units are optional
+        * @return reporter bound to the inherited registry and the configured endpoint
+        * @throws IllegalArgumentException if the host is missing/empty, the port is
+        *         smaller than 1, or a conversion unit is not a {@link TimeUnit} name
+        */
+       @Override
+       public ScheduledReporter getReporter(Configuration config) {
+               final String graphiteHost = config.getString(ARG_HOST, null);
+               final int graphitePort = config.getInteger(ARG_PORT, -1);
+
+               final boolean hostMissing = graphiteHost == null || graphiteHost.isEmpty();
+               if (hostMissing || graphitePort < 1) {
+                       throw new IllegalArgumentException(
+                               "Invalid host/port configuration. Host: " + graphiteHost + " Port: " + graphitePort);
+               }
+
+               final String metricPrefix = config.getString(ARG_PREFIX, null);
+               final String rateUnit = config.getString(ARG_CONVERSION_RATE, null);
+               final String durationUnit = config.getString(ARG_CONVERSION_DURATION, null);
+
+               final com.codahale.metrics.graphite.GraphiteReporter.Builder reporterBuilder =
+                       com.codahale.metrics.graphite.GraphiteReporter.forRegistry(registry);
+
+               // Unset options are skipped so the builder keeps its own defaults.
+               if (metricPrefix != null) {
+                       reporterBuilder.prefixedWith(metricPrefix);
+               }
+               if (rateUnit != null) {
+                       reporterBuilder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+               }
+               if (durationUnit != null) {
+                       reporterBuilder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
+               }
+
+               final Graphite endpoint = new Graphite(graphiteHost, graphitePort);
+               return reporterBuilder.build(endpoint);
+       }
+
+       /**
+        * Joins the scope components and the metric name with dots.
+        *
+        * <p>Inside each scope component every '.' is replaced by '_' and every double
+        * quote is dropped; the metric name itself is appended unchanged.
+        *
+        * @param name  metric name
+        * @param scope scope components, outermost first
+        * @return dot-separated metric identifier
+        */
+       @Override
+       public String generateName(String name, List<String> scope) {
+               final StringBuilder path = new StringBuilder();
+               for (String component : scope) {
+                       final String sanitized = component.replace(".", "_").replace("\"", "");
+                       path.append(sanitized).append(".");
+               }
+               return path.append(name).toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-statsd/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-statsd/pom.xml 
b/flink-metric-reporters/flink-metrics-statsd/pom.xml
new file mode 100644
index 0000000..3052a10
--- /dev/null
+++ b/flink-metric-reporters/flink-metrics-statsd/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-metric-reporters</artifactId>
+               <version>1.1-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-metrics-statsd</artifactId>
+       <name>flink-metrics-statsd</name>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 
b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
new file mode 100644
index 0000000..e57001f
--- /dev/null
+++ 
b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.statsd;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Largely based on the StatsDReporter class by ReadyTalk
+ * 
https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java
+ *
+ * Ported since it was not present in maven central.
+ */
+public class StatsDReporter extends AbstractReporter implements Scheduled {
+       private static final Logger LOG = 
LoggerFactory.getLogger(StatsDReporter.class);
+
+       public static final String ARG_HOST = "host";
+       public static final String ARG_PORT = "port";
+       public static final String ARG_CONVERSION_RATE = "rateConversion";
+       public static final String ARG_CONVERSION_DURATION = 
"durationConversion";
+
+       private DatagramSocket socket;
+       private InetSocketAddress address;
+
+       private double durationFactor;
+       private double rateFactor;
+
+       @Override
+       public void open(Configuration config) {
+               String host = config.getString(ARG_HOST, null);
+               int port = config.getInteger(ARG_PORT, -1);
+
+               if (host == null || host.length() == 0 || port < 1) {
+                       throw new IllegalArgumentException("Invalid host/port 
configuration. Host: " + host + " Port: " + port);
+               }
+
+               String conversionRate = config.getString(ARG_CONVERSION_RATE, 
"SECONDS");
+               String conversionDuration = 
config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS");
+
+               this.address = new InetSocketAddress(host, port);
+               this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1);
+               this.durationFactor = 1.0 / 
TimeUnit.valueOf(conversionDuration).toNanos(1);
+               try {
+                       this.socket = new DatagramSocket(0);
+               } catch (SocketException e) {
+                       throw new RuntimeException("Failure while creating 
socket. ", e);
+               }
+       }
+
+       @Override
+       public void close() {
+               if (socket != null && !socket.isClosed()) {
+                       socket.close();
+               }
+       }
+
+       @Override
+       public String generateName(String name, List<String> scope) {
+               StringBuilder sb = new StringBuilder();
+               for (String s : scope) {
+                       sb.append(s);
+                       sb.append('.');
+               }
+               sb.append(name);
+               return sb.toString();
+       }
+
+       public void send(final String name, final double value) {
+               send(name, "" + value);
+       }
+
+       public void send(final String name, final String value) {
+               try {
+                       String formatted = String.format("%s:%s|g", name, 
value);
+                       byte[] data = formatted.getBytes();
+                       socket.send(new DatagramPacket(data, data.length, 
this.address));
+               } catch (IOException e) {
+                       LOG.error("unable to send packet to statsd at '{}:{}'", 
address.getHostName(), address.getPort());
+               }
+       }
+
+       @Override
+       public void report() {
+               for (Map.Entry<String, Gauge<?>> entry : gauges.entrySet()) {
+                       reportGauge(entry.getKey(), entry.getValue());
+               }
+
+               for (Map.Entry<String, Counter> entry : counters.entrySet()) {
+                       reportCounter(entry.getKey(), entry.getValue());
+               }
+       }
+
+       private void reportCounter(final String name, final Counter counter) {
+               send(name, counter.getCount());
+       }
+
+       private void reportGauge(final String name, final Gauge<?> gauge) {
+               final String value = gauge.getValue().toString();
+               if (value != null) {
+                       send((name), value);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/pom.xml b/flink-metric-reporters/pom.xml
new file mode 100644
index 0000000..01a809c
--- /dev/null
+++ b/flink-metric-reporters/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-parent</artifactId>
+               <version>1.1-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-metric-reporters</artifactId>
+       <name>flink-metric-reporters</name>
+       <packaging>pom</packaging>
+
+       <modules>
+               <module>flink-metrics-dropwizard</module>
+               <module>flink-metrics-ganglia</module>
+               <module>flink-metrics-graphite</module>
+               <module>flink-metrics-statsd</module>
+       </modules>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/pom.xml 
b/flink-metrics/flink-metrics-dropwizard/pom.xml
deleted file mode 100644
index 84d9722..0000000
--- a/flink-metrics/flink-metrics-dropwizard/pom.xml
+++ /dev/null
@@ -1,72 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-metrics</artifactId>
-               <version>1.1-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-metrics-dropwizard</artifactId>
-       <name>flink-metrics-dropwizard</name>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>io.dropwizard.metrics</groupId>
-                       <artifactId>metrics-core</artifactId>
-                       <version>${metrics.version}</version>
-               </dependency>
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <artifactId>maven-assembly-plugin</artifactId>
-                               <version>2.4</version>
-                               <configuration>
-                                       <descriptorRefs>
-                                               
<descriptorRef>jar-with-dependencies</descriptorRef>
-                                       </descriptorRefs>
-                               </configuration>
-                               <executions>
-                                       <execution>
-                                               <id>make-assembly</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>single</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-       </build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
 
b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
deleted file mode 100644
index a7309be..0000000
--- 
a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.dropwizard;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.dropwizard.metrics.CounterWrapper;
-import org.apache.flink.dropwizard.metrics.GaugeWrapper;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.metrics.reporter.Scheduled;
-
-import java.util.List;
-
-/**
- * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} 
that wraps a
- * Dropwizard {@link com.codahale.metrics.Reporter}.
- */
-@PublicEvolving
-public abstract class ScheduledDropwizardReporter implements MetricReporter, 
Scheduled {
-       protected MetricRegistry registry;
-       protected ScheduledReporter reporter;
-
-       public static final String ARG_HOST = "host";
-       public static final String ARG_PORT = "port";
-       public static final String ARG_PREFIX = "prefix";
-       public static final String ARG_CONVERSION_RATE = "rateConversion";
-       public static final String ARG_CONVERSION_DURATION = 
"durationConversion";
-
-       protected ScheduledDropwizardReporter() {
-               this.registry = new MetricRegistry();
-       }
-
-       @Override
-       public synchronized void notifyOfAddedMetric(Metric metric, String 
name) {
-               if (metric instanceof Counter) {
-                       registry.register(name, new CounterWrapper((Counter) 
metric));
-               } else if (metric instanceof Gauge) {
-                       registry.register(name, new GaugeWrapper((Gauge) 
metric));
-               }
-       }
-
-       @Override
-       public synchronized void notifyOfRemovedMetric(Metric metric, String 
name) {
-               registry.remove(name);
-       }
-
-       public abstract ScheduledReporter getReporter(Configuration config);
-
-       @Override
-       public void open(Configuration config) {
-               this.reporter = getReporter(config);
-       }
-
-       @Override
-       public void close() {
-               this.reporter.stop();
-       }
-
-       @Override
-       public String generateName(String name, List<String> scope) {
-               StringBuilder sb = new StringBuilder();
-               for (String s : scope) {
-                       sb.append(s);
-                       sb.append('.');
-               }
-               sb.append(name);
-               return sb.toString();
-       }
-
-       @Override
-       public synchronized void report() {
-               this.reporter.report(
-                       this.registry.getGauges(),
-                       this.registry.getCounters(),
-                       this.registry.getHistograms(),
-                       this.registry.getMeters(),
-                       this.registry.getTimers());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
 
b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
deleted file mode 100644
index f6630b9..0000000
--- 
a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.dropwizard.metrics;
-
-import org.apache.flink.metrics.Counter;
-
-public class CounterWrapper extends com.codahale.metrics.Counter {
-       private final Counter counter;
-
-       public CounterWrapper(Counter counter) {
-               this.counter = counter;
-       }
-
-       @Override
-       public long getCount() {
-               return this.counter.getCount();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
 
b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
deleted file mode 100644
index d47090d..0000000
--- 
a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.dropwizard.metrics;
-
-import org.apache.flink.metrics.Gauge;
-
-public class GaugeWrapper implements com.codahale.metrics.Gauge {
-       private final Gauge gauge;
-
-       public GaugeWrapper(Gauge gauge) {
-               this.gauge = gauge;
-       }
-
-       @Override
-       public Object getValue() {
-               return this.gauge.getValue();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-ganglia/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-ganglia/pom.xml 
b/flink-metrics/flink-metrics-ganglia/pom.xml
deleted file mode 100644
index c4f51da..0000000
--- a/flink-metrics/flink-metrics-ganglia/pom.xml
+++ /dev/null
@@ -1,90 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-metrics</artifactId>
-               <version>1.1-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-metrics-ganglia</artifactId>
-       <name>flink-metrics-ganglia</name>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-metrics-dropwizard</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>info.ganglia.gmetric4j</groupId>
-                       <artifactId>gmetric4j</artifactId>
-                       <version>1.0.7</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>io.dropwizard.metrics</groupId>
-                       <artifactId>metrics-core</artifactId>
-                       <version>${metrics.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>io.dropwizard.metrics</groupId>
-                       <artifactId>metrics-ganglia</artifactId>
-                       <version>${metrics.version}</version>
-               </dependency>
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <artifactId>maven-assembly-plugin</artifactId>
-                               <version>2.4</version>
-                               <configuration>
-                                       <descriptorRefs>
-                                               
<descriptorRef>jar-with-dependencies</descriptorRef>
-                                       </descriptorRefs>
-                               </configuration>
-                               <executions>
-                                       <execution>
-                                               <id>make-assembly</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>single</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-       </build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java
 
b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java
deleted file mode 100644
index a1dafc9..0000000
--- 
a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.graphite;
-
-import com.codahale.metrics.ScheduledReporter;
-import info.ganglia.gmetric4j.gmetric.GMetric;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-public class GangliaReporter extends ScheduledDropwizardReporter {
-       public static final String ARG_DMAX = "dmax";
-       public static final String ARG_TMAX = "tmax";
-       public static final String ARG_TTL = "ttl";
-       public static final String ARG_MODE_ADDRESSING = "addressingMode";
-
-       @Override
-       public ScheduledReporter getReporter(Configuration config) {
-
-               try {
-                       String host = config.getString(ARG_HOST, null);
-                       int port = config.getInteger(ARG_PORT, -1);
-                       if (host == null || host.length() == 0 || port < 1) {
-                               throw new IllegalArgumentException("Invalid 
host/port configuration. Host: " + host + " Port: " + port);
-                       }
-                       String addressingMode = 
config.getString(ARG_MODE_ADDRESSING, "MULTICAST");
-                       int ttl = config.getInteger(ARG_TTL, -1);
-                       GMetric gMetric = new GMetric(host, port, 
GMetric.UDPAddressingMode.valueOf(addressingMode), ttl);
-
-                       String prefix = config.getString(ARG_PREFIX, null);
-                       String conversionRate = 
config.getString(ARG_CONVERSION_RATE, null);
-                       String conversionDuration = 
config.getString(ARG_CONVERSION_DURATION, null);
-                       int dMax = config.getInteger(ARG_DMAX, 0);
-                       int tMax = config.getInteger(ARG_TMAX, 60);
-
-                       com.codahale.metrics.ganglia.GangliaReporter.Builder 
builder =
-                               
com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry);
-
-                       if (prefix != null) {
-                               builder.prefixedWith(prefix);
-                       }
-                       if (conversionRate != null) {
-                               
builder.convertRatesTo(TimeUnit.valueOf(conversionRate));
-                       }
-                       if (conversionDuration != null) {
-                               
builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration));
-                       }
-                       builder.withDMax(dMax);
-                       builder.withTMax(tMax);
-
-                       return builder.build(gMetric);
-               } catch (IOException e) {
-                       throw new RuntimeException("Error while instantiating 
GangliaReporter.", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-graphite/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-graphite/pom.xml 
b/flink-metrics/flink-metrics-graphite/pom.xml
deleted file mode 100644
index 45fb018..0000000
--- a/flink-metrics/flink-metrics-graphite/pom.xml
+++ /dev/null
@@ -1,84 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-metrics</artifactId>
-               <version>1.1-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-metrics-graphite</artifactId>
-       <name>flink-metrics-graphite</name>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-metrics-dropwizard</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>io.dropwizard.metrics</groupId>
-                       <artifactId>metrics-core</artifactId>
-                       <version>${metrics.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>io.dropwizard.metrics</groupId>
-                       <artifactId>metrics-graphite</artifactId>
-                       <version>${metrics.version}</version>
-               </dependency>
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <artifactId>maven-assembly-plugin</artifactId>
-                               <version>2.4</version>
-                               <configuration>
-                                       <descriptorRefs>
-                                               
<descriptorRef>jar-with-dependencies</descriptorRef>
-                                       </descriptorRefs>
-                               </configuration>
-                               <executions>
-                                       <execution>
-                                               <id>make-assembly</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>single</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-       </build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
 
b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
deleted file mode 100644
index b28d7a4..0000000
--- 
a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.graphite;
-
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.graphite.Graphite;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class GraphiteReporter extends ScheduledDropwizardReporter {
-       @Override
-       public ScheduledReporter getReporter(Configuration config) {
-               String host = config.getString(ARG_HOST, null);
-               int port = config.getInteger(ARG_PORT, -1);
-
-               if (host == null || host.length() == 0 || port < 1) {
-                       throw new IllegalArgumentException("Invalid host/port 
configuration. Host: " + host + " Port: " + port);
-               }
-
-               String prefix = config.getString(ARG_PREFIX, null);
-               String conversionRate = config.getString(ARG_CONVERSION_RATE, 
null);
-               String conversionDuration = 
config.getString(ARG_CONVERSION_DURATION, null);
-
-               com.codahale.metrics.graphite.GraphiteReporter.Builder builder =
-                       
com.codahale.metrics.graphite.GraphiteReporter.forRegistry(registry);
-
-               if (prefix != null) {
-                       builder.prefixedWith(prefix);
-               }
-
-               if (conversionRate != null) {
-                       
builder.convertRatesTo(TimeUnit.valueOf(conversionRate));
-               }
-
-               if (conversionDuration != null) {
-                       
builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration));
-               }
-
-               return builder.build(new Graphite(host, port));
-       }
-
-       @Override
-       public String generateName(String name, List<String> scope) {
-               StringBuilder sb = new StringBuilder();
-               for (String s : scope) {
-                       sb.append(s.replace(".", "_").replace("\"", ""));
-                       sb.append(".");
-               }
-               sb.append(name);
-               return sb.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-statsd/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/pom.xml 
b/flink-metrics/flink-metrics-statsd/pom.xml
deleted file mode 100644
index 5d63908..0000000
--- a/flink-metrics/flink-metrics-statsd/pom.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-metrics</artifactId>
-               <version>1.1-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-metrics-statsd</artifactId>
-       <name>flink-metrics-statsd</name>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-       </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 
b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
deleted file mode 100644
index 288b8b8..0000000
--- 
a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.statsd;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.reporter.AbstractReporter;
-import org.apache.flink.metrics.reporter.Scheduled;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Largely based on the StatsDReporter class by ReadyTalk
- * 
https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java
- *
- * Ported since it was not present in maven central.
- */
-public class StatsDReporter extends AbstractReporter implements Scheduled {
-       private static final Logger LOG = 
LoggerFactory.getLogger(StatsDReporter.class);
-
-       public static final String ARG_HOST = "host";
-       public static final String ARG_PORT = "port";
-       public static final String ARG_CONVERSION_RATE = "rateConversion";
-       public static final String ARG_CONVERSION_DURATION = 
"durationConversion";
-
-       private DatagramSocket socket;
-       private InetSocketAddress address;
-
-       private double durationFactor;
-       private double rateFactor;
-
-       @Override
-       public void open(Configuration config) {
-               String host = config.getString(ARG_HOST, null);
-               int port = config.getInteger(ARG_PORT, -1);
-
-               if (host == null || host.length() == 0 || port < 1) {
-                       throw new IllegalArgumentException("Invalid host/port 
configuration. Host: " + host + " Port: " + port);
-               }
-
-               String conversionRate = config.getString(ARG_CONVERSION_RATE, 
"SECONDS");
-               String conversionDuration = 
config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS");
-
-               this.address = new InetSocketAddress(host, port);
-               this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1);
-               this.durationFactor = 1.0 / 
TimeUnit.valueOf(conversionDuration).toNanos(1);
-               try {
-                       this.socket = new DatagramSocket(0);
-               } catch (SocketException e) {
-                       throw new RuntimeException("Failure while creating 
socket. ", e);
-               }
-       }
-
-       @Override
-       public void close() {
-               if (socket != null && !socket.isClosed()) {
-                       socket.close();
-               }
-       }
-
-       @Override
-       public String generateName(String name, List<String> scope) {
-               StringBuilder sb = new StringBuilder();
-               for (String s : scope) {
-                       sb.append(s);
-                       sb.append('.');
-               }
-               sb.append(name);
-               return sb.toString();
-       }
-
-       public void send(final String name, final double value) {
-               send(name, "" + value);
-       }
-
-       public void send(final String name, final String value) {
-               try {
-                       String formatted = String.format("%s:%s|g", name, 
value);
-                       byte[] data = formatted.getBytes();
-                       socket.send(new DatagramPacket(data, data.length, 
this.address));
-               } catch (IOException e) {
-                       LOG.error("unable to send packet to statsd at '{}:{}'", 
address.getHostName(), address.getPort());
-               }
-       }
-
-       @Override
-       public void report() {
-               for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
-                       reportGauge(entry.getKey(), entry.getValue());
-               }
-
-               for (Map.Entry<String, Counter> entry : counters.entrySet()) {
-                       reportCounter(entry.getKey(), entry.getValue());
-               }
-       }
-
-       private void reportCounter(final String name, final Counter counter) {
-               send(name, counter.getCount());
-       }
-
-       private void reportGauge(final String name, final Gauge<?> gauge) {
-               final String value = gauge.getValue().toString();
-               if (value != null) {
-                       send((name), value);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
deleted file mode 100644
index 542f49c..0000000
--- a/flink-metrics/pom.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-parent</artifactId>
-               <version>1.1-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-metrics</artifactId>
-       <name>flink-metrics</name>
-       <packaging>pom</packaging>
-
-       <modules>
-               <module>flink-metrics-dropwizard</module>
-               <module>flink-metrics-ganglia</module>
-               <module>flink-metrics-graphite</module>
-               <module>flink-metrics-statsd</module>
-       </modules>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index bc3ba0d..1f766e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -60,6 +60,7 @@ import 
org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateUtils;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,15 +92,15 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
  * {@link org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable} have only 
data
  * readers, -writers, and certain event callbacks. The task connects those to 
the
  * network stack and actor messages, and tracks the state of the execution and
- * handles exceptions.</p>
+ * handles exceptions.
  *
  * <p>Tasks have no knowledge about how they relate to other tasks, or whether 
they
  * are the first attempt to execute the task, or a repeated attempt. All of 
that
  * is only known to the JobManager. All the task knows are its own runnable 
code,
  * the task's configuration, and the IDs of the intermediate results to 
consume and
- * produce (if any).</p>
+ * produce (if any).
  *
- * <p>Each Task is run by one dedicated thread.</p>
+ * <p>Each Task is run by one dedicated thread.
  */
 public class Task implements Runnable {
 
@@ -129,6 +130,7 @@ public class Task implements Runnable {
        /** TaskInfo object for this task */
        private final TaskInfo taskInfo;
 
+       /** The name of the task, including subtask indexes */
        private final String taskNameWithSubtask;
 
        /** The job-wide configuration object */
@@ -158,6 +160,9 @@ public class Task implements Runnable {
        /** The BroadcastVariableManager to be used by this task */
        private final BroadcastVariableManager broadcastVariableManager;
 
+       /** Serialized version of the job specific execution configuration (see 
{@link ExecutionConfig}). */
+       private final SerializedValue<ExecutionConfig> 
serializedExecutionConfig;
+
        private final ResultPartition[] producedPartitions;
 
        private final ResultPartitionWriter[] writers;
@@ -193,6 +198,9 @@ public class Task implements Runnable {
        /** The thread that executes the task */
        private final Thread executingThread;
 
+       /** Parent group for all metrics of this task */
+       private final TaskMetricGroup metrics;
+
        // 
------------------------------------------------------------------------
        //  Fields that control the task execution. All these fields are 
volatile
        //  (which means that they introduce memory barriers), to establish
@@ -202,8 +210,6 @@ public class Task implements Runnable {
        /** atomic flag that makes sure the invokable is canceled exactly once 
upon error */
        private final AtomicBoolean invokableHasBeenCanceled;
 
-       private final TaskMetricGroup metrics;
-
        /** The invokable of this task, if initialized */
        private volatile AbstractInvokable invokable;
 
@@ -215,16 +221,13 @@ public class Task implements Runnable {
 
        /** Serial executor for asynchronous calls (checkpoints, etc), lazily 
initialized */
        private volatile ExecutorService asyncCallDispatcher;
-       
+
        /** The handle to the state that the operator was initialized with. 
Will be set to null after the
         * initialization, to be memory friendly */
        private volatile SerializedValue<StateHandle<?>> operatorState;
 
        private volatile long recoveryTs;
 
-       /** Serialized version of the job specific execution configuration (see 
{@link ExecutionConfig}). */
-       private final SerializedValue<ExecutionConfig> 
serializedExecutionConfig;
-
        /** Initialized from the Flink configuration. May also be set at the 
ExecutionConfig */
        private long taskCancellationInterval;
 
@@ -688,9 +691,6 @@ public class Task implements Runnable {
 
                                // remove all of the tasks library resources
                                libraryCache.unregisterTask(jobId, executionId);
-                               
-                               //Uncomment before Merging!!!
-                               //metrics.close();
 
                                // remove all files in the distributed cache
                                removeCachedFiles(distributedCacheEntries, fileCache);
@@ -703,6 +703,16 @@ public class Task implements Runnable {
                                LOG.error(message, t);
                                notifyFatalError(message, t);
                        }
+                       
+                       // un-register the metrics at the end so that the task may already be
+                       // counted as finished when this happens
+                       // errors here will only be logged
+                       try {
+                               metrics.close();
+                       }
+                       catch (Throwable t) {
+                               LOG.error("Error during metrics de-registration", t);
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index beb012c..a5cc18d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -155,8 +155,8 @@ class TaskManager(
   /** Registry of metrics periodically transmitted to the JobManager */
   private val metricRegistry = TaskManager.createMetricsRegistry()
 
-  private var metricsRegistry : FlinkMetricRegistry = null
-  private var taskManagerMetricGroup : TaskManagerMetricGroup = null
+  private var metricsRegistry : FlinkMetricRegistry = _
+  private var taskManagerMetricGroup : TaskManagerMetricGroup = _
 
   /** Metric serialization */
   private val metricRegistryMapper: ObjectMapper = new ObjectMapper()
@@ -938,7 +938,7 @@ class TaskManager(
       libraryCacheManager = Some(new FallbackLibraryCacheManager)
     }
 
-    metricsRegistry = new FlinkMetricRegistry(this.config.configuration);
+    metricsRegistry = new FlinkMetricRegistry(config.configuration)
     
     taskManagerMetricGroup = 
       new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString)
@@ -1011,6 +1011,10 @@ class TaskManager(
 
     // disassociate the network environment
     network.disassociate()
+    
+    // stop the metrics reporters
+    metricsRegistry.shutdown()
+    metricsRegistry = null
   }
 
   protected def handleJobManagerDisconnect(jobManager: ActorRef, msg: String): Unit = {
@@ -1085,8 +1089,9 @@ class TaskManager(
       }
       
       val taskMetricGroup = taskManagerMetricGroup
-          .addJob(tdd.getJobID, jobName)
-          .addTask(tdd.getVertexID, tdd.getExecutionId, tdd.getIndexInSubtaskGroup, tdd.getTaskName)
+          .addTaskForJob(
+            tdd.getJobID, jobName,
+            tdd.getVertexID, tdd.getExecutionId, tdd.getIndexInSubtaskGroup, tdd.getTaskName)
 
       val task = new Task(
         tdd,
@@ -1224,16 +1229,16 @@ class TaskManager(
         registry.getSnapshot
       }
 
-        self ! decorateMessage(
-          UpdateTaskExecutionState(
-            new TaskExecutionState(
-              task.getJobID,
-              task.getExecutionId,
-              task.getExecutionState,
-              task.getFailureCause,
-              accumulators)
-          )
+      self ! decorateMessage(
+        UpdateTaskExecutionState(
+          new TaskExecutionState(
+            task.getJobID,
+            task.getExecutionId,
+            task.getExecutionState,
+            task.getFailureCause,
+            accumulators)
         )
+      )
     }
     else {
       log.error(s"Cannot find task with ID $executionID to unregister.")

http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c97815a..784aa40 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,7 +74,7 @@ under the License.
                <module>flink-quickstart</module>
                <module>flink-contrib</module>
                <module>flink-dist</module>
-               <module>flink-metrics</module>
+               <module>flink-metric-reporters</module>
        </modules>
 
        <properties>

Reply via email to