http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskMetricGroupTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskMetricGroupTest.java
deleted file mode 100644
index f62f51f..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskMetricGroupTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.scope.ScopeFormat;
-import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
-import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat;
-import org.apache.flink.util.AbstractID;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TaskMetricGroupTest {
-
-       // 
------------------------------------------------------------------------
-       //  scope tests
-       // 
------------------------------------------------------------------------
-       private CountingMetricRegistry registry;
-
-       @Before
-       public void createRegistry() {
-               this.registry = new CountingMetricRegistry(new Configuration());
-       }
-
-       @After
-       public void shutdownRegistry() {
-               this.registry.shutdown();
-               this.registry = null;
-       }
-
-       @Test
-       public void testGenerateScopeDefault() {
-               AbstractID vertexId = new AbstractID();
-               AbstractID executionId = new AbstractID();
-
-               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-               TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
-               TaskMetricGroup taskGroup = new TaskMetricGroup(registry, 
jmGroup, vertexId, executionId, "aTaskName", 13, 2);
-
-               assertArrayEquals(
-                               new String[] { "theHostName", "taskmanager", 
"test-tm-id", "myJobName", "aTaskName", "13"},
-                               taskGroup.getScopeComponents());
-
-               assertEquals(
-                               
"theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13.name",
-                               taskGroup.getMetricIdentifier("name"));
-               registry.shutdown();
-       }
-
-       @Test
-       public void testGenerateScopeCustom() {
-               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("abc");
-               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("def", tmFormat);
-               TaskScopeFormat taskFormat = new 
TaskScopeFormat("<tm_id>.<job_id>.<task_id>.<task_attempt_id>", jmFormat);
-
-               JobID jid = new JobID();
-               AbstractID vertexId = new AbstractID();
-               AbstractID executionId = new AbstractID();
-
-               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-               TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
-               TaskMetricGroup taskGroup = new TaskMetricGroup(
-                               registry, jmGroup, taskFormat, vertexId, 
executionId, "aTaskName", 13, 2);
-
-               assertArrayEquals(
-                               new String[] { "test-tm-id", jid.toString(), 
vertexId.toString(), executionId.toString() },
-                               taskGroup.getScopeComponents());
-
-               assertEquals(
-                               String.format("test-tm-id.%s.%s.%s.name", jid, 
vertexId, executionId),
-                               taskGroup.getMetricIdentifier("name"));
-               registry.shutdown();
-       }
-
-       @Test
-       public void testGenerateScopeWilcard() {
-               TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat(
-                               ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
-               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat(
-                               
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, tmFormat);
-
-               TaskScopeFormat format = new 
TaskScopeFormat("*.<task_attempt_id>.<subtask_index>", jmFormat);
-
-               AbstractID executionId = new AbstractID();
-
-               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-               TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
-
-               TaskMetricGroup taskGroup = new TaskMetricGroup(
-                               registry, jmGroup, format, new AbstractID(), 
executionId, "aTaskName", 13, 1);
-
-               assertArrayEquals(
-                               new String[] { "theHostName", "taskmanager", 
"test-tm-id", "myJobName", executionId.toString(), "13" },
-                               taskGroup.getScopeComponents());
-
-               assertEquals(
-                               "theHostName.taskmanager.test-tm-id.myJobName." 
+ executionId + ".13.name",
-                               taskGroup.getMetricIdentifier("name"));
-               registry.shutdown();
-       }
-
-       @Test
-       public void testTaskMetricGroupCleanup() {
-               TaskManagerMetricGroup taskManagerMetricGroup = new 
TaskManagerMetricGroup(registry, "localhost", "0");
-               TaskManagerJobMetricGroup taskManagerJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, taskManagerMetricGroup, new JobID(), "job");
-               TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, 
taskManagerJobMetricGroup, new AbstractID(), new AbstractID(), "task", 0, 0);
-
-               // the io metric should have registered predefined metrics
-               assertTrue(registry.getNumberRegisteredMetrics() > 0);
-
-               taskMetricGroup.close();
-
-               // now all registered metrics should have been unregistered
-               assertEquals(0, registry.getNumberRegisteredMetrics());
-       }
-
-       private static class CountingMetricRegistry extends MetricRegistry {
-
-               private int counter = 0;
-
-               CountingMetricRegistry(Configuration config) {
-                       super(config);
-               }
-
-               @Override
-               public void register(Metric metric, String metricName, 
AbstractMetricGroup group) {
-                       super.register(metric, metricName, group);
-                       counter++;
-               }
-
-               @Override
-               public void unregister(Metric metric, String metricName, 
AbstractMetricGroup group) {
-                       super.unregister(metric, metricName, group);
-                       counter--;
-               }
-
-               int getNumberRegisteredMetrics() {
-                       return counter;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
deleted file mode 100644
index 06bcdf3..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.reporter;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.HistogramStatistics;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.metrics.util.TestReporter;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanInfo;
-import javax.management.MBeanServer;
-import javax.management.MBeanServerConnection;
-import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
-import java.lang.management.ManagementFactory;
-
-import static org.junit.Assert.assertEquals;
-
-public class JMXReporterTest extends TestLogger {
-
-       @Test
-       public void testReplaceInvalidChars() {
-               assertEquals("", JMXReporter.replaceInvalidChars(""));
-               assertEquals("abc", JMXReporter.replaceInvalidChars("abc"));
-               assertEquals("abc", JMXReporter.replaceInvalidChars("abc\""));
-               assertEquals("abc", JMXReporter.replaceInvalidChars("\"abc"));
-               assertEquals("abc", JMXReporter.replaceInvalidChars("\"abc\""));
-               assertEquals("abc", 
JMXReporter.replaceInvalidChars("\"a\"b\"c\""));
-               assertEquals("", JMXReporter.replaceInvalidChars("\"\"\"\""));
-               assertEquals("____", JMXReporter.replaceInvalidChars("    "));
-               assertEquals("ab_-(c)-", JMXReporter.replaceInvalidChars("\"ab 
;(c)'"));
-               assertEquals("a_b_c", JMXReporter.replaceInvalidChars("a b c"));
-               assertEquals("a_b_c_", JMXReporter.replaceInvalidChars("a b c 
"));
-               assertEquals("a-b-c-", 
JMXReporter.replaceInvalidChars("a;b'c*"));
-               assertEquals("a------b------c", 
JMXReporter.replaceInvalidChars("a,=;:?'b,=;:?'c"));
-       }
-
-       /**
-        * Verifies that the JMXReporter properly generates the JMX name.
-        */
-       @Test
-       public void testGenerateName() {
-               String[] scope = {"value0", "value1", "\"value2 (test),=;:?'"};
-               String jmxName = JMXReporter.generateJmxName("TestMetric", 
scope);
-
-               
assertEquals("org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=TestMetric",
 jmxName);
-       }
-
-       /**
-        * Verifies that multiple JMXReporters can be started on the same 
machine and register metrics at the MBeanServer.
-        *
-        * @throws Exception if the attribute/mbean could not be found or the 
test is broken
-        */
-       @Test
-       public void testPortConflictHandling() throws Exception {
-               Configuration cfg = new Configuration();
-               cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter.class.getName());
-               MetricRegistry reg = new MetricRegistry(cfg);
-
-               TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, 
"host", "tm");
-
-               JMXReporter rep1 = new JMXReporter();
-               JMXReporter rep2 = new JMXReporter();
-
-               Configuration cfg1 = new Configuration();
-               cfg1.setString("port", "9020-9035");
-
-               rep1.open(cfg1);
-               rep2.open(cfg1);
-
-               rep1.notifyOfAddedMetric(new Gauge<Integer>() {
-                       @Override
-                       public Integer getValue() {
-                               return 1;
-                       }
-               }, "rep1", new TaskManagerMetricGroup(reg, "host", "tm"));
-
-               rep2.notifyOfAddedMetric(new Gauge<Integer>() {
-                       @Override
-                       public Integer getValue() {
-                               return 2;
-                       }
-               }, "rep2", new TaskManagerMetricGroup(reg, "host", "tm"));
-
-               MBeanServer mBeanServer = 
ManagementFactory.getPlatformMBeanServer();
-
-               ObjectName objectName1 = new 
ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
-               ObjectName objectName2 = new 
ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
-
-               assertEquals(1, mBeanServer.getAttribute(objectName1, "Value"));
-               assertEquals(2, mBeanServer.getAttribute(objectName2, "Value"));
-
-               rep1.close();
-               rep2.close();
-               reg.shutdown();
-       }
-
-       /**
-        * Verifies that we can connect to multiple JMXReporters running on the 
same machine.
-        *
-        * @throws Exception
-        */
-       @Test
-       public void testJMXAvailability() throws Exception {
-               Configuration cfg = new Configuration();
-               cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter.class.getName());
-               MetricRegistry reg = new MetricRegistry(cfg);
-
-               TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, 
"host", "tm");
-
-               JMXReporter rep1 = new JMXReporter();
-               JMXReporter rep2 = new JMXReporter();
-
-               Configuration cfg1 = new Configuration();
-               cfg1.setString("port", "9040-9055");
-               rep1.open(cfg1);
-               rep2.open(cfg1);
-
-               rep1.notifyOfAddedMetric(new Gauge<Integer>() {
-                       @Override
-                       public Integer getValue() {
-                               return 1;
-                       }
-               }, "rep1", new TaskManagerMetricGroup(reg, "host", "tm"));
-
-               rep2.notifyOfAddedMetric(new Gauge<Integer>() {
-                       @Override
-                       public Integer getValue() {
-                               return 2;
-                       }
-               }, "rep2", new TaskManagerMetricGroup(reg, "host", "tm"));
-
-               ObjectName objectName1 = new 
ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
-               ObjectName objectName2 = new 
ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
-
-               JMXServiceURL url1 = new 
JMXServiceURL("service:jmx:rmi://localhost:" + rep1.getPort() + 
"/jndi/rmi://localhost:" + rep1.getPort() + "/jmxrmi");
-               JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
-               MBeanServerConnection mCon1 = 
jmxCon1.getMBeanServerConnection();
-
-               assertEquals(1, mCon1.getAttribute(objectName1, "Value"));
-               assertEquals(2, mCon1.getAttribute(objectName2, "Value"));
-
-               url1 = null;
-               jmxCon1.close();
-               jmxCon1 = null;
-               mCon1 = null;
-
-               JMXServiceURL url2 = new 
JMXServiceURL("service:jmx:rmi://localhost:" + rep2.getPort() + 
"/jndi/rmi://localhost:" + rep2.getPort() + "/jmxrmi");
-               JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2);
-               MBeanServerConnection mCon2 = 
jmxCon2.getMBeanServerConnection();
-
-               assertEquals(1, mCon2.getAttribute(objectName1, "Value"));
-               assertEquals(2, mCon2.getAttribute(objectName2, "Value"));
-
-               url2 = null;
-               jmxCon2.close();
-               jmxCon2 = null;
-               mCon2 = null;
-
-               rep1.close();
-               rep2.close();
-               reg.shutdown();
-       }
-
-       /**
-        * Tests that histograms are properly reported via the JMXReporter.
-        */
-       @Test
-       public void testHistogramReporting() throws Exception {
-               MetricRegistry registry = null;
-               String histogramName = "histogram";
-
-               try {
-                       Configuration config = new Configuration();
-                       
config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
"org.apache.flink.metrics.reporter.JMXReporter");
-
-                       registry = new MetricRegistry(config);
-
-                       TaskManagerMetricGroup metricGroup = new 
TaskManagerMetricGroup(registry, "localhost", "tmId");
-
-                       TestingHistogram histogram = new TestingHistogram();
-
-                       metricGroup.histogram(histogramName, histogram);
-
-                       MBeanServer mBeanServer = 
ManagementFactory.getPlatformMBeanServer();
-
-                       ObjectName objectName = new 
ObjectName(JMXReporter.generateJmxName(histogramName, 
metricGroup.getScopeComponents()));
-
-                       MBeanInfo info = mBeanServer.getMBeanInfo(objectName);
-
-                       MBeanAttributeInfo[] attributeInfos = 
info.getAttributes();
-
-                       assertEquals(11, attributeInfos.length);
-
-                       assertEquals(histogram.getCount(), 
mBeanServer.getAttribute(objectName, "Count"));
-                       assertEquals(histogram.getStatistics().getMean(), 
mBeanServer.getAttribute(objectName, "Mean"));
-                       assertEquals(histogram.getStatistics().getStdDev(), 
mBeanServer.getAttribute(objectName, "StdDev"));
-                       assertEquals(histogram.getStatistics().getMax(), 
mBeanServer.getAttribute(objectName, "Max"));
-                       assertEquals(histogram.getStatistics().getMin(), 
mBeanServer.getAttribute(objectName, "Min"));
-                       
assertEquals(histogram.getStatistics().getQuantile(0.5), 
mBeanServer.getAttribute(objectName, "Median"));
-                       
assertEquals(histogram.getStatistics().getQuantile(0.75), 
mBeanServer.getAttribute(objectName, "75thPercentile"));
-                       
assertEquals(histogram.getStatistics().getQuantile(0.95), 
mBeanServer.getAttribute(objectName, "95thPercentile"));
-                       
assertEquals(histogram.getStatistics().getQuantile(0.98), 
mBeanServer.getAttribute(objectName, "98thPercentile"));
-                       
assertEquals(histogram.getStatistics().getQuantile(0.99), 
mBeanServer.getAttribute(objectName, "99thPercentile"));
-                       
assertEquals(histogram.getStatistics().getQuantile(0.999), 
mBeanServer.getAttribute(objectName, "999thPercentile"));
-
-               } finally {
-                       if (registry != null) {
-                               registry.shutdown();
-                       }
-               }
-       }
-
-       static class TestingHistogram implements Histogram {
-
-               @Override
-               public void update(long value) {
-
-               }
-
-               @Override
-               public long getCount() {
-                       return 1;
-               }
-
-               @Override
-               public HistogramStatistics getStatistics() {
-                       return new HistogramStatistics() {
-                               @Override
-                               public double getQuantile(double quantile) {
-                                       return quantile;
-                               }
-
-                               @Override
-                               public long[] getValues() {
-                                       return new long[0];
-                               }
-
-                               @Override
-                               public int size() {
-                                       return 3;
-                               }
-
-                               @Override
-                               public double getMean() {
-                                       return 4;
-                               }
-
-                               @Override
-                               public double getStdDev() {
-                                       return 5;
-                               }
-
-                               @Override
-                               public long getMax() {
-                                       return 6;
-                               }
-
-                               @Override
-                               public long getMin() {
-                                       return 7;
-                               }
-                       };
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java 
b/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java
deleted file mode 100644
index 3f8909d..0000000
--- a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.metrics.reporter.AbstractReporter;
-
-public class TestReporter extends AbstractReporter {
-
-       @Override
-       public void open(Configuration config) {}
-
-       @Override
-       public void close() {}
-
-       @Override
-       public void notifyOfAddedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {}
-
-       @Override
-       public void notifyOfRemovedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {}
-
-       @Override
-       public String filterCharacters(String input) {
-               return input;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 2bc26a7..1a0e8d8 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -108,6 +108,13 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-jmx</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               
+
        </dependencies>
 
        <!-- See main pom.xml for explanation of profiles -->

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/pom.xml 
b/flink-metrics/flink-metrics-core/pom.xml
new file mode 100644
index 0000000..ab5eb56
--- /dev/null
+++ b/flink-metrics/flink-metrics-core/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-metrics</artifactId>
+               <version>1.1-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-metrics-core</artifactId>
+       <name>flink-metrics-core</name>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
new file mode 100644
index 0000000..1e9fbc4
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+/**
+ * Interface for a character filter function. The filter function is given a string which the filter
+ * can transform. The returned string is the transformation result.
+ */
+public interface CharacterFilter {
+
+       /**
+        * Filter the given string and generate a resulting string from it.
+        *
+        * For example, one implementation could filter out invalid characters from the input string.
+        *
+        * @param input Input string
+        * @return Filtered result string
+        */
+       String filterCharacters(String input);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Counter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Counter.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Counter.java
new file mode 100644
index 0000000..7f63cf9
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Counter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+/**
+ * A Counter is a {@link Metric} that measures a count.
+ */
+public interface Counter extends Metric {
+
+       /**
+        * Increment the current count by 1.
+        */
+       void inc();
+
+       /**
+        * Increment the current count by the given value.
+        *
+        * @param n value to increment the current count by
+        */
+       void inc(long n);
+
+       /**
+        * Decrement the current count by 1.
+        */
+       void dec();
+
+       /**
+        * Decrement the current count by the given value.
+        *
+        * @param n value to decrement the current count by
+        */
+       void dec(long n);
+
+       /**
+        * Returns the current count.
+        *
+        * @return current count
+        */
+       long getCount();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Gauge.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Gauge.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Gauge.java
new file mode 100644
index 0000000..eb7c40f
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Gauge.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+/**
+ * A Gauge is a {@link Metric} that calculates a specific value at a point in 
time.
+ */
+public interface Gauge<T> extends Metric {
+
+       /**
+        * Calculates and returns the measured value.
+        *
+        * @return calculated value
+        */
+       T getValue();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java
new file mode 100644
index 0000000..af5c9b0
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+/**
+ * Histogram interface to be used with Flink's metrics system.
+ *
+ * The histogram allows recording values, getting the current count of recorded 
values and creating
+ * histogram statistics for the currently seen elements.
+ */
+public interface Histogram extends Metric {
+
+       /**
+        * Update the histogram with the given value.
+        *
+        * @param value Value to update the histogram with
+        */
+       void update(long value);
+
+       /**
+        * Get the count of seen elements.
+        *
+        * @return Count of seen elements
+        */
+       long getCount();
+
+       /**
+        * Create statistics for the currently recorded elements.
+        *
+        * @return Statistics about the currently recorded elements
+        */
+       HistogramStatistics getStatistics();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
new file mode 100644
index 0000000..b2e4507
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+/**
+ * Histogram statistics represent the current snapshot of elements recorded in 
the histogram.
+ *
+ * The histogram statistics allow calculating values for quantiles, the mean, 
the standard
+ * deviation, the minimum and the maximum.
+ */
+public abstract class HistogramStatistics {
+
+       /**
+        * Returns the value for the given quantile based on the represented 
histogram statistics.
+        *
+        * @param quantile Quantile to calculate the value for
+        * @return Value for the given quantile
+        */
+       public abstract double getQuantile(double quantile);
+
+       /**
+        * Returns the elements of the statistics' sample.
+        *
+        * @return Elements of the statistics' sample
+        */
+       public abstract long[] getValues();
+
+       /**
+        * Returns the size of the statistics' sample.
+        *
+        * @return Size of the statistics' sample
+        */
+       public abstract int size();
+
+       /**
+        * Returns the mean of the histogram values.
+        *
+        * @return Mean of the histogram values
+        */
+       public abstract double getMean();
+
+       /**
+        * Returns the standard deviation of the distribution reflected by the 
histogram statistics.
+        *
+        * @return Standard deviation of histogram distribution
+        */
+       public abstract double getStdDev();
+
+       /**
+        * Returns the maximum value of the histogram.
+        *
+        * @return Maximum value of the histogram
+        */
+       public abstract long getMax();
+
+       /**
+        * Returns the minimum value of the histogram.
+        *
+        * @return Minimum value of the histogram
+        */
+       public abstract long getMin();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java
new file mode 100644
index 0000000..5597d91
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+/**
+ * Common super interface for all metrics.
+ */
+public interface Metric {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java
new file mode 100644
index 0000000..a611ae8
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+import java.util.Properties;
+
+public class MetricConfig extends Properties {
+
+       public String getString(String key, String defaultValue) {
+               return getProperty(key, defaultValue);
+       }
+
+       public int getInteger(String key, int defaultValue) {
+               String argument = getProperty(key, null);
+               return argument == null
+                       ? defaultValue
+                       : Integer.parseInt(argument);
+       }
+
+       public long getLong(String key, long defaultValue) {
+               String argument = getProperty(key, null);
+               return argument == null
+                       ? defaultValue
+                       : Long.parseLong(argument);
+       }
+
+       public float getFloat(String key, float defaultValue) {
+               String argument = getProperty(key, null);
+               return argument == null
+                       ? defaultValue
+                       : Float.parseFloat(argument);
+       }
+
+       public double getDouble(String key, double defaultValue) {
+               String argument = getProperty(key, null);
+               return argument == null
+                       ? defaultValue
+                       : Double.parseDouble(argument);
+       }
+
+       public boolean getBoolean(String key, boolean defaultValue) {
+               String argument = getProperty(key, null);
+               return argument == null
+                       ? defaultValue
+                       : Boolean.parseBoolean(argument);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
new file mode 100644
index 0000000..15f25b9
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+/**
+ * A MetricGroup is a named container for {@link Metric Metrics} and further 
metric subgroups.
+ * 
+ * <p>Instances of this class can be used to register new metrics with Flink 
and to create a nested
+ * hierarchy based on the group names.
+ * 
+ * <p>A MetricGroup is uniquely identified by its place in the hierarchy and 
name.
+ */
+public interface MetricGroup {
+
+       // 
------------------------------------------------------------------------
+       //  Metrics
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates and registers a new {@link org.apache.flink.metrics.Counter} 
with Flink.
+        *
+        * @param name name of the counter
+        * @return the created counter
+        */
+       Counter counter(int name);
+
+       /**
+        * Creates and registers a new {@link org.apache.flink.metrics.Counter} 
with Flink.
+        *
+        * @param name name of the counter
+        * @return the created counter
+        */
+       Counter counter(String name);
+
+       /**
+        * Registers a {@link org.apache.flink.metrics.Counter} with Flink.
+        *
+        * @param name    name of the counter
+        * @param counter counter to register
+        * @param <C>     counter type
+        * @return the given counter
+        */
+       <C extends Counter> C counter(int name, C counter);
+
+       /**
+        * Registers a {@link org.apache.flink.metrics.Counter} with Flink.
+        *
+        * @param name    name of the counter
+        * @param counter counter to register
+        * @param <C>     counter type
+        * @return the given counter
+        */
+       <C extends Counter> C counter(String name, C counter);
+       
+       /**
+        * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
+        *
+        * @param name  name of the gauge
+        * @param gauge gauge to register
+        * @param <T>   return type of the gauge
+        * @return the given gauge
+        */
+       <T, G extends Gauge<T>> G gauge(int name, G gauge);
+
+       /**
+        * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
+        *
+        * @param name  name of the gauge
+        * @param gauge gauge to register
+        * @param <T>   return type of the gauge
+        * @return the given gauge
+        */
+       <T, G extends Gauge<T>> G gauge(String name, G gauge);
+
+       /**
+        * Registers a new {@link Histogram} with Flink.
+        *
+        * @param name name of the histogram
+        * @param histogram histogram to register
+        * @param <H> histogram type   
+        * @return the registered histogram
+        */
+       <H extends Histogram> H histogram(String name, H histogram);
+
+       /**
+        * Registers a new {@link Histogram} with Flink.
+        *
+        * @param name name of the histogram
+        * @param histogram histogram to register
+        * @param <H> histogram type   
+        * @return the registered histogram
+        */
+       <H extends Histogram> H histogram(int name, H histogram);
+
+       // 
------------------------------------------------------------------------
+       // Groups
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new MetricGroup and adds it to this group's sub-groups.
+        *
+        * @param name name of the group
+        * @return the created group
+        */
+       MetricGroup addGroup(int name);
+
+       /**
+        * Creates a new MetricGroup and adds it to this group's sub-groups.
+        *
+        * @param name name of the group
+        * @return the created group
+        */
+       MetricGroup addGroup(String name);
+
+       // 
------------------------------------------------------------------------
+       // Scope
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets the scope as an array of the scope components, for example
+        * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}
+        *
+        * @see #getMetricIdentifier(String)
+        * @see #getMetricIdentifier(String, CharacterFilter)
+        */
+       String[] getScopeComponents();
+
+       /**
+        * Returns the fully qualified metric name, for example
+        * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
+        *
+        * @param metricName metric name
+        * @return fully qualified metric name
+        */
+       String getMetricIdentifier(String metricName);
+
+       /**
+        * Returns the fully qualified metric name, for example
+        * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
+        *
+        * @param metricName metric name
+        * @param filter character filter which is applied to the scope 
components if not null.
+        * @return fully qualified metric name
+        */
+       String getMetricIdentifier(String metricName, CharacterFilter filter);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
new file mode 100644
index 0000000..6ec3b28
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+/**
+ * A simple low-overhead {@link org.apache.flink.metrics.Counter} that is not 
thread-safe.
+ */
+public class SimpleCounter implements Counter {
+
+       /** the current count */
+       private long count;
+
+       /**
+        * Increment the current count by 1.
+        */
+       @Override
+       public void inc() {
+               count++;
+       }
+
+       /**
+        * Increment the current count by the given value.
+        *
+        * @param n value to increment the current count by
+        */
+       @Override
+       public void inc(long n) {
+               count += n;
+       }
+
+       /**
+        * Decrement the current count by 1.
+        */
+       @Override
+       public void dec() {
+               count--;
+       }
+
+       /**
+        * Decrement the current count by the given value.
+        *
+        * @param n value to decrement the current count by
+        */
+       @Override
+       public void dec(long n) {
+               count -= n;
+       }
+
+       /**
+        * Returns the current count.
+        *
+        * @return current count
+        */
+       @Override
+       public long getCount() {
+               return count;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
new file mode 100644
index 0000000..1c7f0ce
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+
+/**
+ * A special {@link MetricGroup} that does not register any metrics with the 
metrics registry
+ * or any reporters.
+ */
+public class UnregisteredMetricsGroup implements MetricGroup {
+
+       @Override
+       public Counter counter(int name) {
+               return new SimpleCounter();
+       }
+
+       @Override
+       public Counter counter(String name) {
+               return new SimpleCounter();
+       }
+
+       @Override
+       public <C extends Counter> C counter(int name, C counter) {
+               return counter;
+       }
+
+       @Override
+       public <C extends Counter> C counter(String name, C counter) {
+               return counter;
+       }
+
+       @Override
+       public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
+               return gauge;
+       }
+
+       @Override
+       public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
+               return gauge;
+       }
+
+       @Override
+       public <H extends Histogram> H histogram(int name, H histogram) {
+               return histogram;
+       }
+
+       @Override
+       public <H extends Histogram> H histogram(String name, H histogram) {
+               return histogram;
+       }
+
+       @Override
+       public MetricGroup addGroup(int name) {
+               return addGroup(String.valueOf(name));
+       }
+
+       @Override
+       public MetricGroup addGroup(String name) {
+               return new UnregisteredMetricsGroup();
+       }
+
+       @Override
+       public String[] getScopeComponents() {
+               return new String[0];
+       }
+
+       @Override
+       public String getMetricIdentifier(String metricName) {
+               return metricName;
+       }
+
+       @Override
+       public String getMetricIdentifier(String metricName, CharacterFilter 
filter) {
+               return metricName;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
new file mode 100644
index 0000000..7ab8c73
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.reporter;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for custom metric reporters.
+ */
+public abstract class AbstractReporter implements MetricReporter, 
CharacterFilter {
+       protected final Logger log = LoggerFactory.getLogger(getClass());
+
+       protected final Map<Gauge<?>, String> gauges = new HashMap<>();
+       protected final Map<Counter, String> counters = new HashMap<>();
+       protected final Map<Histogram, String> histograms = new HashMap<>();
+
+       @Override
+       public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+               final String name = group.getMetricIdentifier(metricName, this);
+
+               synchronized (this) {
+                       if (metric instanceof Counter) {
+                               counters.put((Counter) metric, name);
+                       } else if (metric instanceof Gauge) {
+                               gauges.put((Gauge<?>) metric, name);
+                       } else if (metric instanceof Histogram) {
+                               histograms.put((Histogram) metric, name);
+                       } else {
+                               log.warn("Cannot add unknown metric type {}. 
This indicates that the reporter " +
+                                       "does not support this metric type.", 
metric.getClass().getName());
+                       }
+               }
+       }
+
+       @Override
+       public void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group) {
+               synchronized (this) {
+                       if (metric instanceof Counter) {
+                               counters.remove(metric);
+                       } else if (metric instanceof Gauge) {
+                               gauges.remove(metric);
+                       } else if (metric instanceof Histogram) {
+                               histograms.remove(metric);
+                       } else {
+                               log.warn("Cannot remove unknown metric type {}. 
This indicates that the reporter " +
+                                       "does not support this metric type.", 
metric.getClass().getName());
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
new file mode 100644
index 0000000..bd13be7
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.reporter;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * Reporters are used to export {@link Metric Metrics} to an external backend.
+ * 
+ * <p>Reporters are instantiated via reflection and must be public, 
non-abstract, and have a
+ * public no-argument constructor.
+ */
+public interface MetricReporter {
+
+       // 
------------------------------------------------------------------------
+       //  life cycle
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Configures this reporter. Since reporters are instantiated 
generically and hence parameter-less,
+        * this method is the place where the reporters set their basic fields 
based on configuration values.
+        * 
+        * <p>This method is always called first on a newly instantiated 
reporter.
+        *
+        * @param config The configuration with all parameters.
+        */
+       void open(MetricConfig config);
+
+       /**
+        * Closes this reporter. Should be used to close channels, streams and 
release resources.
+        */
+       void close();
+
+       // 
------------------------------------------------------------------------
+       //  adding / removing metrics
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Called when a new {@link Metric} was added.
+        *
+        * @param metric      the metric that was added
+        * @param metricName  the name of the metric
+        * @param group       the group that contains the metric
+        */
+       void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup 
group);
+
+       /**
+        * Called when a {@link Metric} should be removed.
+        *
+        * @param metric      the metric that should be removed
+        * @param metricName  the name of the metric
+        * @param group       the group that contains the metric
+        */
+       void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
new file mode 100644
index 0000000..f82f5a6
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.reporter;
+
+/**
+ * Interface for reporters that actively send out data periodically.
+ */
+public interface Scheduled {
+
+       /**
+        * Report the current measurements. This method is called periodically 
by the
+        * metrics registry that uses the reporter.
+        */
+       void report();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/pom.xml 
b/flink-metrics/flink-metrics-dropwizard/pom.xml
index 3d3ecb7..e63af7d 100644
--- a/flink-metrics/flink-metrics-dropwizard/pom.xml
+++ b/flink-metrics/flink-metrics-dropwizard/pom.xml
@@ -35,7 +35,14 @@ under the License.
        <dependencies>
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
+                       <artifactId>flink-annotations</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-core</artifactId>
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
@@ -51,6 +58,13 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-test-utils-junit</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
 
b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
index 568bda2..ce0299b 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
@@ -23,7 +23,6 @@ import com.codahale.metrics.Reporter;
 import com.codahale.metrics.ScheduledReporter;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper;
 import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
 import org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper;
@@ -33,7 +32,8 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
 import org.slf4j.Logger;
@@ -88,7 +88,7 @@ public abstract class ScheduledDropwizardReporter implements 
MetricReporter, Sch
        // 
------------------------------------------------------------------------
 
        @Override
-       public void open(Configuration config) {
+       public void open(MetricConfig config) {
                this.reporter = getReporter(config);
        }
 
@@ -102,7 +102,7 @@ public abstract class ScheduledDropwizardReporter 
implements MetricReporter, Sch
        // 
------------------------------------------------------------------------
 
        @Override
-       public void notifyOfAddedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {
+       public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
                final String fullName = group.getMetricIdentifier(metricName, 
this);
 
                synchronized (this) {
@@ -130,7 +130,7 @@ public abstract class ScheduledDropwizardReporter 
implements MetricReporter, Sch
        }
 
        @Override
-       public void notifyOfRemovedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {
+       public void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group) {
                synchronized (this) {
                        String fullName;
                        
@@ -200,5 +200,5 @@ public abstract class ScheduledDropwizardReporter 
implements MetricReporter, Sch
                this.reporter.report(gauges, counters, histograms, meters, 
timers);
        }
 
-       public abstract ScheduledReporter getReporter(Configuration config);
+       public abstract ScheduledReporter getReporter(MetricConfig config);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 4b19b50..9b04c3b 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -23,17 +23,18 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup;
-import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.util.AbstractID;
+
 import org.junit.Test;
 
 import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -42,10 +43,10 @@ import static org.junit.Assert.assertTrue;
 public class ScheduledDropwizardReporterTest {
 
        @Test
-       public void testInvalidCharacterReplacement() throws 
NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+       public void testInvalidCharacterReplacement() {
                ScheduledDropwizardReporter reporter = new 
ScheduledDropwizardReporter() {
                        @Override
-                       public ScheduledReporter getReporter(Configuration 
config) {
+                       public ScheduledReporter getReporter(MetricConfig 
config) {
                                return null;
                        }
                };
@@ -112,7 +113,7 @@ public class ScheduledDropwizardReporterTest {
        public static class TestingScheduledDropwizardReporter extends 
ScheduledDropwizardReporter {
 
                @Override
-               public ScheduledReporter getReporter(Configuration config) {
+               public ScheduledReporter getReporter(MetricConfig config) {
                        return null;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index c1913d7..7b27867 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -29,9 +29,10 @@ import com.codahale.metrics.Timer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -153,7 +154,7 @@ public class DropwizardFlinkHistogramWrapperTest extends 
TestLogger {
                TestingScheduledReporter scheduledReporter = null;
 
                @Override
-               public ScheduledReporter getReporter(Configuration config) {
+               public ScheduledReporter getReporter(MetricConfig config) {
                        scheduledReporter = new TestingScheduledReporter(
                                registry,
                                getClass().getName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-ganglia/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-ganglia/pom.xml 
b/flink-metrics/flink-metrics-ganglia/pom.xml
index e4993ad..1a94851 100644
--- a/flink-metrics/flink-metrics-ganglia/pom.xml
+++ b/flink-metrics/flink-metrics-ganglia/pom.xml
@@ -42,7 +42,7 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
+                       <artifactId>flink-metrics-core</artifactId>
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
 
b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
index f30c9f4..15176a3 100644
--- 
a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
+++ 
b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
@@ -23,8 +23,8 @@ import com.codahale.metrics.ScheduledReporter;
 import info.ganglia.gmetric4j.gmetric.GMetric;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+import org.apache.flink.metrics.MetricConfig;
 
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
@@ -38,7 +38,7 @@ public class GangliaReporter extends 
ScheduledDropwizardReporter {
        public static final String ARG_MODE_ADDRESSING = "addressingMode";
 
        @Override
-       public ScheduledReporter getReporter(Configuration config) {
+       public ScheduledReporter getReporter(MetricConfig config) {
 
                try {
                        String host = config.getString(ARG_HOST, null);

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-graphite/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-graphite/pom.xml 
b/flink-metrics/flink-metrics-graphite/pom.xml
index 44dc619..b75fbae 100644
--- a/flink-metrics/flink-metrics-graphite/pom.xml
+++ b/flink-metrics/flink-metrics-graphite/pom.xml
@@ -42,7 +42,7 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
+                       <artifactId>flink-metrics-core</artifactId>
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
 
b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
index 16be830..ca301aa 100644
--- 
a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
+++ 
b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
@@ -22,8 +22,8 @@ import com.codahale.metrics.ScheduledReporter;
 import com.codahale.metrics.graphite.Graphite;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+import org.apache.flink.metrics.MetricConfig;
 
 import java.util.concurrent.TimeUnit;
 
@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
 public class GraphiteReporter extends ScheduledDropwizardReporter {
 
        @Override
-       public ScheduledReporter getReporter(Configuration config) {
+       public ScheduledReporter getReporter(MetricConfig config) {
                String host = config.getString(ARG_HOST, null);
                int port = config.getInteger(ARG_PORT, -1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-jmx/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/pom.xml 
b/flink-metrics/flink-metrics-jmx/pom.xml
new file mode 100644
index 0000000..45ffbf8
--- /dev/null
+++ b/flink-metrics/flink-metrics-jmx/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-metrics</artifactId>
+               <version>1.1-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-metrics-jmx</artifactId>
+       <name>flink-metrics-jmx</name>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-annotations</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
 
b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
new file mode 100644
index 0000000..1a283d9
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.jmx;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.MalformedURLException;
+import java.rmi.NoSuchObjectException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
+ *
+ * Largely based on the JmxReporter class of the dropwizard metrics library
+ * 
https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics/JmxReporter.java
+ */
+public class JMXReporter implements MetricReporter {
+
+       private static final String PREFIX = "org.apache.flink.metrics:";
+       private static final String KEY_PREFIX = "key";
+
+       public static final String ARG_PORT = "port";
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(JMXReporter.class);
+
+       // 
------------------------------------------------------------------------
+
+       /** The server where the management beans are registered and 
deregistered */
+       private final MBeanServer mBeanServer;
+
+       /** The names under which the registered metrics have been added to the 
MBeanServer */ 
+       private final Map<Metric, ObjectName> registeredMetrics;
+
+       /** The server to which JMX clients connect. Allows for better 
control over port usage. */
+       private JMXServer jmxServer;
+
+       /**
+        * Creates a new JMXReporter
+        */
+       public JMXReporter() {
+               this.mBeanServer = ManagementFactory.getPlatformMBeanServer();
+               this.registeredMetrics = new HashMap<>();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  life cycle
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void open(MetricConfig config) {
+               String portsConfig = config.getString(ARG_PORT, null);
+
+               if (portsConfig != null) {
+                       Iterator<Integer> ports = 
NetUtils.getPortRangeFromString(portsConfig);
+
+                       JMXServer server = new JMXServer();
+                       while (ports.hasNext()) {
+                               int port = ports.next();
+                               try {
+                                       server.start(port);
+                                       LOG.info("Started JMX server on port " 
+ port + ".");
+                                       // only set our field if the server was 
actually started
+                                       jmxServer = server;
+                                       break;
+                               } catch (IOException ioe) { //assume port 
conflict
+                                       LOG.debug("Could not start JMX server 
on port " + port + ".", ioe);
+                                       try {
+                                               server.stop();
+                                       } catch (Exception e) {
+                                               LOG.debug("Could not stop JMX 
server.", e);
+                                       }
+                               }
+                       }
+                       if (jmxServer == null) {
+                               throw new RuntimeException("Could not start JMX 
server on any configured port. Ports: " + portsConfig);
+                       }
+               }
+       }
+
+       @Override
+       public void close() {
+               if (jmxServer != null) {
+                       try {
+                               jmxServer.stop();
+                       } catch (IOException e) {
+                               LOG.error("Failed to stop JMX server.", e);
+                       }
+               }
+       }
+       
+       public int getPort() {
+               if (jmxServer == null) {
+                       throw new NullPointerException("No server was opened. 
Did you specify a port?");
+               }
+               return jmxServer.port;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  adding / removing metrics
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+               final String name = generateJmxName(metricName, 
group.getScopeComponents());
+
+               AbstractBean jmxMetric;
+               ObjectName jmxName;
+               try {
+                       jmxName = new ObjectName(name);
+               } catch (MalformedObjectNameException e) {
+                       LOG.error("Metric name did not conform to JMX 
ObjectName rules: " + name, e);
+                       return;
+               }
+
+               if (metric instanceof Gauge) {
+                       jmxMetric = new JmxGauge((Gauge<?>) metric);
+               } else if (metric instanceof Counter) {
+                       jmxMetric = new JmxCounter((Counter) metric);
+               } else if (metric instanceof Histogram) {
+                       jmxMetric = new JmxHistogram((Histogram) metric);
+               } else {
+                       LOG.error("Cannot add unknown metric type: {}. This 
indicates that the metric type " +
+                               "is not supported by this reporter.", 
metric.getClass().getName());
+                       return;
+               }
+
+               try {
+                       synchronized (this) {
+                               mBeanServer.registerMBean(jmxMetric, jmxName);
+                               registeredMetrics.put(metric, jmxName);
+                       }
+               } catch (NotCompliantMBeanException e) {
+                       // implementation error on our side
+                       LOG.error("Metric did not comply with JMX MBean naming 
rules.", e);
+               } catch (InstanceAlreadyExistsException e) {
+                       LOG.debug("A metric with the name " + jmxName + " was 
already registered.", e);
+                       LOG.error("A metric with the name " + jmxName + " was 
already registered.");
+               } catch (Throwable t) {
+                       LOG.error("Failed to register metric", t);
+               }
+       }
+
+       @Override
+       public void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group) {
+               try {
+                       synchronized (this) {
+                               final ObjectName jmxName = 
registeredMetrics.remove(metric);
+
+                               // remove the metric if it is known. if it is 
not known, ignore the request
+                               if (jmxName != null) {
+                                       mBeanServer.unregisterMBean(jmxName);
+                               }
+                       }
+               } catch (InstanceNotFoundException e) {
+                       // alright then
+               } catch (Throwable t) {
+                       // never propagate exceptions - the metrics reporter 
should not affect the stability
+                       // of the running system
+                       LOG.error("Un-registering metric failed", t);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities 
+       // 
------------------------------------------------------------------------
+
+       static String generateJmxName(String metricName, String[] 
scopeComponents) {
+               final StringBuilder nameBuilder = new StringBuilder(128);
+               nameBuilder.append(PREFIX);
+
+               for (int x = 0; x < scopeComponents.length; x++) {
+                       // write keyX=
+                       nameBuilder.append(KEY_PREFIX);
+                       nameBuilder.append(x);
+                       nameBuilder.append("=");
+
+                       // write scope component
+                       
nameBuilder.append(replaceInvalidChars(scopeComponents[x]));
+                       nameBuilder.append(",");
+               }
+
+               // write the name
+               
nameBuilder.append("name=").append(replaceInvalidChars(metricName));
+
+               return nameBuilder.toString();
+       }
+       
+       /**
+        * Sanitizes a string by dropping or substituting unsupported characters.
+        *
+        * <p>The input is scanned once. A working copy is only allocated when the
+        * first unsupported character is encountered; if there is none, the very
+        * same {@code str} instance is returned.
+        *
+        * <p>Substitution rules:
+        *
+        * <ul>
+        *     <li>{@code "} is dropped</li>
+        *     <li>a space becomes {@code _} (underscore)</li>
+        *     <li>each of {@code , = ; : ? ' *} becomes {@code -} (hyphen)</li>
+        * </ul>
+        *
+        * @param str the string to sanitize
+        * @return {@code str} itself if nothing had to change, otherwise a new sanitized string
+        */
+       static String replaceInvalidChars(String str) {
+               final int length = str.length();
+               char[] buffer = null;
+               int writeIdx = 0;
+
+               for (int readIdx = 0; readIdx < length; readIdx++) {
+                       final char current = str.charAt(readIdx);
+
+                       if (current == '"') {
+                               // dropped: make sure the buffer exists, but leave the write cursor where it is
+                               if (buffer == null) {
+                                       buffer = str.toCharArray();
+                               }
+                               continue;
+                       }
+
+                       final char mapped;
+                       if (current == ' ') {
+                               mapped = '_';
+                       } else if (",=;:?'*".indexOf(current) >= 0) {
+                               mapped = '-';
+                       } else {
+                               mapped = current;
+                       }
+
+                       // the copy is created lazily, on the first character that actually changes
+                       if (mapped != current && buffer == null) {
+                               buffer = str.toCharArray();
+                       }
+                       if (buffer != null) {
+                               buffer[writeIdx] = mapped;
+                       }
+                       writeIdx++;
+               }
+
+               return buffer == null ? str : new String(buffer, 0, writeIdx);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Interfaces and base classes for JMX beans
+       // ------------------------------------------------------------------------
+
+       /**
+        * Common super-interface of the bean interfaces in this file
+        * (JmxCounterMBean, JmxGaugeMBean, JmxHistogramMBean). Declares no methods itself.
+        */
+       public interface MetricMBean {}
+
+       /**
+        * Empty abstract base class that implements {@link MetricMBean}; extended by
+        * the bean implementations in this file (JmxCounter, JmxGauge, JmxHistogram).
+        */
+       private abstract static class AbstractBean implements MetricMBean {}
+
+       /**
+        * Bean interface for counters. Implemented in this file by JmxCounter,
+        * which delegates to the wrapped counter's {@code getCount()}.
+        */
+       public interface JmxCounterMBean extends MetricMBean {
+               /** Returns the count reported by the underlying counter. */
+               long getCount();
+       }
+
+       /**
+        * Bean that wraps a {@link Counter} and exposes its count through
+        * {@link JmxCounterMBean}.
+        */
+       private static class JmxCounter extends AbstractBean implements JmxCounterMBean {
+
+               // Assigned exactly once in the constructor, so it is final like the
+               // wrapped metric fields of JmxGauge and JmxHistogram.
+               private final Counter counter;
+
+               /**
+                * @param counter the counter whose count this bean reports
+                */
+               JmxCounter(Counter counter) {
+                       this.counter = counter;
+               }
+
+               /** Returns {@code counter.getCount()} of the wrapped counter. */
+               @Override
+               public long getCount() {
+                       return counter.getCount();
+               }
+       }
+
+       /**
+        * Bean interface for gauges. Implemented in this file by JmxGauge,
+        * which delegates to the wrapped gauge's {@code getValue()}.
+        */
+       public interface JmxGaugeMBean extends MetricMBean {
+               /** Returns the value reported by the underlying gauge; the static type is {@code Object}. */
+               Object getValue();
+       }
+
+       /**
+        * Bean that wraps a {@link Gauge} and publishes its value through
+        * {@link JmxGaugeMBean}.
+        */
+       private static class JmxGauge extends AbstractBean implements JmxGaugeMBean {
+
+               private final Gauge<?> gauge;
+
+               /**
+                * @param wrappedGauge the gauge this bean reads from
+                */
+               JmxGauge(Gauge<?> wrappedGauge) {
+                       gauge = wrappedGauge;
+               }
+
+               /** Reads the wrapped gauge on every call and hands back whatever it returns. */
+               @Override
+               public Object getValue() {
+                       final Object currentValue = gauge.getValue();
+                       return currentValue;
+               }
+       }
+
+       /**
+        * Bean interface for histograms: a sample count plus summary statistics.
+        * Implemented in this file by JmxHistogram, which reads every statistic
+        * from {@code histogram.getStatistics()}.
+        */
+       public interface JmxHistogramMBean extends MetricMBean {
+               /** Count as reported by the histogram's own {@code getCount()}. */
+               long getCount();
+
+               /** Mean of the histogram statistics. */
+               double getMean();
+
+               /** Standard deviation of the histogram statistics. */
+               double getStdDev();
+
+               /** Maximum of the histogram statistics. */
+               long getMax();
+
+               /** Minimum of the histogram statistics. */
+               long getMin();
+
+               /** Median; JmxHistogram computes it as the 0.5 quantile. */
+               double getMedian();
+
+               /** JmxHistogram computes this as the 0.75 quantile. */
+               double get75thPercentile();
+
+               /** JmxHistogram computes this as the 0.95 quantile. */
+               double get95thPercentile();
+
+               /** JmxHistogram computes this as the 0.98 quantile. */
+               double get98thPercentile();
+
+               /** JmxHistogram computes this as the 0.99 quantile. */
+               double get99thPercentile();
+
+               /** The 99.9th percentile; JmxHistogram computes it as the 0.999 quantile. */
+               double get999thPercentile();
+       }
+
+       /**
+        * Bean that wraps a {@link Histogram} and publishes its count and summary
+        * statistics through {@link JmxHistogramMBean}. Statistics are fetched from
+        * the histogram anew on every getter call.
+        */
+       private static class JmxHistogram extends AbstractBean implements JmxHistogramMBean {
+
+               private final Histogram histogram;
+
+               /**
+                * @param wrappedHistogram the histogram this bean reads from
+                */
+               JmxHistogram(Histogram wrappedHistogram) {
+                       histogram = wrappedHistogram;
+               }
+
+               /** Looks up a single quantile on freshly obtained statistics. */
+               private double quantile(double fraction) {
+                       return histogram.getStatistics().getQuantile(fraction);
+               }
+
+               @Override
+               public long getCount() {
+                       return histogram.getCount();
+               }
+
+               @Override
+               public double getMean() {
+                       return histogram.getStatistics().getMean();
+               }
+
+               @Override
+               public double getStdDev() {
+                       return histogram.getStatistics().getStdDev();
+               }
+
+               @Override
+               public long getMax() {
+                       return histogram.getStatistics().getMax();
+               }
+
+               @Override
+               public long getMin() {
+                       return histogram.getStatistics().getMin();
+               }
+
+               @Override
+               public double getMedian() {
+                       return quantile(0.5);
+               }
+
+               @Override
+               public double get75thPercentile() {
+                       return quantile(0.75);
+               }
+
+               @Override
+               public double get95thPercentile() {
+                       return quantile(0.95);
+               }
+
+               @Override
+               public double get98thPercentile() {
+                       return quantile(0.98);
+               }
+
+               @Override
+               public double get99thPercentile() {
+                       return quantile(0.99);
+               }
+
+               @Override
+               public double get999thPercentile() {
+                       return quantile(0.999);
+               }
+       }
+
+       /**
+        * JMX Server implementation that JMX clients can connect to.
+        *
+        * <p>Heavily based on the j256 simplejmx project:
+        * https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
+        */
+       private static class JMXServer {
+               private Registry rmiRegistry;
+               private JMXConnectorServer connector;
+               // Port of the last successful start(); not read inside this class.
+               private int port;
+
+               /**
+                * Starts the RMI registry and the JMX connector on the given port.
+                * Does nothing (apart from a debug log) if both are already set up.
+                *
+                * <p>If either step fails, whatever was started so far is shut down
+                * again before the exception is propagated, so that a failed start
+                * does not leave an exported registry behind.
+                *
+                * @param port port used for both the RMI registry and the JMX service
+                * @throws IOException if the registry or the connector could not be started
+                */
+               public void start(int port) throws IOException {
+                       if (rmiRegistry != null && connector != null) {
+                               LOG.debug("JMXServer is already running.");
+                               return;
+                       }
+
+                       boolean started = false;
+                       try {
+                               startRmiRegistry(port);
+                               startJmxService(port);
+                               started = true;
+                       } finally {
+                               if (!started) {
+                                       // roll back the partial start; the original failure keeps propagating
+                                       try {
+                                               stop();
+                                       } catch (IOException e) {
+                                               LOG.debug("Could not clean up after failed JMXServer start: " + e);
+                                       }
+                               }
+                       }
+                       this.port = port;
+               }
+
+               /**
+                * Starts an RMI Registry that allows clients to lookup the JMX IP/port.
+                *
+                * @param port rmi port to use
+                * @throws IOException if the registry could not be created on that port
+                */
+               private void startRmiRegistry(int port) throws IOException {
+                       rmiRegistry = LocateRegistry.createRegistry(port);
+               }
+
+               /**
+                * Starts a JMX connector that allows (un)registering MBeans with the
+                * MBean server and RMI invocations.
+                *
+                * @param port jmx port to use
+                * @throws IOException if the connector could not be created or started
+                * @throws IllegalArgumentException if the generated service url is malformed
+                */
+               private void startJmxService(int port) throws IOException {
+                       String serviceUrl = "service:jmx:rmi://localhost:" + port
+                               + "/jndi/rmi://localhost:" + port + "/jmxrmi";
+                       JMXServiceURL url;
+                       try {
+                               url = new JMXServiceURL(serviceUrl);
+                       } catch (MalformedURLException e) {
+                               throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e);
+                       }
+
+                       connector = JMXConnectorServerFactory.newJMXConnectorServer(
+                               url, null, ManagementFactory.getPlatformMBeanServer());
+
+                       connector.start();
+               }
+
+               /**
+                * Stops the JMX connector and un-exports the RMI registry. Both
+                * references are cleared even when shutting them down fails.
+                *
+                * <p>The registry is un-exported even if stopping the connector throws;
+                * otherwise the registry would stay exported. If both steps fail,
+                * the registry failure is the one that is thrown.
+                *
+                * @throws IOException if the connector could not be stopped or the
+                *         registry could not be un-exported
+                */
+               public void stop() throws IOException {
+                       try {
+                               if (connector != null) {
+                                       try {
+                                               connector.stop();
+                                       } finally {
+                                               connector = null;
+                                       }
+                               }
+                       } finally {
+                               if (rmiRegistry != null) {
+                                       try {
+                                               UnicastRemoteObject.unexportObject(rmiRegistry, true);
+                                       } catch (NoSuchObjectException e) {
+                                               throw new IOException("Could not un-export our RMI registry", e);
+                                       } finally {
+                                               rmiRegistry = null;
+                                       }
+                               }
+                       }
+               }
+       }
+}

Reply via email to