[FLINK-1502] [core] Add basic metric system

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/003ce18e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/003ce18e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/003ce18e

Branch: refs/heads/master
Commit: 003ce18efc0249fae874e56c3df6acf19f5f2429
Parents: 8ed3685
Author: zentol <[email protected]>
Authored: Mon May 9 14:54:32 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Sun May 22 19:58:17 2016 +0200

----------------------------------------------------------------------
 .../flink-statebackend-rocksdb/pom.xml          |   7 +
 flink-contrib/flink-storm/pom.xml               |   8 +
 .../flink/storm/wrappers/BoltWrapperTest.java   |   7 +
 flink-core/pom.xml                              |   6 +
 .../api/common/functions/RuntimeContext.java    |   8 +
 .../util/AbstractRuntimeUDFContext.java         |  12 +-
 .../functions/util/RuntimeUDFContext.java       |   6 +-
 .../common/operators/CollectionExecutor.java    |  48 +++-
 .../java/org/apache/flink/metrics/Counter.java  |  69 ++++++
 .../java/org/apache/flink/metrics/Gauge.java    |  33 +++
 .../java/org/apache/flink/metrics/Metric.java   |  27 +++
 .../org/apache/flink/metrics/MetricGroup.java   |  99 +++++++++
 .../apache/flink/metrics/MetricRegistry.java    | 170 +++++++++++++++
 .../metrics/groups/AbstractMetricGroup.java     | 145 +++++++++++++
 .../metrics/groups/ComponentMetricGroup.java    | 113 ++++++++++
 .../metrics/groups/GenericMetricGroup.java      |  57 +++++
 .../flink/metrics/groups/IOMetricGroup.java     |  70 ++++++
 .../flink/metrics/groups/JobMetricGroup.java    |  71 ++++++
 .../metrics/groups/OperatorMetricGroup.java     |  46 ++++
 .../org/apache/flink/metrics/groups/Scope.java  | 120 ++++++++++
 .../metrics/groups/TaskManagerMetricGroup.java  |  70 ++++++
 .../flink/metrics/groups/TaskMetricGroup.java   |  87 ++++++++
 .../metrics/reporter/AbstractReporter.java      |  48 ++++
 .../flink/metrics/reporter/JMXReporter.java     | 167 ++++++++++++++
 .../flink/metrics/reporter/MetricReporter.java  |  73 +++++++
 .../flink/metrics/reporter/Scheduled.java       |  32 +++
 .../functions/util/RuntimeUDFContextTest.java   |  11 +-
 .../api/common/io/RichInputFormatTest.java      |   3 +-
 .../api/common/io/RichOutputFormatTest.java     |   3 +-
 .../operators/GenericDataSinkBaseTest.java      |   5 +-
 .../operators/GenericDataSourceBaseTest.java    |   5 +-
 .../base/FlatMapOperatorCollectionTest.java     |   3 +-
 .../base/InnerJoinOperatorBaseTest.java         |   5 +-
 .../common/operators/base/MapOperatorTest.java  |   5 +-
 .../base/PartitionMapOperatorTest.java          |   5 +-
 .../apache/flink/metrics/MetricGroupTest.java   |  93 ++++++++
 .../flink/metrics/MetricRegistryTest.java       | 217 +++++++++++++++++++
 .../flink/metrics/groups/JobGroupTest.java      |  71 ++++++
 .../flink/metrics/groups/OperatorGroupTest.java |  86 ++++++++
 .../flink/metrics/groups/TaskGroupTest.java     |  80 +++++++
 .../metrics/groups/TaskManagerGroupTest.java    |  70 ++++++
 .../flink/metrics/reporter/JMXReporterTest.java |  43 ++++
 .../flink/metrics/util/DummyJobMetricGroup.java |  50 +++++
 .../flink/metrics/util/DummyMetricGroup.java    |  57 +++++
 .../flink/metrics/util/DummyMetricRegistry.java |  34 +++
 .../metrics/util/DummyOperatorMetricGroup.java  |  43 ++++
 .../flink/metrics/util/DummyReporter.java       |  47 ++++
 .../util/DummyTaskManagerMetricGroup.java       |  48 ++++
 .../metrics/util/DummyTaskMetricGroup.java      |  48 ++++
 .../apache/flink/metrics/util/TestReporter.java |  40 ++++
 flink-dist/src/main/flink-bin/bin/config.sh     |   6 +
 .../src/main/flink-bin/bin/flink-daemon.sh      |  18 +-
 .../base/CoGroupOperatorCollectionTest.java     |   3 +-
 .../operators/base/GroupReduceOperatorTest.java |   5 +-
 .../base/InnerJoinOperatorBaseTest.java         |   5 +-
 .../operators/base/ReduceOperatorTest.java      |   5 +-
 flink-metrics/flink-metrics-dropwizard/pom.xml  |  72 ++++++
 .../dropwizard/ScheduledDropwizardReporter.java |  99 +++++++++
 .../dropwizard/metrics/CounterWrapper.java      |  33 +++
 .../flink/dropwizard/metrics/GaugeWrapper.java  |  33 +++
 flink-metrics/flink-metrics-ganglia/pom.xml     |  90 ++++++++
 .../flink/metrics/graphite/GangliaReporter.java |  73 +++++++
 flink-metrics/flink-metrics-graphite/pom.xml    |  84 +++++++
 .../metrics/graphite/GraphiteReporter.java      |  70 ++++++
 flink-metrics/flink-metrics-statsd/pom.xml      |  43 ++++
 .../flink/metrics/statsd/StatsDReporter.java    | 132 +++++++++++
 flink-metrics/pom.xml                           |  42 ++++
 flink-runtime/pom.xml                           |   4 -
 .../deployment/TaskDeploymentDescriptor.java    |   7 +
 .../flink/runtime/execution/Environment.java    |   8 +
 .../runtime/executiongraph/ExecutionVertex.java |   1 +
 .../api/reader/AbstractRecordReader.java        |   8 +
 .../io/network/api/reader/BufferReader.java     |   5 +
 .../io/network/api/reader/ReaderBase.java       |   8 +
 .../AdaptiveSpanningRecordDeserializer.java     |  20 ++
 .../api/serialization/RecordDeserializer.java   |   8 +
 .../api/serialization/RecordSerializer.java     |   8 +
 .../serialization/SpanningRecordSerializer.java |  19 ++
 ...llingAdaptiveSpanningRecordDeserializer.java |  21 ++
 .../io/network/api/writer/RecordWriter.java     |  11 +
 .../iterative/task/AbstractIterativeTask.java   |   9 +-
 .../flink/runtime/operators/BatchTask.java      |  19 +-
 .../flink/runtime/operators/DataSinkTask.java   |   5 +-
 .../flink/runtime/operators/DataSourceTask.java |   5 +-
 .../runtime/operators/GroupReduceDriver.java    |   2 +-
 .../flink/runtime/operators/TaskContext.java    |   3 +
 .../operators/chaining/ChainedDriver.java       |   8 +-
 .../util/DistributedRuntimeUDFContext.java      |   5 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |  11 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  12 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 169 ++++++++++++++-
 .../TaskDeploymentDescriptorTest.java           |   3 +-
 .../network/api/reader/AbstractReaderTest.java  |   5 +
 .../operators/drivers/TestTaskContext.java      |   7 +
 .../testutils/BinaryOperatorTestBase.java       |   7 +
 .../operators/testutils/DriverTestBase.java     |   7 +
 .../operators/testutils/DummyEnvironment.java   |   7 +
 .../operators/testutils/MockEnvironment.java    |   7 +
 .../testutils/UnaryOperatorTestBase.java        |   6 +
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   6 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  27 +--
 .../flink/runtime/taskmanager/TaskStopTest.java |   3 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   6 +-
 .../flink/streaming/api/graph/StreamConfig.java |   9 +
 .../api/graph/StreamingJobGraphGenerator.java   |  11 +-
 .../api/operators/AbstractStreamOperator.java   |   9 +
 .../streaming/api/operators/StreamOperator.java |   3 +
 .../api/operators/StreamingRuntimeContext.java  |   3 +-
 .../runtime/io/StreamInputProcessor.java        |  12 +
 .../runtime/io/StreamTwoInputProcessor.java     |  12 +
 .../runtime/tasks/OneInputStreamTask.java       |   1 +
 .../streaming/runtime/tasks/OperatorChain.java  |   1 +
 .../runtime/tasks/TwoInputStreamTask.java       |   1 +
 ...AlignedProcessingTimeWindowOperatorTest.java |   2 +
 ...AlignedProcessingTimeWindowOperatorTest.java |   2 +
 .../runtime/tasks/StreamMockEnvironment.java    |   7 +
 .../streaming/runtime/tasks/StreamTaskTest.java |   6 +-
 pom.xml                                         |   2 +
 118 files changed, 3936 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-contrib/flink-statebackend-rocksdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml 
b/flink-contrib/flink-statebackend-rocksdb/pom.xml
index b966371..cccdc20 100644
--- a/flink-contrib/flink-statebackend-rocksdb/pom.xml
+++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -54,6 +54,13 @@ under the License.
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-runtime_2.10</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-contrib/flink-storm/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/pom.xml 
b/flink-contrib/flink-storm/pom.xml
index 24b0b4b..a080a03 100644
--- a/flink-contrib/flink-storm/pom.xml
+++ b/flink-contrib/flink-storm/pom.xml
@@ -49,6 +49,14 @@ under the License.
                </dependency>
 
                <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
                        <groupId>org.apache.storm</groupId>
                        <artifactId>storm-core</artifactId>
                        <version>0.9.4</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index e65dc45..cb9ac1c 100644
--- 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -32,7 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.metrics.util.DummyMetricGroup;
+import org.apache.flink.metrics.util.DummyTaskMetricGroup;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.taskmanager.RuntimeEnvironment;
 import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.SplitStreamType;
 import org.apache.flink.storm.util.StormConfig;
@@ -141,6 +144,7 @@ public class BoltWrapperTest extends AbstractTest {
                final StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
                when(taskContext.getTaskName()).thenReturn("name");
+               when(taskContext.getMetricGroup()).thenReturn(new 
DummyMetricGroup());
 
                final IRichBolt bolt = mock(IRichBolt.class);
 
@@ -225,6 +229,7 @@ public class BoltWrapperTest extends AbstractTest {
                final StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
                when(taskContext.getTaskName()).thenReturn("name");
+               when(taskContext.getMetricGroup()).thenReturn(new 
DummyMetricGroup());
 
                final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
                declarer.declare(new Fields("dummy"));
@@ -289,6 +294,7 @@ public class BoltWrapperTest extends AbstractTest {
                final StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
                when(taskContext.getTaskName()).thenReturn("name");
+               when(taskContext.getMetricGroup()).thenReturn(new 
DummyMetricGroup());
 
                final IRichBolt bolt = mock(IRichBolt.class);
                BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, 
Object>(bolt);
@@ -361,6 +367,7 @@ public class BoltWrapperTest extends AbstractTest {
                Environment env = mock(Environment.class);
                when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 0, 
1, 0));
                
when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());
+               when(env.getMetricGroup()).thenReturn(new 
DummyTaskMetricGroup());
 
                StreamTask<?, ?> mockTask = mock(StreamTask.class);
                when(mockTask.getCheckpointLock()).thenReturn(new Object());

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index eb55bdd..0e0b54b 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -47,6 +47,12 @@ under the License.
                        <!-- managed version -->
                </dependency>
 
+               <dependency>
+                       <groupId>io.dropwizard.metrics</groupId>
+                       <artifactId>metrics-core</artifactId>
+                       <version>${metrics.version}</version>
+               </dependency>
+
                <!-- Avro is needed for the interoperability with Avro types for serialization -->
                <dependency>
                        <groupId>org.apache.avro</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 3225c00..ed2f613 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.MetricGroup;
 
 /**
  * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
@@ -59,6 +60,13 @@ public interface RuntimeContext {
        String getTaskName();
 
        /**
+        * Returns the metric group for this parallel subtask.
+        * 
+        * @return The metric group for this parallel subtask.
+     */
+       MetricGroup getMetricGroup();
+
+       /**
         * Gets the parallelism with which the parallel task runs.
         * 
         * @return The parallelism with which the parallel task runs.

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 74b78df..6645964 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -43,6 +43,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.MetricGroup;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -61,17 +62,21 @@ public abstract class AbstractRuntimeUDFContext implements 
RuntimeContext {
        private final Map<String, Accumulator<?, ?>> accumulators;
 
        private final DistributedCache distributedCache;
+       
+       private final MetricGroup metrics;
 
        public AbstractRuntimeUDFContext(TaskInfo taskInfo,
                                                                                
ClassLoader userCodeClassLoader,
                                                                                
ExecutionConfig executionConfig,
                                                                                
Map<String, Accumulator<?,?>> accumulators,
-                                                                               
Map<String, Future<Path>> cpTasks) {
+                                                                               
Map<String, Future<Path>> cpTasks,
+                                                                               
MetricGroup metrics) {
                this.taskInfo = checkNotNull(taskInfo);
                this.userCodeClassLoader = userCodeClassLoader;
                this.executionConfig = executionConfig;
                this.distributedCache = new 
DistributedCache(checkNotNull(cpTasks));
                this.accumulators = checkNotNull(accumulators);
+               this.metrics = metrics;
        }
 
        @Override
@@ -93,6 +98,11 @@ public abstract class AbstractRuntimeUDFContext implements 
RuntimeContext {
        public int getIndexOfThisSubtask() {
                return taskInfo.getIndexOfThisSubtask();
        }
+       
+       @Override
+       public MetricGroup getMetricGroup() {
+               return metrics;
+       }
 
        @Override
        public int getAttemptNumber() {

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
index 2337afa..6571d0d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.MetricGroup;
 
 /**
  * A standalone implementation of the {@link RuntimeContext}, created by runtime UDF operators.
@@ -42,8 +43,9 @@ public class RuntimeUDFContext extends 
AbstractRuntimeUDFContext {
        private final HashMap<String, List<?>> uninitializedBroadcastVars = new 
HashMap<String, List<?>>();
 
        public RuntimeUDFContext(TaskInfo taskInfo, ClassLoader 
userCodeClassLoader, ExecutionConfig executionConfig,
-                                                               Map<String, 
Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
-               super(taskInfo, userCodeClassLoader, executionConfig, 
accumulators, cpTasks);
+                                                               Map<String, 
Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulators,
+                                                               MetricGroup 
metrics) {
+               super(taskInfo, userCodeClassLoader, executionConfig, 
accumulators, cpTasks, metrics);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index 44042c4..7e5269e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -36,6 +36,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -58,9 +59,15 @@ import 
org.apache.flink.api.common.operators.util.TypeComparable;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.JobMetricGroup;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.types.Value;
+import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.Visitor;
 
 /**
@@ -86,6 +93,8 @@ public class CollectionExecutor {
        private final ExecutionConfig executionConfig;
 
        private int iterationSuperstep;
+       
+       private JobMetricGroup jobMetricGroup;
 
        // 
--------------------------------------------------------------------------------------------
        
@@ -106,6 +115,14 @@ public class CollectionExecutor {
        
        public JobExecutionResult execute(Plan program) throws Exception {
                long startTime = System.currentTimeMillis();
+               
+               JobID jobID = program.getJobId();
+               if (jobID == null) {
+                       jobID = new JobID();
+               }
+               this.jobMetricGroup = 
+                       new TaskManagerMetricGroup(new MetricRegistry(new 
Configuration()), "localhost", new AbstractID().toString())
+                               .addJob(jobID, program.getJobName());
                initCache(program.getCachedFiles());
                Collection<? extends GenericDataSinkBase<?>> sinks = 
program.getDataSinks();
                for (Operator<?> sink : sinks) {
@@ -184,9 +201,12 @@ public class CollectionExecutor {
                // build the runtime context and compute broadcast variables, if necessary
                TaskInfo taskInfo = new TaskInfo(typedSink.getName(), 0, 1, 0);
                RuntimeUDFContext ctx;
+
+               MetricGroup metrics = this.jobMetricGroup.addTask(new 
AbstractID(), new AbstractID(), 0, typedSink.getName());
+                       
                if 
(RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators) :
-                                       new 
IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
accumulators);
+                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
+                                       new 
IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
accumulators, metrics);
                } else {
                        ctx = null;
                }
@@ -200,10 +220,13 @@ public class CollectionExecutor {
                GenericDataSourceBase<OUT, ?> typedSource = 
(GenericDataSourceBase<OUT, ?>) source;
                // build the runtime context and compute broadcast variables, if necessary
                TaskInfo taskInfo = new TaskInfo(typedSource.getName(), 0, 1, 
0);
+               
                RuntimeUDFContext ctx;
+
+               MetricGroup metrics = this.jobMetricGroup.addTask(new 
AbstractID(), new AbstractID(), 0, source.getName());
                if 
(RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators) :
-                                       new 
IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
accumulators);
+                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
+                                       new 
IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
accumulators, metrics);
                } else {
                        ctx = null;
                }
@@ -225,9 +248,11 @@ public class CollectionExecutor {
                // build the runtime context and compute broadcast variables, if necessary
                TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
                RuntimeUDFContext ctx;
+
+               MetricGroup metrics = this.jobMetricGroup.addTask(new 
AbstractID(), new AbstractID(), 0, typedOp.getName());
                if 
(RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators) :
-                                       new 
IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
accumulators);
+                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
+                                       new 
IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
accumulators, metrics);
                        
                        for (Map.Entry<String, Operator<?>> bcInputs : 
operator.getBroadcastInputs().entrySet()) {
                                List<?> bcData = execute(bcInputs.getValue());
@@ -265,9 +290,11 @@ public class CollectionExecutor {
                // build the runtime context and compute broadcast variables, if necessary
                TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
                RuntimeUDFContext ctx;
+
+               MetricGroup metrics = this.jobMetricGroup.addTask(new 
AbstractID(), new AbstractID(), 0, typedOp.getName());
                if 
(RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators) :
-                               new IterationRuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators);
+                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
+                               new IterationRuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics);
                        
                        for (Map.Entry<String, Operator<?>> bcInputs : 
operator.getBroadcastInputs().entrySet()) {
                                List<?> bcData = execute(bcInputs.getValue());
@@ -523,8 +550,9 @@ public class CollectionExecutor {
        private class IterationRuntimeUDFContext extends RuntimeUDFContext 
implements IterationRuntimeContext {
 
                public IterationRuntimeUDFContext(TaskInfo taskInfo, 
ClassLoader classloader, ExecutionConfig executionConfig,
-                                                                               
        Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> 
accumulators) {
-                       super(taskInfo, classloader, executionConfig, cpTasks, 
accumulators);
+                                                                               
        Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> 
accumulators,
+                                                                               
        MetricGroup metrics) {
+                       super(taskInfo, classloader, executionConfig, cpTasks, 
accumulators, metrics);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java 
b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
new file mode 100644
index 0000000..b18da4f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A Counter is a {@link org.apache.flink.metrics.Metric} that measures a count.
+ */
+@PublicEvolving
+public final class Counter implements Metric {
+       private long count = 0;
+
+       /**
+        * Increment the current count by 1.
+        */
+       public void inc() {
+               count++;
+       }
+
+       /**
+        * Increment the current count by the given value.
+        *
+        * @param n value to increment the current count by
+        */
+       public void inc(long n) {
+               count += n;
+       }
+
+       /**
+        * Decrement the current count by 1.
+        */
+       public void dec() {
+               count--;
+       }
+
+       /**
+        * Decrement the current count by the given value.
+        *
+        * @param n value to decrement the current count by
+        */
+       public void dec(long n) {
+               count -= n;
+       }
+
+       /**
+        * Returns the current count.
+        *
+        * @return current count
+        */
+       public long getCount() {
+               return count;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java 
b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
new file mode 100644
index 0000000..455587a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A Gauge is a {@link org.apache.flink.metrics.Metric} that calculates a specific value at a point in time.
+ *
+ * @param <T> type of the value returned by this gauge
+ */
+@PublicEvolving
+public abstract class Gauge<T> implements Metric {
+       /**
+        * Calculates and returns the measured value.
+        *
+        * @return calculated value
+        */
+       public abstract T getValue();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java 
b/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
new file mode 100644
index 0000000..11cfcc6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Common interface for all metrics.
+ *
+ * <p>The interface declares no methods; it only serves as a shared type for metric implementations.
+ */
+@PublicEvolving
+public interface Metric {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java 
b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
new file mode 100644
index 0000000..4036129
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A MetricGroup is a named container for {@link org.apache.flink.metrics.Metric}s and
+ * {@link org.apache.flink.metrics.MetricGroup}s.
+ * <p>
+ * Instances of this class can be used to register new metrics with Flink and to create a nested hierarchy
+ * based on the group names.
+ * <p>
+ * A MetricGroup is uniquely identified by its place in the hierarchy and its name.
+ */
+@PublicEvolving
+public interface MetricGroup {
+
+       /**
+        * Recursively unregisters all {@link org.apache.flink.metrics.Metric}s contained in this
+        * {@link org.apache.flink.metrics.MetricGroup}.
+        */
+       void close();
+
+       // -----------------------------------------------------------------------------------------------------------------
+       // Metrics
+       // -----------------------------------------------------------------------------------------------------------------
+
+       /**
+        * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink.
+        *
+        * @param name name of the counter, given as a number
+        * @return the registered counter
+        */
+       Counter counter(int name);
+
+       /**
+        * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink.
+        *
+        * @param name name of the counter
+        * @return the registered counter
+        */
+       Counter counter(String name);
+
+       /**
+        * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
+        *
+        * @param name  name of the gauge, given as a number
+        * @param gauge gauge to register
+        * @param <T>   return type of the gauge
+        * @return the registered gauge
+        */
+       <T> Gauge<T> gauge(int name, Gauge<T> gauge);
+
+       /**
+        * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
+        *
+        * @param name  name of the gauge
+        * @param gauge gauge to register
+        * @param <T>   return type of the gauge
+        * @return the registered gauge
+        */
+       <T> Gauge<T> gauge(String name, Gauge<T> gauge);
+
+       // -----------------------------------------------------------------------------------------------------------------
+       // Groups
+       // -----------------------------------------------------------------------------------------------------------------
+
+       /**
+        * Creates a new {@link org.apache.flink.metrics.MetricGroup} and adds it to this group's sub-groups.
+        *
+        * @param name name of the group, given as a number
+        * @return the created group
+        */
+       MetricGroup addGroup(int name);
+
+       /**
+        * Creates a new {@link org.apache.flink.metrics.MetricGroup} and adds it to this group's sub-groups.
+        *
+        * @param name name of the group
+        * @return the created group
+        */
+       MetricGroup addGroup(String name);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java 
b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
new file mode 100644
index 0000000..0e8b0d5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.metrics.groups.Scope;
+import org.apache.flink.metrics.reporter.JMXReporter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB;
+import static 
org.apache.flink.metrics.groups.OperatorMetricGroup.DEFAULT_SCOPE_OPERATOR;
+import static 
org.apache.flink.metrics.groups.TaskManagerMetricGroup.DEFAULT_SCOPE_TM;
+import static 
org.apache.flink.metrics.groups.TaskMetricGroup.DEFAULT_SCOPE_TASK;
+
+/**
+ * A MetricRegistry keeps track of all registered {@link 
org.apache.flink.metrics.Metric}s. It serves as the
+ * connection between {@link org.apache.flink.metrics.MetricGroup}s and {@link 
org.apache.flink.metrics.reporter.MetricReporter}s.
+ */
+@Internal
+public class MetricRegistry {
+       private static final Logger LOG = 
LoggerFactory.getLogger(MetricRegistry.class);
+
+       private final MetricReporter reporter;
+       private java.util.Timer timer;
+
+       public static final String KEY_METRICS_REPORTER_CLASS = 
"metrics.reporter.class";
+       public static final String KEY_METRICS_REPORTER_ARGUMENTS = 
"metrics.reporter.arguments";
+       public static final String KEY_METRICS_REPORTER_INTERVAL = 
"metrics.reporter.interval";
+
+       public static final String KEY_METRICS_SCOPE_NAMING_TM = 
"metrics.scope.tm";
+       public static final String KEY_METRICS_SCOPE_NAMING_JOB = 
"metrics.scope.job";
+       public static final String KEY_METRICS_SCOPE_NAMING_TASK = 
"metrics.scope.task";
+       public static final String KEY_METRICS_SCOPE_NAMING_OPERATOR = 
"metrics.scope.operator";
+
+       private final Scope.ScopeFormat scopeConfig;
+
+       /**
+        * Creates a new {@link MetricRegistry} and starts the configured 
reporter.
+        */
+       public MetricRegistry(Configuration config) {
+               try {
+                       String className = 
config.getString(KEY_METRICS_REPORTER_CLASS, null);
+                       if (className == null) {
+                               LOG.info("No reporter class name defined in 
flink-conf.yaml, defaulting to " + JMXReporter.class.getName() + ".");
+                               className = JMXReporter.class.getName();
+                       }
+
+                       this.scopeConfig = createScopeConfig(config);
+
+                       Configuration reporterConfig = 
createReporterConfig(config);
+                       Class<?> reporterClass = Class.forName(className);
+                       reporter = (MetricReporter) reporterClass.newInstance();
+                       reporter.open(reporterConfig);
+
+                       if (reporter instanceof Scheduled) {
+                               String[] interval = 
config.getString(KEY_METRICS_REPORTER_INTERVAL, "10 SECONDS").split(" ");
+                               long millis = 
TimeUnit.valueOf(interval[1]).toMillis(Long.parseLong(interval[0]));
+                               timer = new java.util.Timer(true);
+                               timer.schedule(new TimerTask() {
+                                       @Override
+                                       public void run() {
+                                               ((Scheduled) reporter).report();
+                                       }
+                               }, millis, millis);
+                       }
+               } catch (InstantiationException | ClassNotFoundException e) {
+                       throw new RuntimeException("Error while instantiating 
reporter.", e);
+               } catch (IllegalAccessException e) {
+                       throw new RuntimeException("Implementation error.", e);
+               }
+       }
+
+       private static Configuration createReporterConfig(Configuration config) 
{
+               String[] interval = 
config.getString(KEY_METRICS_REPORTER_INTERVAL, "10 SECONDS").split(" ");
+
+               String[] arguments = 
config.getString(KEY_METRICS_REPORTER_ARGUMENTS, "").split(" ");
+
+               Configuration reporterConfig = new Configuration();
+               reporterConfig.setString("period", interval[0]);
+               reporterConfig.setString("timeunit", interval[1]);
+
+               if (arguments.length > 1) {
+                       for (int x = 0; x < arguments.length; x += 2) {
+                               
reporterConfig.setString(arguments[x].replace("--", ""), arguments[x + 1]);
+                       }
+               }
+               return reporterConfig;
+       }
+
+       private static Scope.ScopeFormat createScopeConfig(Configuration 
config) {
+               String tmFormat = config.getString(KEY_METRICS_SCOPE_NAMING_TM, 
DEFAULT_SCOPE_TM);
+               String jobFormat = 
config.getString(KEY_METRICS_SCOPE_NAMING_JOB, DEFAULT_SCOPE_JOB);
+               String taskFormat = 
config.getString(KEY_METRICS_SCOPE_NAMING_TASK, DEFAULT_SCOPE_TASK);
+               String operatorFormat = 
config.getString(KEY_METRICS_SCOPE_NAMING_OPERATOR, DEFAULT_SCOPE_OPERATOR);
+
+
+               Scope.ScopeFormat format = new Scope.ScopeFormat();
+               format.setTaskManagerFormat(tmFormat);
+               format.setJobFormat(jobFormat);
+               format.setTaskFormat(taskFormat);
+               format.setOperatorFormat(operatorFormat);
+               return format;
+       }
+
+       public Scope.ScopeFormat getScopeConfig() {
+               return this.scopeConfig;
+       }
+
+       /**
+        * Shuts down this registry and the associated {@link 
org.apache.flink.metrics.reporter.MetricReporter}.
+        */
+       public void shutdown() {
+               if (timer != null) {
+                       timer.cancel();
+               }
+               if (reporter != null) {
+                       reporter.close();
+               }
+       }
+
+       /**
+        * Registers a new {@link org.apache.flink.metrics.Metric} with this 
registry.
+        *
+        * @param metric metric to register
+        * @param name   name of the metric
+        * @param parent group that contains the metric
+        */
+       public void register(Metric metric, String name, AbstractMetricGroup 
parent) {
+               String metricName = reporter.generateName(name, 
parent.generateScope());
+
+
+               this.reporter.notifyOfAddedMetric(metric, metricName);
+       }
+
+       /**
+        * Un-registers the given {@link org.apache.flink.metrics.Metric} with 
this registry.
+        *
+        * @param metric metric to un-register
+        * @param name   name of the metric
+        * @param parent group that contains the metric
+        */
+       public void unregister(Metric metric, String name, AbstractMetricGroup 
parent) {
+               String metricName = reporter.generateName(name, 
parent.generateScope());
+               
+               this.reporter.notifyOfRemovedMetric(metric, metricName);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
new file mode 100644
index 0000000..373ac09
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.MetricRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Abstract {@link org.apache.flink.metrics.MetricGroup} that contains key functionality for adding metrics and
+ * groups.
+ *
+ * <p>Metrics and sub-groups are kept in plain {@link HashMap}s; this class performs no synchronization of its
+ * own.
+ */
+@Internal
+public abstract class AbstractMetricGroup implements MetricGroup {
+       private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class);
+
+       // metric names may only consist of letters and digits; compiled once instead of on every addMetric() call
+       private static final java.util.regex.Pattern VALID_METRIC_NAME = java.util.regex.Pattern.compile("[a-zA-Z0-9]*");
+
+       protected final MetricRegistry registry;
+
+       // all metrics that are directly contained in this group
+       protected final Map<String, Metric> metrics = new HashMap<>();
+       // all generic groups that are directly contained in this group
+       protected final Map<String, MetricGroup> groups = new HashMap<>();
+
+       /**
+        * Creates a new group that registers its metrics with the given registry.
+        *
+        * @param registry registry to register new metrics with
+        */
+       public AbstractMetricGroup(MetricRegistry registry) {
+               this.registry = registry;
+       }
+
+       /**
+        * Closes all sub-groups, then un-registers all metrics directly contained in this group. Both the
+        * sub-groups and the metrics are removed from this group afterwards.
+        */
+       @Override
+       public void close() {
+               for (MetricGroup group : groups.values()) {
+                       group.close();
+               }
+               this.groups.clear();
+               for (Map.Entry<String, Metric> metric : metrics.entrySet()) {
+                       registry.unregister(metric.getValue(), metric.getKey(), this);
+               }
+               this.metrics.clear();
+       }
+
+       // -----------------------------------------------------------------------------------------------------------------
+       // Scope
+       // -----------------------------------------------------------------------------------------------------------------
+
+       /**
+        * Generates the full scope based on the default/configured format that applies to all metrics within this
+        * group.
+        *
+        * @return generated scope
+        */
+       public abstract List<String> generateScope();
+
+       /**
+        * Generates the full scope based on the given format that applies to all metrics within this group.
+        *
+        * @param format format string
+        * @return generated scope
+        */
+       public abstract List<String> generateScope(Scope.ScopeFormat format);
+
+       // -----------------------------------------------------------------------------------------------------------------
+       // Metrics
+       // -----------------------------------------------------------------------------------------------------------------
+
+       @Override
+       public Counter counter(int name) {
+               return counter("" + name);
+       }
+
+       @Override
+       public Counter counter(String name) {
+               Counter counter = new Counter();
+               addMetric(name, counter);
+               return counter;
+       }
+
+       @Override
+       public <T> Gauge<T> gauge(int name, Gauge<T> gauge) {
+               return gauge("" + name, gauge);
+       }
+
+       @Override
+       public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
+               addMetric(name, gauge);
+               return gauge;
+       }
+
+       /**
+        * Adds the given metric to this group and registers it with the registry.
+        *
+        * <p>A name collision with an existing metric or sub-group is only logged as a warning; the metric is
+        * stored (replacing a previous metric of the same name in this group's map) and registered regardless.
+        *
+        * @param name   name of the metric, may only consist of the characters a-z, A-Z and 0-9
+        * @param metric metric to add
+        * @return this group
+        * @throws IllegalArgumentException if the name contains any other character
+        */
+       protected MetricGroup addMetric(String name, Metric metric) {
+               if (!VALID_METRIC_NAME.matcher(name).matches()) {
+                       throw new IllegalArgumentException("Metric names may not contain special characters.");
+               }
+               if (metrics.containsKey(name)) {
+                       LOG.warn("Detected metric name collision. This group already contains a metric for the given " +
+                               "metric name. {}.{}", this.generateScope(), name);
+               }
+               if (groups.containsKey(name)) {
+                       LOG.warn("Detected metric name collision. This group already contains a group for the given " +
+                               "metric name. {}.{}", this.generateScope(), name);
+               }
+               metrics.put(name, metric);
+               registry.register(metric, name, this);
+               return this;
+       }
+
+       // -----------------------------------------------------------------------------------------------------------------
+       // Groups
+       // -----------------------------------------------------------------------------------------------------------------
+
+       @Override
+       public MetricGroup addGroup(int name) {
+               return addGroup("" + name);
+       }
+
+       /**
+        * Returns the sub-group with the given name, creating it as a {@link GenericMetricGroup} if it does not
+        * exist yet. A collision with the name of a metric in this group is only logged as a warning.
+        *
+        * @param name name of the group
+        * @return the existing or newly created group
+        */
+       @Override
+       public MetricGroup addGroup(String name) {
+               if (metrics.containsKey(name)) {
+                       LOG.warn("Detected metric name collision. This group already contains a metric for the given " +
+                               "group name. {}.{}", this.generateScope(), name);
+               }
+               MetricGroup group = groups.get(name);
+               if (group == null) {
+                       group = new GenericMetricGroup(registry, this, name);
+                       groups.put(name, group);
+               }
+               return group;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
new file mode 100644
index 0000000..81851e2
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.MetricRegistry;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.metrics.groups.Scope.SCOPE_WILDCARD;
+
+/**
+ * Abstract {@link org.apache.flink.metrics.MetricGroup} that contains key functionality for modifying the
+ * scope.
+ *
+ * <p>The scope is derived from a format string that is split into components via {@link Scope#split}.
+ * Components that are scope variables are replaced with the values stored in {@link #formats} of this group
+ * and of its ancestors.
+ */
+@Internal
+public abstract class ComponentMetricGroup extends AbstractMetricGroup {
+       private final ComponentMetricGroup parent;
+       private final String format;
+
+       // Map: scope variable -> specific value
+       protected final Map<String, String> formats;
+
+       /**
+        * Creates a new ComponentMetricGroup.
+        *
+        * @param registry    registry to register new metrics with
+        * @param parentGroup parent group, may be null
+        * @param scopeFormat default format string
+        */
+       public ComponentMetricGroup(MetricRegistry registry, ComponentMetricGroup parentGroup, String scopeFormat) {
+               super(registry);
+               this.formats = new HashMap<>();
+               this.parent = parentGroup;
+               this.format = scopeFormat;
+       }
+
+       @Override
+       public List<String> generateScope() {
+               return this.generateScope(this.format);
+       }
+
+       @Override
+       public List<String> generateScope(Scope.ScopeFormat format) {
+               return generateScope(getScopeFormat(format));
+       }
+
+       /**
+        * Selects the format string that applies to this kind of group from the given scope format.
+        *
+        * @param format scope format to pick the format string from
+        * @return format string for this group
+        */
+       protected abstract String getScopeFormat(Scope.ScopeFormat format);
+
+       /**
+        * Generates the scope for the given format string.
+        *
+        * <p>If the first component is the wildcard, the scope of the parent (if any) is used as prefix and the
+        * remaining components are appended to it; otherwise the scope consists of the components of the format
+        * only. In both cases scope variables are replaced beforehand.
+        *
+        * @param format format string
+        * @return generated scope, a new list unless it was obtained from the parent
+        */
+       private List<String> generateScope(String format) {
+               String[] components = Scope.split(format);
+               final boolean inheritParentScope = components[0].equals(SCOPE_WILDCARD);
+
+               List<String> scope = new ArrayList<>();
+               if (inheritParentScope && this.parent != null) {
+                       // NOTE(review): the parent scope is generated with the parent's own default format, even when this
+                       // method was reached through generateScope(Scope.ScopeFormat) - confirm that this is intended
+                       scope = this.parent.generateScope();
+               }
+               // replaceFormats() already lets all ancestors replace their variables first,
+               // so no separate call on the parent is needed
+               this.replaceFormats(components);
+               addToList(scope, components, inheritParentScope ? 1 : 0);
+               return scope;
+       }
+
+       /**
+        * Replaces, in place, every component that starts with "<" and is a key of {@link #formats} with the mapped
+        * value. The ancestors of this group replace their variables before this group does.
+        *
+        * @param components scope components to modify
+        */
+       private void replaceFormats(String[] components) {
+               if (this.parent != null) {
+                       this.parent.replaceFormats(components);
+               }
+               for (int x = 0; x < components.length; x++) {
+                       if (components[x].startsWith("<") && this.formats.containsKey(components[x])) {
+                               components[x] = this.formats.get(components[x]);
+                       }
+               }
+       }
+
+       /**
+        * Adds all elements from the given array, starting from the given index, to the given list.
+        *
+        * @param list       destination
+        * @param array      source
+        * @param startIndex array index to start from
+        */
+       private static void addToList(List<String> list, String[] array, int startIndex) {
+               for (int x = startIndex; x < array.length; x++) {
+                       list.add(array[x]);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
new file mode 100644
index 0000000..5886312
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.MetricRegistry;
+
+import java.util.List;
+
+/**
+ * A simple named {@link org.apache.flink.metrics.MetricGroup} with no special properties.
+ *
+ * <p>Its scope is the scope of its parent with the group's own name appended.
+ */
+@Internal
+public class GenericMetricGroup extends AbstractMetricGroup {
+       private final AbstractMetricGroup parent;
+       private final String name;
+
+       /**
+        * Creates a group whose name is the decimal string form of the given number.
+        *
+        * @param registry registry to register new metrics with
+        * @param parent   group that contains this group
+        * @param name     name of this group
+        */
+       protected GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, int name) {
+               this(registry, parent, String.valueOf(name));
+       }
+
+       /**
+        * Creates a group with the given name.
+        *
+        * @param registry registry to register new metrics with
+        * @param parent   group that contains this group
+        * @param name     name of this group
+        */
+       protected GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) {
+               super(registry);
+               this.name = name;
+               this.parent = parent;
+       }
+
+       /**
+        * Appends this group's name to the list returned by the parent's {@code generateScope()}.
+        *
+        * @return the list obtained from the parent, extended by this group's name
+        */
+       @Override
+       public List<String> generateScope() {
+               final List<String> parentScope = this.parent.generateScope();
+               parentScope.add(this.name);
+               return parentScope;
+       }
+
+       /**
+        * Appends this group's name to the list returned by the parent's {@code generateScope(format)}.
+        *
+        * @param format scope format that is passed on to the parent
+        * @return the list obtained from the parent, extended by this group's name
+        */
+       @Override
+       public List<String> generateScope(Scope.ScopeFormat format) {
+               final List<String> parentScope = this.parent.generateScope(format);
+               parentScope.add(this.name);
+               return parentScope;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
new file mode 100644
index 0000000..68d91c4
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricRegistry;
+
+import java.util.List;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} that contains 
shareable pre-defined IO-related metrics.
+ */
+public class IOMetricGroup extends AbstractMetricGroup {
+       /** Task group that the counters are obtained from and whose scope is reused. */
+       private final TaskMetricGroup parent;
+
+       // NOTE(review): the fields are transient although this class does not declare
+       // Serializable itself - confirm whether a super type requires it.
+       private final transient Counter numBytesIn;
+       private final transient Counter numBytesOut;
+       private final transient Counter numRecordsIn;
+       private final transient Counter numRecordsOut;
+
+       /**
+        * Creates the group and fetches the four IO counters from the parent task group,
+        * in the order bytes-in, bytes-out, records-in, records-out.
+        *
+        * @param registry registry handed to the super class
+        * @param parent   task group the counters are requested from
+        */
+       public IOMetricGroup(MetricRegistry registry, TaskMetricGroup parent) {
+               super(registry);
+               this.parent = parent;
+               numBytesIn = parent.counter("numBytesIn");
+               numBytesOut = parent.counter("numBytesOut");
+               numRecordsIn = parent.counter("numRecordsIn");
+               numRecordsOut = parent.counter("numRecordsOut");
+       }
+
+       /** Delegates to the parent task group; this group adds no scope component of its own. */
+       @Override
+       public List<String> generateScope() {
+               return parent.generateScope();
+       }
+
+       /** Delegates to the parent task group; this group adds no scope component of its own. */
+       @Override
+       public List<String> generateScope(Scope.ScopeFormat format) {
+               return parent.generateScope(format);
+       }
+
+       /** @return the counter obtained under the name "numBytesIn" */
+       public Counter getBytesInCounter() {
+               return numBytesIn;
+       }
+
+       /** @return the counter obtained under the name "numBytesOut" */
+       public Counter getBytesOutCounter() {
+               return numBytesOut;
+       }
+
+       /** @return the counter obtained under the name "numRecordsIn" */
+       public Counter getRecordsInCounter() {
+               return numRecordsIn;
+       }
+
+       /** @return the counter obtained under the name "numRecordsOut" */
+       public Counter getRecordsOutCounter() {
+               return numRecordsOut;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
new file mode 100644
index 0000000..35a01f8
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.util.AbstractID;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.metrics.groups.TaskManagerMetricGroup.DEFAULT_SCOPE_TM;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing a Job.
+ * <p>
+ * Contains extra logic for adding tasks.
+ */
+@Internal
+public class JobMetricGroup extends ComponentMetricGroup {
+       public static final String SCOPE_JOB_DESCRIPTOR = "job";
+       /** Scope variable for the job id. */
+       public static final String SCOPE_JOB_ID = Scope.format("job_id");
+       /** Scope variable for the job name. */
+       public static final String SCOPE_JOB_NAME = Scope.format("job_name");
+       /** Default scope contributed by the job itself: the job name. */
+       public static final String DEFAULT_SCOPE_JOB_COMPONENT = Scope.concat(SCOPE_JOB_NAME);
+       /** Default full job scope: the default task manager scope followed by the job component. */
+       public static final String DEFAULT_SCOPE_JOB = Scope.concat(DEFAULT_SCOPE_TM, DEFAULT_SCOPE_JOB_COMPONENT);
+
+       /** Task groups created through {@link #addTask}, keyed by the id passed to that method. */
+       private Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();
+
+       /**
+        * Creates the job group below the given task manager group and records the job's
+        * id and name under their scope variables.
+        *
+        * @param registry    registry that also supplies the configured job scope format
+        * @param taskManager parent group
+        * @param id          id of the job
+        * @param name        name of the job
+        */
+       public JobMetricGroup(MetricRegistry registry, TaskManagerMetricGroup taskManager, JobID id, String name) {
+               super(registry, taskManager, registry.getScopeConfig().getJobFormat());
+               this.formats.put(SCOPE_JOB_ID, id.toString());
+               this.formats.put(SCOPE_JOB_NAME, name);
+       }
+
+       /**
+        * Creates a task group below this job and remembers it under {@code id}.
+        * A group previously stored under the same id is replaced in the map.
+        *
+        * @param id           id the new group is stored under
+        * @param attemptID    attempt id forwarded to the task group
+        * @param subtaskIndex subtask index forwarded to the task group
+        * @param name         task name forwarded to the task group
+        * @return the newly created task group
+        */
+       public TaskMetricGroup addTask(AbstractID id, AbstractID attemptID, int subtaskIndex, String name) {
+               final TaskMetricGroup taskGroup =
+                       new TaskMetricGroup(this.registry, this, id, attemptID, subtaskIndex, name);
+               tasks.put(id, taskGroup);
+               return taskGroup;
+       }
+
+       /** Closes this group first, then every remembered task group, and forgets them. */
+       @Override
+       public void close() {
+               super.close();
+               for (TaskMetricGroup taskGroup : tasks.values()) {
+                       taskGroup.close();
+               }
+               tasks.clear();
+       }
+
+       /** Selects the job entry of the given scope format. */
+       @Override
+       protected String getScopeFormat(Scope.ScopeFormat format) {
+               return format.getJobFormat();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
new file mode 100644
index 0000000..6475eec
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.MetricRegistry;
+
+import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing an 
Operator.
+ */
+@Internal
+public class OperatorMetricGroup extends ComponentMetricGroup {
+       public static final String SCOPE_OPERATOR_DESCRIPTOR = "operator";
+       /** Scope variable for the operator name. */
+       public static final String SCOPE_OPERATOR_NAME = Scope.format("operator_name");
+       /** Scope variable for the subtask index. */
+       public static final String SCOPE_OPERATOR_SUBTASK_INDEX = Scope.format("subtask_index");
+       /** Default scope contributed by the operator itself: operator name, then subtask index. */
+       public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT = Scope.concat(SCOPE_OPERATOR_NAME, SCOPE_OPERATOR_SUBTASK_INDEX);
+       /** Default full operator scope: the default job scope followed by the operator component. */
+       public static final String DEFAULT_SCOPE_OPERATOR = Scope.concat(DEFAULT_SCOPE_JOB, DEFAULT_SCOPE_OPERATOR_COMPONENT);
+
+       /**
+        * Creates the operator group below the given task group and records the operator
+        * name and subtask index under their scope variables.
+        *
+        * @param registry     registry that also supplies the configured operator scope format
+        * @param task         parent group
+        * @param name         name of the operator
+        * @param subTaskIndex subtask index, stored in its decimal string form
+        */
+       protected OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup task, String name, int subTaskIndex) {
+               super(registry, task, registry.getScopeConfig().getOperatorFormat());
+               this.formats.put(SCOPE_OPERATOR_NAME, name);
+               this.formats.put(SCOPE_OPERATOR_SUBTASK_INDEX, String.valueOf(subTaskIndex));
+       }
+
+       /** Selects the operator entry of the given scope format. */
+       @Override
+       protected String getScopeFormat(Scope.ScopeFormat format) {
+               return format.getOperatorFormat();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java
new file mode 100644
index 0000000..47bae37
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * This class provides utility-functions for handling scopes.
+ */
+@Internal
+public class Scope {
+       public static final String SCOPE_WILDCARD = "*";
+
+       /** Opening delimiter of a scope variable. */
+       private static final String SCOPE_PREFIX = "<";
+       /** Closing delimiter of a scope variable. */
+       private static final String SCOPE_SUFFIX = ">";
+       /** Separator between the components of a scope. */
+       private static final String SCOPE_SPLIT = ".";
+
+       /** Not instantiable; this class only offers static helpers. */
+       private Scope() {
+       }
+
+       /**
+        * Modifies the given string to resemble a scope variable, i.e. wraps it
+        * in {@code <} and {@code >}.
+        *
+        * @param scope string to format
+        * @return formatted string
+        */
+       public static String format(String scope) {
+               return SCOPE_PREFIX + scope + SCOPE_SUFFIX;
+       }
+
+       /**
+        * Joins the given components into a single scope, separated by {@code "."}.
+        *
+        * @param components components to join
+        * @return joined scope; the empty string if no components are given
+        */
+       public static String concat(String... components) {
+               // A call without arguments yields an empty array; indexing element 0
+               // unconditionally would throw instead of producing an empty scope.
+               if (components.length == 0) {
+                       return "";
+               }
+               StringBuilder sb = new StringBuilder();
+               sb.append(components[0]);
+               for (int x = 1; x < components.length; x++) {
+                       sb.append(SCOPE_SPLIT);
+                       sb.append(components[x]);
+               }
+               return sb.toString();
+       }
+
+       /**
+        * Splits the given scope into its individual components.
+        *
+        * @param scope scope to split
+        * @return array of components
+        */
+       public static String[] split(String scope) {
+               // String.split takes a regular expression, so the "." separator is escaped.
+               return scope.split("\\" + SCOPE_SPLIT);
+       }
+
+       /**
+        * Simple container for component scope format strings.
+        * <p>
+        * Every format starts out as the default declared by the matching metric group;
+        * the setters return this instance so that calls can be chained.
+        */
+       public static class ScopeFormat {
+               private String operatorFormat = OperatorMetricGroup.DEFAULT_SCOPE_OPERATOR;
+               private String taskFormat = TaskMetricGroup.DEFAULT_SCOPE_TASK;
+               private String jobFormat = JobMetricGroup.DEFAULT_SCOPE_JOB;
+               private String taskManagerFormat = TaskManagerMetricGroup.DEFAULT_SCOPE_TM;
+
+               public ScopeFormat() {
+               }
+
+               /** Sets the operator scope format and returns this instance. */
+               public ScopeFormat setOperatorFormat(String format) {
+                       this.operatorFormat = format;
+                       return this;
+               }
+
+               /** Sets the task scope format and returns this instance. */
+               public ScopeFormat setTaskFormat(String format) {
+                       this.taskFormat = format;
+                       return this;
+               }
+
+               /** Sets the job scope format and returns this instance. */
+               public ScopeFormat setJobFormat(String format) {
+                       this.jobFormat = format;
+                       return this;
+               }
+
+               /** Sets the task manager scope format and returns this instance. */
+               public ScopeFormat setTaskManagerFormat(String format) {
+                       this.taskManagerFormat = format;
+                       return this;
+               }
+
+               public String getOperatorFormat() {
+                       return this.operatorFormat;
+               }
+
+               public String getTaskFormat() {
+                       return this.taskFormat;
+               }
+
+               public String getJobFormat() {
+                       return this.jobFormat;
+               }
+
+               public String getTaskManagerFormat() {
+                       return this.taskManagerFormat;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
new file mode 100644
index 0000000..e199ca7
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.MetricRegistry;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing a 
TaskManager.
+ * <p>
+ * Contains extra logic for adding jobs.
+ */
+@Internal
+public class TaskManagerMetricGroup extends ComponentMetricGroup {
+       public static final String SCOPE_HOST_DESCRIPTOR = "host";
+       public static final String SCOPE_TM_DESCRIPTOR = "taskmanager";
+       /** Scope variable for the host. */
+       public static final String SCOPE_TM_HOST = Scope.format("host");
+       /** Scope variable for the task manager id. */
+       public static final String SCOPE_TM_ID = Scope.format("tm_id");
+       /** Default scope contributed by the task manager: host, the fixed descriptor, then id. */
+       public static final String DEFAULT_SCOPE_TM_COMPONENT = Scope.concat(SCOPE_TM_HOST, SCOPE_TM_DESCRIPTOR, SCOPE_TM_ID);
+       /** Default full task manager scope; identical to the component scope. */
+       public static final String DEFAULT_SCOPE_TM = DEFAULT_SCOPE_TM_COMPONENT;
+
+       /** Job groups created through {@link #addJob}, keyed by job id. */
+       private final Map<JobID, JobMetricGroup> jobs = new HashMap<>();
+
+       /**
+        * Creates the task manager group. It is passed {@code null} as its parent and
+        * records host and id under their scope variables.
+        *
+        * @param registry registry that also supplies the configured task manager scope format
+        * @param host     host of the task manager
+        * @param id       id of the task manager
+        */
+       public TaskManagerMetricGroup(MetricRegistry registry, String host, String id) {
+               super(registry, null, registry.getScopeConfig().getTaskManagerFormat());
+               this.formats.put(SCOPE_TM_HOST, host);
+               this.formats.put(SCOPE_TM_ID, id);
+       }
+
+       /**
+        * Creates a job group below this task manager and remembers it under {@code id}.
+        * A group previously stored under the same id is replaced in the map.
+        *
+        * @param id   id of the job
+        * @param name name of the job
+        * @return the newly created job group
+        */
+       public JobMetricGroup addJob(JobID id, String name) {
+               JobMetricGroup job = new JobMetricGroup(this.registry, this, id, name);
+               jobs.put(id, job);
+               return job;
+       }
+
+       /** Closes this group first, then every remembered job group, and forgets them. */
+       @Override
+       public void close() {
+               super.close();
+               for (MetricGroup group : jobs.values()) {
+                       group.close();
+               }
+               jobs.clear();
+       }
+
+       /** Selects the task manager entry of the given scope format. */
+       @Override
+       protected String getScopeFormat(Scope.ScopeFormat format) {
+               return format.getTaskManagerFormat();
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
new file mode 100644
index 0000000..4f8e010
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.util.AbstractID;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing a Task.
+ * <p>
+ * Contains extra logic for adding operators.
+ */
+@Internal
+public class TaskMetricGroup extends ComponentMetricGroup {
+       public static final String SCOPE_TASK_DESCRIPTOR = "task";
+       /** Scope variable for the task id. */
+       public static final String SCOPE_TASK_ID = Scope.format("task_id");
+       /** Scope variable for the task name. */
+       public static final String SCOPE_TASK_NAME = Scope.format("task_name");
+       /** Scope variable for the task attempt id. */
+       public static final String SCOPE_TASK_ATTEMPT = Scope.format("task_attempt");
+       /** Scope variable for the subtask index. */
+       public static final String SCOPE_TASK_SUBTASK_INDEX = Scope.format("subtask_index");
+       /** Default scope contributed by the task itself: the task name. */
+       public static final String DEFAULT_SCOPE_TASK_COMPONENT = SCOPE_TASK_NAME;
+       /** Default full task scope: the default job scope followed by the task component. */
+       public static final String DEFAULT_SCOPE_TASK = Scope.concat(DEFAULT_SCOPE_JOB, DEFAULT_SCOPE_TASK_COMPONENT);
+
+       /** Subtask index handed on to every operator group created by this task. */
+       private final int subtaskIndex;
+
+       /** Operator groups created through {@link #addOperator}, keyed by operator name. */
+       private Map<String, OperatorMetricGroup> operators = new HashMap<>();
+       /** IO metrics of this task, created in the constructor. */
+       private IOMetricGroup ioMetrics;
+
+       /**
+        * Creates the task group below the given job group, records id, attempt id, name
+        * and subtask index under their scope variables and creates the task's IO group.
+        *
+        * @param registry     registry that also supplies the configured task scope format
+        * @param job          parent group
+        * @param id           id of the task
+        * @param attemptID    id of the task attempt
+        * @param subtaskIndex subtask index, stored in its decimal string form
+        * @param name         name of the task
+        */
+       protected TaskMetricGroup(MetricRegistry registry, JobMetricGroup job, AbstractID id, AbstractID attemptID, int subtaskIndex, String name) {
+               super(registry, job, registry.getScopeConfig().getTaskFormat());
+               this.formats.put(SCOPE_TASK_ID, id.toString());
+               this.formats.put(SCOPE_TASK_ATTEMPT, attemptID.toString());
+               this.formats.put(SCOPE_TASK_NAME, name);
+               this.formats.put(SCOPE_TASK_SUBTASK_INDEX, String.valueOf(subtaskIndex));
+               this.subtaskIndex = subtaskIndex;
+               // Created last: the IO group requests its counters from this instance.
+               this.ioMetrics = new IOMetricGroup(registry, this);
+       }
+
+       /** Closes this group first, then every remembered operator group, and forgets them. */
+       @Override
+       public void close() {
+               super.close();
+               for (OperatorMetricGroup operatorGroup : operators.values()) {
+                       operatorGroup.close();
+               }
+               operators.clear();
+       }
+
+       /**
+        * Creates an operator group below this task and remembers it under {@code name}.
+        * A group previously stored under the same name is replaced in the map.
+        *
+        * @param name name of the operator
+        * @return the newly created operator group
+        */
+       public OperatorMetricGroup addOperator(String name) {
+               final OperatorMetricGroup operatorGroup =
+                       new OperatorMetricGroup(this.registry, this, name, this.subtaskIndex);
+               operators.put(name, operatorGroup);
+               return operatorGroup;
+       }
+
+       /**
+        * Returns the IOMetricGroup for this task.
+        *
+        * @return IOMetricGroup for this task.
+        */
+       public IOMetricGroup getIOMetricGroup() {
+               return ioMetrics;
+       }
+
+       /** Selects the task entry of the given scope format. */
+       @Override
+       protected String getScopeFormat(Scope.ScopeFormat format) {
+               return format.getTaskFormat();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
new file mode 100644
index 0000000..43f09b2
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.reporter;
+
+import 
org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+
+import java.util.Map;
+
+/**
+ * Base class for reporters that keeps every announced {@link Counter} and {@link Gauge}
+ * in a map keyed by the metric name, so that subclasses can read them when reporting.
+ */
+public abstract class AbstractReporter implements MetricReporter {
+       // The JDK map is referenced by its fully-qualified name on purpose: the simple name
+       // ConcurrentHashMap is imported in this file from a shaded, netty-internal package,
+       // which is not an API this class should depend on.
+       // NOTE(review): that import can be dropped once nothing else in the file uses it.
+       protected Map<String, Gauge> gauges = new java.util.concurrent.ConcurrentHashMap<>();
+       protected Map<String, Counter> counters = new java.util.concurrent.ConcurrentHashMap<>();
+
+       /**
+        * Stores the metric under {@code name} in the map matching its type.
+        * Metrics that are neither a counter nor a gauge are ignored.
+        *
+        * @param metric metric that was added
+        * @param name   name the metric is stored under
+        */
+       @Override
+       public void notifyOfAddedMetric(Metric metric, String name) {
+               if (metric instanceof Counter) {
+                       counters.put(name, (Counter) metric);
+               } else if (metric instanceof Gauge) {
+                       gauges.put(name, (Gauge) metric);
+               }
+       }
+
+       /**
+        * Removes the entry stored under {@code name} from the map matching the metric's type.
+        * Metrics that are neither a counter nor a gauge are ignored.
+        *
+        * @param metric metric that was removed
+        * @param name   name the metric was stored under
+        */
+       @Override
+       public void notifyOfRemovedMetric(Metric metric, String name) {
+               if (metric instanceof Counter) {
+                       counters.remove(name);
+               } else if (metric instanceof Gauge) {
+                       gauges.remove(name);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
new file mode 100644
index 0000000..0b2efe4
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.reporter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+
+/**
+ * {@link org.apache.flink.metrics.reporter.MetricReporter} that exports 
{@link org.apache.flink.metrics.Metric}s via JMX.
+ *
+ * Largely based on the JmxReporter class of the dropwizard metrics library
+ * 
https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics/JmxReporter.java
+ */
+@Internal
+public class JMXReporter implements MetricReporter {
+       private static final Logger LOG = 
LoggerFactory.getLogger(JMXReporter.class);
+
+       private MBeanServer mBeanServer;
+
+       private static final String PREFIX = "org.apache.flink.metrics:";
+       private static final String KEY_PREFIX = "key";
+
+       public JMXReporter() {
+               this.mBeanServer = ManagementFactory.getPlatformMBeanServer();
+       }
+
+       @Override
+       public void notifyOfAddedMetric(Metric metric, String name) {
+               AbstractBean jmxMetric;
+               ObjectName jmxName;
+               try {
+                       jmxName = new ObjectName(name);
+               } catch (MalformedObjectNameException e) {
+                       throw new IllegalArgumentException("Metric name did not 
conform to JMX ObjectName rules: " + name, e);
+               }
+
+               if (metric instanceof Gauge) {
+                       jmxMetric = new JmxGauge((Gauge<?>) metric);
+               } else if (metric instanceof Counter) {
+                       jmxMetric = new JmxCounter((Counter) metric);
+               } else {
+                       throw new IllegalArgumentException("Unknown metric 
type: " + metric.getClass());
+               }
+
+               try {
+                       mBeanServer.registerMBean(jmxMetric, jmxName);
+               } catch (NotCompliantMBeanException e) { //implementation error 
on our side
+                       LOG.error("Metric did not comply with JMX MBean naming 
rules.", e);
+               } catch (InstanceAlreadyExistsException e) {
+                       LOG.error("A metric with the name " + jmxName + " was 
already registered.", e);
+               } catch (MBeanRegistrationException e) {
+                       LOG.error("Failed to register metric.", e);
+               }
+       }
+
+       @Override
+       public void notifyOfRemovedMetric(Metric metric, String name) {
+               try {
+                       mBeanServer.unregisterMBean(new ObjectName(name));
+               } catch (MBeanRegistrationException e) {
+                       LOG.error("Un-registering metric failed.", e);
+               } catch (MalformedObjectNameException e) {
+                       LOG.error("Un-registering metric failed due to invalid 
name.", e);
+               } catch (InstanceNotFoundException e) {
+                       //alright then
+               }
+       }
+
+       @Override
+       public void open(Configuration config) {
+       }
+
+       @Override
+       public void close() {
+       }
+
+       @Override
+       public String generateName(String name, List<String> origin) {
+               StringBuilder fullName = new StringBuilder();
+
+               fullName.append(PREFIX);
+               for (int x = 0; x < origin.size(); x++) {
+                       fullName.append(KEY_PREFIX);
+                       fullName.append(x);
+                       fullName.append("=");
+                       String value = origin.get(x);
+                       value = value.replaceAll("\"", "");
+                       value = value.replaceAll(" ", "_");
+                       value = value.replaceAll("[,=;:?'*]", "-");
+                       fullName.append(value);
+                       fullName.append(",");
+               }
+               fullName.append("name=" + name);
+
+               return fullName.toString();
+       }
+
+       public interface MetricMBean {
+       }
+
+       private abstract static class AbstractBean implements MetricMBean {
+       }
+
+       public interface JmxCounterMBean extends MetricMBean {
+               long getCount();
+       }
+
+       private static class JmxCounter extends AbstractBean implements 
JmxCounterMBean {
+               private Counter counter;
+
+               public JmxCounter(Counter counter) {
+                       this.counter = counter;
+               }
+
+               @Override
+               public long getCount() {
+                       return counter.getCount();
+               }
+       }
+
+       public interface JmxGaugeMBean extends MetricMBean {
+               Object getValue();
+       }
+
+       private static class JmxGauge extends AbstractBean implements 
JmxGaugeMBean {
+               private final Gauge gauge;
+
+               public JmxGauge(Gauge gauge) {
+                       this.gauge = gauge;
+               }
+
+               @Override
+               public Object getValue() {
+                       return gauge.getValue();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
new file mode 100644
index 0000000..2bca606
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.reporter;
+
+import com.codahale.metrics.Reporter;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+
+import java.util.List;
+
+/**
+ * Reporters are used to export {@link org.apache.flink.metrics.Metric}s to an external backend.
+ * <p>
+ * Reporters are instantiated generically and must have a no-argument constructor.
+ */
+@PublicEvolving
+public interface MetricReporter extends Reporter {
+       /**
+        * Configures this reporter. Since reporters are instantiated generically and hence
+        * parameter-less, this method is the place where reporters set their basic fields
+        * based on configuration values.
+        * <p>
+        * This method is always called first on a newly instantiated reporter.
+        *
+        * @param config The configuration with all parameters.
+        */
+       void open(Configuration config);
+
+       /**
+        * Closes this reporter. Should be used to close channels, streams and release resources.
+        */
+       void close();
+
+       /**
+        * Called when a new {@link org.apache.flink.metrics.Metric} was added.
+        *
+        * @param metric metric that was added
+        * @param name   name of the metric
+        */
+       void notifyOfAddedMetric(Metric metric, String name);
+
+       /**
+        * Called when a {@link org.apache.flink.metrics.Metric} was removed.
+        *
+        * @param metric metric that was removed
+        * @param name   name of the metric
+        */
+       void notifyOfRemovedMetric(Metric metric, String name);
+
+       /**
+        * Generates the reported name of a metric based on its hierarchy/scope and associated name.
+        *
+        * @param name  name of the metric
+        * @param scope hierarchy/scope of the metric
+        * @return reported name
+        */
+       String generateName(String name, List<String> scope);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
new file mode 100644
index 0000000..3638f7a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.reporter;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Interface for reporters that actively send out data periodically.
+ * <p>
+ * Besides tagging a reporter as scheduled, it declares the {@link #report()} callback.
+ */
+@PublicEvolving
+public interface Scheduled {
+       /**
+        * Reports the current measurements.
+        * This method is called at regular intervals.
+        */
+       void report();
+}

Reply via email to