[FLINK-1502] [core] Cleanups, robustness, and performance improvements in the metrics system


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ad8375a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ad8375a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ad8375a

Branch: refs/heads/master
Commit: 7ad8375a89374bec80571029e9166f1336bdea8e
Parents: d3e3bd5
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 25 20:34:44 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon May 30 14:54:10 2016 +0200

----------------------------------------------------------------------
 flink-contrib/flink-storm/pom.xml               |  22 +-
 .../flink/storm/wrappers/BoltWrapperTest.java   |  13 +-
 .../common/operators/CollectionExecutor.java    |  32 +-
 .../java/org/apache/flink/metrics/Counter.java  |   5 +-
 .../java/org/apache/flink/metrics/Gauge.java    |   3 +-
 .../java/org/apache/flink/metrics/Metric.java   |   3 +-
 .../apache/flink/metrics/MetricRegistry.java    |  92 ++---
 .../metrics/groups/AbstractMetricGroup.java     |  98 +++--
 .../metrics/groups/ComponentMetricGroup.java    | 109 +----
 .../metrics/groups/GenericMetricGroup.java      |  45 +--
 .../flink/metrics/groups/IOMetricGroup.java     |  27 +-
 .../flink/metrics/groups/JobMetricGroup.java    |  88 ++--
 .../groups/NonRegisteringMetricsGroup.java      |  87 ----
 .../metrics/groups/OperatorMetricGroup.java     |  35 +-
 .../org/apache/flink/metrics/groups/Scope.java  | 119 ------
 .../metrics/groups/TaskManagerMetricGroup.java  |  68 ++--
 .../flink/metrics/groups/TaskMetricGroup.java   | 126 ++++--
 .../groups/UnregisteredMetricsGroup.java        |  73 ++++
 .../flink/metrics/groups/scope/ScopeFormat.java | 399 +++++++++++++++++++
 .../metrics/groups/scope/ScopeFormats.java      | 105 +++++
 .../metrics/reporter/AbstractReporter.java      |  40 +-
 .../flink/metrics/reporter/JMXReporter.java     | 194 ++++++---
 .../flink/metrics/reporter/MetricReporter.java  |  53 +--
 .../flink/metrics/reporter/Scheduled.java       |   8 +-
 .../functions/util/RuntimeUDFContextTest.java   |  33 +-
 .../api/common/io/RichInputFormatTest.java      |  18 +-
 .../api/common/io/RichOutputFormatTest.java     |  19 +-
 .../operators/GenericDataSinkBaseTest.java      |  16 +-
 .../operators/GenericDataSourceBaseTest.java    |  18 +-
 .../base/FlatMapOperatorCollectionTest.java     |   9 +-
 .../base/InnerJoinOperatorBaseTest.java         |  15 +-
 .../common/operators/base/MapOperatorTest.java  |  36 +-
 .../base/PartitionMapOperatorTest.java          |  19 +-
 .../flink/metrics/MetricRegistryTest.java       |  78 +---
 .../flink/metrics/groups/JobGroupTest.java      |  73 ++--
 .../groups/MetricGroupRegistrationTest.java     |   7 +-
 .../flink/metrics/groups/MetricGroupTest.java   |  32 +-
 .../flink/metrics/groups/OperatorGroupTest.java |  66 +--
 .../flink/metrics/groups/TaskGroupTest.java     | 103 +++--
 .../metrics/groups/TaskManagerGroupTest.java    |  58 +--
 .../flink/metrics/reporter/JMXReporterTest.java |  35 +-
 .../flink/metrics/util/DummyJobMetricGroup.java |  47 ---
 .../flink/metrics/util/DummyMetricGroup.java    |  57 ---
 .../flink/metrics/util/DummyMetricRegistry.java |  29 --
 .../metrics/util/DummyOperatorMetricGroup.java  |  37 --
 .../flink/metrics/util/DummyReporter.java       |  47 ---
 .../util/DummyTaskManagerMetricGroup.java       |  42 --
 .../metrics/util/DummyTaskMetricGroup.java      |  42 --
 .../apache/flink/metrics/util/TestReporter.java |  11 +-
 .../base/CoGroupOperatorCollectionTest.java     |   6 +-
 .../operators/base/GroupReduceOperatorTest.java |  17 +-
 .../base/InnerJoinOperatorBaseTest.java         |  24 +-
 .../operators/base/ReduceOperatorTest.java      |  23 +-
 .../dropwizard/ScheduledDropwizardReporter.java | 104 +++--
 .../flink/dropwizard/metrics/GaugeWrapper.java  |   8 +
 .../flink/metrics/ganglia/GangliaReporter.java  |  79 ++++
 .../flink/metrics/graphite/GangliaReporter.java |  73 ----
 .../metrics/graphite/GraphiteReporter.java      |  17 +-
 .../flink/metrics/statsd/StatsDReporter.java    |  79 ++--
 .../flink/runtime/taskmanager/TaskManager.scala |   3 +-
 .../operators/drivers/TestTaskContext.java      |   4 +-
 .../testutils/BinaryOperatorTestBase.java       |   6 +-
 .../operators/testutils/DriverTestBase.java     |  31 +-
 .../operators/testutils/DummyEnvironment.java   |  28 +-
 .../operators/testutils/MockEnvironment.java    |   3 +-
 .../testutils/UnregisteredTaskMetricsGroup.java |  68 ++++
 ...AlignedProcessingTimeWindowOperatorTest.java |  20 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |  14 +-
 .../runtime/tasks/StreamMockEnvironment.java    |   4 +-
 69 files changed, 1893 insertions(+), 1509 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-contrib/flink-storm/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml
index a080a03..590f33d 100644
--- a/flink-contrib/flink-storm/pom.xml
+++ b/flink-contrib/flink-storm/pom.xml
@@ -35,6 +35,9 @@ under the License.
        <packaging>jar</packaging>
 
        <dependencies>
+               
+               <!-- core dependencies -->
+               
                <dependency>
                        <!-- Together with the dependency management section in 
flink-parent, this
                        pins the Kryo version of transitive dependencies to the 
Flink Kryo version -->
@@ -49,14 +52,6 @@ under the License.
                </dependency>
 
                <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
                        <groupId>org.apache.storm</groupId>
                        <artifactId>storm-core</artifactId>
                        <version>0.9.4</version>
@@ -77,6 +72,17 @@ under the License.
                        <artifactId>guava</artifactId>
                        <version>${guava.version}</version>
                </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index cb9ac1c..5da12ef 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -32,10 +32,9 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.metrics.util.DummyMetricGroup;
-import org.apache.flink.metrics.util.DummyTaskMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.taskmanager.RuntimeEnvironment;
+import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.SplitStreamType;
 import org.apache.flink.storm.util.StormConfig;
@@ -144,7 +143,7 @@ public class BoltWrapperTest extends AbstractTest {
                final StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
                when(taskContext.getTaskName()).thenReturn("name");
-               when(taskContext.getMetricGroup()).thenReturn(new 
DummyMetricGroup());
+               when(taskContext.getMetricGroup()).thenReturn(new 
UnregisteredMetricsGroup());
 
                final IRichBolt bolt = mock(IRichBolt.class);
 
@@ -229,7 +228,7 @@ public class BoltWrapperTest extends AbstractTest {
                final StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
                when(taskContext.getTaskName()).thenReturn("name");
-               when(taskContext.getMetricGroup()).thenReturn(new 
DummyMetricGroup());
+               when(taskContext.getMetricGroup()).thenReturn(new 
UnregisteredMetricsGroup());
 
                final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
                declarer.declare(new Fields("dummy"));
@@ -294,7 +293,7 @@ public class BoltWrapperTest extends AbstractTest {
                final StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
                when(taskContext.getTaskName()).thenReturn("name");
-               when(taskContext.getMetricGroup()).thenReturn(new 
DummyMetricGroup());
+               when(taskContext.getMetricGroup()).thenReturn(new 
UnregisteredMetricsGroup());
 
                final IRichBolt bolt = mock(IRichBolt.class);
                BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, 
Object>(bolt);
@@ -367,7 +366,7 @@ public class BoltWrapperTest extends AbstractTest {
                Environment env = mock(Environment.class);
                when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 0, 
1, 0));
                
when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());
-               when(env.getMetricGroup()).thenReturn(new 
DummyTaskMetricGroup());
+               when(env.getMetricGroup()).thenReturn(new 
UnregisteredTaskMetricsGroup());
 
                StreamTask<?, ?> mockTask = mock(StreamTask.class);
                when(mockTask.getCheckpointLock()).thenReturn(new Object());

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index 97f0c5e..913b205 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -36,7 +36,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -62,7 +61,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.NonRegisteringMetricsGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Visitor;
 
@@ -72,8 +71,6 @@ import org.apache.flink.util.Visitor;
 @Internal
 public class CollectionExecutor {
        
-       private static final boolean DEFAULT_MUTABLE_OBJECT_SAFE_MODE = true;
-       
        private final Map<Operator<?>, List<?>> intermediateResults;
        
        private final Map<String, Accumulator<?, ?>> accumulators;
@@ -109,11 +106,6 @@ public class CollectionExecutor {
        
        public JobExecutionResult execute(Plan program) throws Exception {
                long startTime = System.currentTimeMillis();
-               
-               JobID jobID = program.getJobId();
-               if (jobID == null) {
-                       jobID = new JobID();
-               }
 
                initCache(program.getCachedFiles());
                Collection<? extends GenericDataSinkBase<?>> sinks = 
program.getDataSinks();
@@ -194,7 +186,7 @@ public class CollectionExecutor {
                TaskInfo taskInfo = new TaskInfo(typedSink.getName(), 0, 1, 0);
                RuntimeUDFContext ctx;
 
-               MetricGroup metrics = NonRegisteringMetricsGroup.get();
+               MetricGroup metrics = new UnregisteredMetricsGroup();
                        
                if 
(RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass()))
 {
                        ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
@@ -215,7 +207,7 @@ public class CollectionExecutor {
                
                RuntimeUDFContext ctx;
 
-               MetricGroup metrics = NonRegisteringMetricsGroup.get();
+               MetricGroup metrics = new UnregisteredMetricsGroup();
                if 
(RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass()))
 {
                        ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
                                        new 
IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
accumulators, metrics);
@@ -241,7 +233,7 @@ public class CollectionExecutor {
                TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
                RuntimeUDFContext ctx;
 
-               MetricGroup metrics = NonRegisteringMetricsGroup.get();
+               MetricGroup metrics = new UnregisteredMetricsGroup();
                if 
(RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass()))
 {
                        ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
                                        new 
IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
accumulators, metrics);
@@ -253,10 +245,8 @@ public class CollectionExecutor {
                } else {
                        ctx = null;
                }
-               
-               List<OUT> result = typedOp.executeOnCollections(inputData, ctx, 
executionConfig);
-               
-               return result;
+
+               return typedOp.executeOnCollections(inputData, ctx, 
executionConfig);
        }
        
        private <IN1, IN2, OUT> List<OUT> 
executeBinaryOperator(DualInputOperator<?, ?, ?, ?> operator, int superStep) 
throws Exception {
@@ -283,7 +273,7 @@ public class CollectionExecutor {
                TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
                RuntimeUDFContext ctx;
 
-               MetricGroup metrics = NonRegisteringMetricsGroup.get();
+               MetricGroup metrics = new UnregisteredMetricsGroup();
        
                if 
(RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass()))
 {
                        ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
@@ -296,10 +286,8 @@ public class CollectionExecutor {
                } else {
                        ctx = null;
                }
-               
-               List<OUT> result = typedOp.executeOnCollections(inputData1, 
inputData2, ctx, executionConfig);
-               
-               return result;
+
+               return typedOp.executeOnCollections(inputData1, inputData2, 
ctx, executionConfig);
        }
        
        @SuppressWarnings("unchecked")
@@ -448,7 +436,7 @@ public class CollectionExecutor {
                                solutionMap.put(wrapper, delta);
                        }
 
-                       currentWorkset = (List<?>) 
execute(iteration.getNextWorkset(), superstep);
+                       currentWorkset = execute(iteration.getNextWorkset(), 
superstep);
 
                        if (currentWorkset.isEmpty()) {
                                break;

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
index 201a613..acc37cf 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics;
 
 import org.apache.flink.annotation.PublicEvolving;
@@ -24,9 +25,9 @@ import org.apache.flink.annotation.PublicEvolving;
  */
 @PublicEvolving
 public final class Counter implements Metric {
-       
+
        private long count;
-       
+
        /**
         * Increment the current count by 1.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
index cca105e..aad8deb 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics;
 
 import org.apache.flink.annotation.PublicEvolving;
@@ -24,7 +25,7 @@ import org.apache.flink.annotation.PublicEvolving;
  */
 @PublicEvolving
 public abstract class Gauge<T> implements Metric {
-       
+
        /**
         * Calculates and returns the measured value.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java b/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
index 11cfcc6..8054de0 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
@@ -15,12 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics;
 
 import org.apache.flink.annotation.PublicEvolving;
 
 /**
- * Common interface for all metrics.
+ * Common super interface for all metrics.
  */
 @PublicEvolving
 public interface Metric {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
index 7e06217..e5d3477 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
@@ -21,7 +21,8 @@ package org.apache.flink.metrics;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.metrics.groups.Scope;
+import org.apache.flink.metrics.groups.scope.ScopeFormat;
+import org.apache.flink.metrics.groups.scope.ScopeFormats;
 import org.apache.flink.metrics.reporter.JMXReporter;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
@@ -32,11 +33,6 @@ import org.slf4j.LoggerFactory;
 import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB;
-import static 
org.apache.flink.metrics.groups.OperatorMetricGroup.DEFAULT_SCOPE_OPERATOR;
-import static 
org.apache.flink.metrics.groups.TaskManagerMetricGroup.DEFAULT_SCOPE_TM;
-import static 
org.apache.flink.metrics.groups.TaskMetricGroup.DEFAULT_SCOPE_TASK;
-
 /**
  * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It 
serves as the
  * connection between {@link MetricGroup MetricGroups} and {@link 
MetricReporter MetricReporters}.
@@ -52,10 +48,10 @@ public class MetricRegistry {
        public static final String KEY_METRICS_REPORTER_ARGUMENTS = 
"metrics.reporter.arguments";
        public static final String KEY_METRICS_REPORTER_INTERVAL = 
"metrics.reporter.interval";
 
-       public static final String KEY_METRICS_SCOPE_NAMING_TM = 
"metrics.scope.tm";
-       public static final String KEY_METRICS_SCOPE_NAMING_JOB = 
"metrics.scope.job";
-       public static final String KEY_METRICS_SCOPE_NAMING_TASK = 
"metrics.scope.task";
-       public static final String KEY_METRICS_SCOPE_NAMING_OPERATOR = 
"metrics.scope.operator";
+       public static final String KEY_METRICS_SCOPE_NAMING_TM = 
"metrics.scopeName.tm";
+       public static final String KEY_METRICS_SCOPE_NAMING_JOB = 
"metrics.scopeName.job";
+       public static final String KEY_METRICS_SCOPE_NAMING_TASK = 
"metrics.scopeName.task";
+       public static final String KEY_METRICS_SCOPE_NAMING_OPERATOR = 
"metrics.scopeName.operator";
 
        // 
------------------------------------------------------------------------
        //  configuration keys
@@ -66,23 +62,22 @@ public class MetricRegistry {
        private final MetricReporter reporter;
        private final java.util.Timer timer;
 
-       private final Scope.ScopeFormat scopeConfig;
+       private final ScopeFormats scopeFormats;
 
        /**
         * Creates a new MetricRegistry and starts the configured reporter.
         */
        public MetricRegistry(Configuration config) {
-               // first parse the scope formats, these are needed for all 
reporters
-               
-               Scope.ScopeFormat scopeFormat;
+               // first parse the scopeName formats, these are needed for all 
reporters
+               ScopeFormats scopeFormats;
                try {
-                       scopeFormat = createScopeConfig(config);
+                       scopeFormats = createScopeConfig(config);
                }
                catch (Exception e) {
-                       scopeFormat = createScopeConfig(new Configuration());
-                       LOG.warn("Failed to parse scope format, using default 
scope formats", e);
+                       LOG.warn("Failed to parse scopeName format, using 
default scopeName formats", e);
+                       scopeFormats = new ScopeFormats();
                }
-               this.scopeConfig = scopeFormat;
+               this.scopeFormats = scopeFormats;
 
                // second, instantiate any custom configured reporters
                
@@ -157,8 +152,8 @@ public class MetricRegistry {
                }
        }
 
-       public Scope.ScopeFormat getScopeConfig() {
-               return this.scopeConfig;
+       public ScopeFormats getScopeFormats() {
+               return scopeFormats;
        }
 
        // 
------------------------------------------------------------------------
@@ -166,35 +161,36 @@ public class MetricRegistry {
        // 
------------------------------------------------------------------------
 
        /**
-        * Registers a new {@link org.apache.flink.metrics.Metric} with this 
registry.
+        * Registers a new {@link Metric} with this registry.
         *
-        * @param metric metric to register
-        * @param name   name of the metric
-        * @param parent group that contains the metric
+        * @param metric      the metric that was added
+        * @param metricName  the name of the metric
+        * @param group       the group that contains the metric
         */
-       public void register(Metric metric, String name, AbstractMetricGroup 
parent) {
-               String metricName = reporter.generateName(name, 
parent.generateScope());
-               this.reporter.notifyOfAddedMetric(metric, metricName);
+       public void register(Metric metric, String metricName, 
AbstractMetricGroup group) {
+               if (reporter != null) {
+                       reporter.notifyOfAddedMetric(metric, metricName, group);
+               }
        }
 
        /**
         * Un-registers the given {@link org.apache.flink.metrics.Metric} with 
this registry.
         *
-        * @param metric metric to un-register
-        * @param name   name of the metric
-        * @param parent group that contains the metric
+        * @param metric      the metric that should be removed
+        * @param metricName  the name of the metric
+        * @param group       the group that contains the metric
         */
-       public void unregister(Metric metric, String name, AbstractMetricGroup 
parent) {
-               String metricName = reporter.generateName(name, 
parent.generateScope());
-               
-               this.reporter.notifyOfRemovedMetric(metric, metricName);
+       public void unregister(Metric metric, String metricName, 
AbstractMetricGroup group) {
+               if (reporter != null) {
+                       reporter.notifyOfRemovedMetric(metric, metricName, 
group);
+               }
        }
 
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
        
-       private static Configuration createReporterConfig(Configuration config, 
TimeUnit timeunit, long period) {
+       static Configuration createReporterConfig(Configuration config, 
TimeUnit timeunit, long period) {
                Configuration reporterConfig = new Configuration();
                reporterConfig.setLong("period", period);
                reporterConfig.setString("timeunit", timeunit.name());
@@ -208,19 +204,17 @@ public class MetricRegistry {
                return reporterConfig;
        }
 
-       private static Scope.ScopeFormat createScopeConfig(Configuration 
config) {
-               String tmFormat = config.getString(KEY_METRICS_SCOPE_NAMING_TM, 
DEFAULT_SCOPE_TM);
-               String jobFormat = 
config.getString(KEY_METRICS_SCOPE_NAMING_JOB, DEFAULT_SCOPE_JOB);
-               String taskFormat = 
config.getString(KEY_METRICS_SCOPE_NAMING_TASK, DEFAULT_SCOPE_TASK);
-               String operatorFormat = 
config.getString(KEY_METRICS_SCOPE_NAMING_OPERATOR, DEFAULT_SCOPE_OPERATOR);
-
-
-               Scope.ScopeFormat format = new Scope.ScopeFormat();
-               format.setTaskManagerFormat(tmFormat);
-               format.setJobFormat(jobFormat);
-               format.setTaskFormat(taskFormat);
-               format.setOperatorFormat(operatorFormat);
-               return format;
+       static ScopeFormats createScopeConfig(Configuration config) {
+               String tmFormat = config.getString(
+                               KEY_METRICS_SCOPE_NAMING_TM, 
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
+               String jobFormat = config.getString(
+                               KEY_METRICS_SCOPE_NAMING_JOB, 
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
+               String taskFormat = config.getString(
+                               KEY_METRICS_SCOPE_NAMING_TASK, 
ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
+               String operatorFormat = config.getString(
+                               KEY_METRICS_SCOPE_NAMING_OPERATOR, 
ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
+               
+               return new ScopeFormats(tmFormat, jobFormat, taskFormat, 
operatorFormat);
        }
 
        // 
------------------------------------------------------------------------
@@ -236,7 +230,7 @@ public class MetricRegistry {
         * which acts as a fail-safe to stop the timer thread and prevents 
resource leaks.
         */
        private static final class ReporterTask extends TimerTask {
-               
+
                private final Scheduled reporter;
 
                private ReporterTask(Scheduled reporter) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
index cad241d..032fa04 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
@@ -25,14 +25,12 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
 
+import org.apache.flink.metrics.groups.scope.ScopeFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -55,31 +53,58 @@ public abstract class AbstractMetricGroup implements 
MetricGroup {
        /** shared logger */
        private static final Logger LOG = 
LoggerFactory.getLogger(MetricGroup.class);
 
-       private static final String METRIC_NAME_REGEX = "[a-zA-Z0-9]*";
-       
-       /** The pattern that metric and group names have to match */
-       private static final Pattern METRIC_NAME_PATTERN = 
Pattern.compile(METRIC_NAME_REGEX);
-
        // 
------------------------------------------------------------------------
 
        /** The registry that this metrics group belongs to */
        protected final MetricRegistry registry;
 
        /** All metrics that are directly contained in this group */
-       protected final Map<String, Metric> metrics = new HashMap<>();
+       private final Map<String, Metric> metrics = new HashMap<>();
 
        /** All metric subgroups of this group */
-       protected final Map<String, MetricGroup> groups = new HashMap<>();
+       private final Map<String, MetricGroup> groups = new HashMap<>();
+
+       /** The metrics scope represented by this group.
+        *  For example ["host-7", "taskmanager-2", "window_word_count", 
"my-mapper" ]. */
+       private final String[] scopeComponents;
+
+       /** The metrics scope represented by this group, as a concatenated 
string, lazily computed.
+        * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
+       private String scopeString;
 
        /** Flag indicating whether this group has been closed */
        private volatile boolean closed;
 
        // 
------------------------------------------------------------------------
-       
-       public AbstractMetricGroup(MetricRegistry registry) {
+
+       public AbstractMetricGroup(MetricRegistry registry, String[] scope) {
                this.registry = checkNotNull(registry);
+               this.scopeComponents = checkNotNull(scope);
        }
 
+       /**
+        * Gets the scope as an array of the scope components, for example
+        * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}
+        * 
+        * @see #getScopeString() 
+        */
+       public String[] getScopeComponents() {
+               return scopeComponents;
+       }
+
+       /**
+        * Gets the scope as a single delimited string, for example
+        * {@code "host-7.taskmanager-2.window_word_count.my-mapper"}
+        *
+        * @see #getScopeComponents()
+        */
+       public String getScopeString() {
+               if (scopeString == null) {
+                       scopeString = ScopeFormat.concat(scopeComponents);
+               }
+               return scopeString;
+       }
+       
        // 
------------------------------------------------------------------------
        //  Closing
        // 
------------------------------------------------------------------------
@@ -111,25 +136,6 @@ public abstract class AbstractMetricGroup implements 
MetricGroup {
        }
 
        // 
-----------------------------------------------------------------------------------------------------------------
-       //  Scope
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       /**
-        * Generates the full scope based on the default/configured format that 
applies to all metrics within this group.
-        *
-        * @return generated scope
-        */
-       public abstract List<String> generateScope();
-
-       /**
-        * Generates the full scope based on the given format that applies to 
all metrics within this group.
-        *
-        * @param format format string
-        * @return generated scope
-        */
-       public abstract List<String> generateScope(Scope.ScopeFormat format);
-
-       // 
-----------------------------------------------------------------------------------------------------------------
        //  Metrics
        // 
-----------------------------------------------------------------------------------------------------------------
 
@@ -164,11 +170,8 @@ public abstract class AbstractMetricGroup implements 
MetricGroup {
         * @param metric the metric to register
         */
        protected void addMetric(String name, Metric metric) {
-               Matcher nameMatcher = METRIC_NAME_PATTERN.matcher(name);
-               if (!nameMatcher.matches()) {
-                       throw new IllegalArgumentException("Metric names may 
not contain special characters or spaces. " +
-                                       "Allowed is: " + METRIC_NAME_REGEX);
-               }
+               // early reject names that will later cause issues
+               checkAllowedCharacters(name);
 
                // add the metric only if the group is still open
                synchronized (this) {
@@ -185,7 +188,7 @@ public abstract class AbstractMetricGroup implements 
MetricGroup {
                                                // we warn here, rather than 
failing, because metrics are tools that should not fail the
                                                // program when used incorrectly
                                                LOG.warn("Name collision: 
Adding a metric with the same name as a metric subgroup: '" +
-                                                               name + "'. 
Metric might not get properly reported. (" + generateScope() + ')');
+                                                               name + "'. 
Metric might not get properly reported. (" + scopeString + ')');
                                        }
 
                                        registry.register(metric, name, this);
@@ -197,7 +200,7 @@ public abstract class AbstractMetricGroup implements 
MetricGroup {
                                        // we warn here, rather than failing, 
because metrics are tools that should not fail the
                                        // program when used incorrectly
                                        LOG.warn("Name collision: Group already 
contains a Metric with the name '" +
-                                                       name + "'. Metric will 
not be reported. (" + generateScope() + ')');
+                                                       name + "'. Metric will 
not be reported. (" + scopeString + ')');
                                }
                        }
                }
@@ -221,7 +224,7 @@ public abstract class AbstractMetricGroup implements 
MetricGroup {
                                // program when used incorrectly
                                if (metrics.containsKey(name)) {
                                        LOG.warn("Name collision: Adding a 
metric subgroup with the same name as an existing metric: '" +
-                                                       name + "'. Metric might 
not get properly reported. (" + generateScope() + ')');
+                                                       name + "'. Metric might 
not get properly reported. (" + scopeString + ')');
                                }
 
                                MetricGroup newGroup = new 
GenericMetricGroup(registry, this, name);
@@ -243,4 +246,21 @@ public abstract class AbstractMetricGroup implements 
MetricGroup {
                        }
                }
        }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Fast implementation to check if a string has only alphanumeric 
characters.
+        * Compared to a regular expression, this is about an order of 
magnitude faster.
+        */
+       private static void checkAllowedCharacters(String name) {
+               for (int i = 0; i < name.length(); i++) {
+                       final char c = name.charAt(i);
+                       if (c < 0x30 || (c >= 0x3a && c <= 0x40) || (c > 0x5a 
&& c <= 0x60) || c > 0x7a) {
+                               throw new IllegalArgumentException("Metric 
names may only contain [a-zA-Z0-9].");
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
index c68cdc1..518d940 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
@@ -21,46 +21,33 @@ package org.apache.flink.metrics.groups;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.MetricRegistry;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.metrics.groups.Scope.SCOPE_WILDCARD;
-
 /**
  * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components 
(e.g., 
  * TaskManager, Job, Task, Operator).
  * 
- * <p>The components metric groups contain functionality to define alias names 
for
- * the component identifier. For example, while Tasks are registered under a 
Task Attempt ID,
- * the metrics name scope may use the task name instead. Using these aliases 
makes
- * Metric scope names stable across jobs, recovery attempts, etc.
+ * <p>Usually, the scope of metrics is simply the hierarchy of the containing 
groups. For example
+ * the Metric {@code "MyMetric"} in group {@code "B"} nested in group {@code 
"A"} would have a
+ * fully scoped name of {@code "A.B.MyMetric"}, with {@code "A.B"} being the 
Metric's scope.
+ *
+ * <p>Component groups, however, have configurable scopes. This allows users to 
include or exclude
+ * certain identifiers from the scope. The scope for metrics belonging to the 
"Task"
+ * group could for example include the task attempt number (more fine grained 
identification), or
+ * exclude it (for continuity of the namespace across failure and recovery).
  */
 @Internal
 public abstract class ComponentMetricGroup extends AbstractMetricGroup {
 
-       private final ComponentMetricGroup parent;
-
-       private final String format;
-       
-       // Map: scope variable -> specific value
-       protected final Map<String, String> formats;
-
-       // 
------------------------------------------------------------------------
-
        /**
         * Creates a new ComponentMetricGroup.
         *
-        * @param registry    registry to register new metrics with
-        * @param parentGroup parent group, may be null
-        * @param scopeFormat default format string
+        * @param registry     registry to register new metrics with
+        * @param scope        the scope of the group
         */
-       public ComponentMetricGroup(MetricRegistry registry, 
ComponentMetricGroup parentGroup, String scopeFormat) {
-               super(registry);
-               this.formats = new HashMap<>();
-               this.parent = parentGroup;
-               this.format = scopeFormat;
+       public ComponentMetricGroup(
+                       MetricRegistry registry,
+                       String[] scope) {
+
+               super(registry, scope);
        }
 
        /**
@@ -87,71 +74,5 @@ public abstract class ComponentMetricGroup extends 
AbstractMetricGroup {
        //  sub components
        // 
------------------------------------------------------------------------
 
-       protected ComponentMetricGroup parent() {
-               return parent;
-       }
-
        protected abstract Iterable<? extends ComponentMetricGroup> 
subComponents();
-
-       // 
------------------------------------------------------------------------
-       //  scope format
-       // 
------------------------------------------------------------------------
-
-       protected abstract String getScopeFormat(Scope.ScopeFormat format);
-
-       @Override
-       public List<String> generateScope() {
-               return generateScope(format);
-       }
-
-       @Override
-       public List<String> generateScope(Scope.ScopeFormat format) {
-               return generateScope(getScopeFormat(format));
-       }
-
-       private List<String> generateScope(String format) {
-               String[] components = Scope.split(format);
-
-               List<String> scope = new ArrayList<>();
-               if (components[0].equals(SCOPE_WILDCARD)) {
-                       if (this.parent != null) {
-                               scope = this.parent.generateScope();
-                       }
-                       this.replaceFormats(components);
-                       addToList(scope, components, 1);
-               } else {
-                       if (this.parent != null) {
-                               this.parent.replaceFormats(components);
-                       }
-                       this.replaceFormats(components);
-                       addToList(scope, components, 0);
-               }
-               return scope;
-       }
-
-       private void replaceFormats(String[] components) {
-               if (this.parent != null) {
-                       this.parent.replaceFormats(components);
-               }
-               for (int x = 0; x < components.length; x++) {
-                       if (components[x].startsWith("<")) {
-                               if (this.formats.containsKey(components[x])) {
-                                       components[x] = 
this.formats.get(components[x]);
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Adds all elements from the given array, starting from the given 
index, to the given list.
-        *
-        * @param list       destination
-        * @param array      source
-        * @param startIndex array index to start from
-        */
-       private static void addToList(List<String> list, String[] array, int 
startIndex) {
-               for (int x = startIndex; x < array.length; x++) {
-                       list.add(array[x]);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
index eedb0fa..ddcd73b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
@@ -15,44 +15,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.groups;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.MetricRegistry;
 
-import java.util.List;
-
 /**
- * A simple named {@link org.apache.flink.metrics.MetricGroup} with no special 
properties.
+ * A simple named {@link org.apache.flink.metrics.MetricGroup} that is used to 
hold
+ * subgroups of metrics.
  */
 @Internal
 public class GenericMetricGroup extends AbstractMetricGroup {
-       
-       private final AbstractMetricGroup parent;
-
-       private final String name;
-
-       protected GenericMetricGroup(MetricRegistry registry, 
AbstractMetricGroup parent, int name) {
-               this(registry, parent, String.valueOf(name));
-       }
-
-       protected GenericMetricGroup(MetricRegistry registry, 
AbstractMetricGroup parent, String name) {
-               super(registry);
-               this.parent = parent;
-               this.name = name;
-       }
 
-       @Override
-       public List<String> generateScope() {
-               List<String> scope = parent.generateScope();
-               scope.add(name);
-               return scope;
+       public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup 
parent, String name) {
+               super(registry, makeScopeComponents(parent, name));
        }
 
-       @Override
-       public List<String> generateScope(Scope.ScopeFormat format) {
-               List<String> scope = parent.generateScope(format);
-               scope.add(name);
-               return scope;
+       // 
------------------------------------------------------------------------
+
+       private static String[] makeScopeComponents(AbstractMetricGroup parent, 
String name) {
+               if (parent != null) {
+                       String[] parentComponents = parent.getScopeComponents();
+                       if (parentComponents != null && parentComponents.length 
> 0) {
+                               String[] parts = new 
String[parentComponents.length + 1];
+                               System.arraycopy(parentComponents, 0, parts, 0, 
parentComponents.length);
+                               parts[parts.length - 1] = name;
+                               return parts;
+                       }
+               }
+               return new String[] { name };
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
index dea5650..b34c844 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
@@ -15,19 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricRegistry;
 
-import java.util.List;
-
 /**
  * Special {@link org.apache.flink.metrics.MetricGroup} that contains 
shareable pre-defined IO-related metrics.
  */
 public class IOMetricGroup extends AbstractMetricGroup {
-       
-       private final TaskMetricGroup parent;
 
        private final Counter numBytesIn;
        private final Counter numBytesOut;
@@ -35,37 +32,27 @@ public class IOMetricGroup extends AbstractMetricGroup {
        private final Counter numRecordsOut;
 
        public IOMetricGroup(MetricRegistry registry, TaskMetricGroup parent) {
-               super(registry);
-               this.parent = parent;
+               super(registry, parent.getScopeComponents());
+
                this.numBytesIn = parent.counter("numBytesIn");
                this.numBytesOut = parent.counter("numBytesOut");
                this.numRecordsIn = parent.counter("numRecordsIn");
                this.numRecordsOut = parent.counter("numRecordsOut");
        }
 
-       @Override
-       public List<String> generateScope() {
-               return parent.generateScope();
-       }
-
-       @Override
-       public List<String> generateScope(Scope.ScopeFormat format) {
-               return parent.generateScope(format);
-       }
-
        public Counter getBytesInCounter() {
-               return this.numBytesIn;
+               return numBytesIn;
        }
 
        public Counter getBytesOutCounter() {
-               return this.numBytesOut;
+               return numBytesOut;
        }
 
        public Counter getRecordsInCounter() {
-               return this.numRecordsIn;
+               return numRecordsIn;
        }
 
        public Counter getRecordsOutCounter() {
-               return this.numRecordsOut;
+               return numRecordsOut;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
index f4f634a..f816278 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
@@ -21,57 +21,92 @@ package org.apache.flink.metrics.groups;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.MetricRegistry;
+import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
 import org.apache.flink.util.AbstractID;
 
+import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
 
-import static 
org.apache.flink.metrics.groups.TaskManagerMetricGroup.DEFAULT_SCOPE_TM;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing a Job.
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing 
everything belonging to
+ * a specific job, running on the TaskManager.
  * 
- * <p>Contains extra logic for adding tasks.
+ * <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}).
  */
 @Internal
 public class JobMetricGroup extends ComponentMetricGroup {
 
-       public static final String SCOPE_JOB_DESCRIPTOR = "job";
-       public static final String SCOPE_JOB_ID = Scope.format("job_id");
-       public static final String SCOPE_JOB_NAME = Scope.format("job_name");
-       public static final String DEFAULT_SCOPE_JOB_COMPONENT = 
Scope.concat(SCOPE_JOB_NAME);
-       public static final String DEFAULT_SCOPE_JOB = 
Scope.concat(DEFAULT_SCOPE_TM, DEFAULT_SCOPE_JOB_COMPONENT);
-
-       // 
------------------------------------------------------------------------
+       /** The metrics group that contains this group */
+       private final TaskManagerMetricGroup parent;
 
        /** Map from execution attempt ID (task identifier) to task metrics */
        private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();
-       
+
+       /** The ID of the job represented by this metrics group */
        private final JobID jobId;
 
+       /** The name of the job represented by this metrics group */
+       @Nullable
+       private final String jobName;
+
        // 
------------------------------------------------------------------------
 
-       public JobMetricGroup(MetricRegistry registry, TaskManagerMetricGroup 
taskManager, JobID jobId, String jobName) {
-               super(registry, taskManager, 
registry.getScopeConfig().getJobFormat());
+       public JobMetricGroup(
+                       MetricRegistry registry,
+                       TaskManagerMetricGroup parent,
+                       JobID jobId,
+                       @Nullable String jobName) {
                
+               this(registry, checkNotNull(parent), 
registry.getScopeFormats().getJobFormat(), jobId, jobName);
+       }
+
+       public JobMetricGroup(
+                       MetricRegistry registry,
+                       TaskManagerMetricGroup parent,
+                       TaskManagerJobScopeFormat scopeFormat, 
+                       JobID jobId,
+                       @Nullable String jobName) {
+
+               super(registry, scopeFormat.formatScope(parent, jobId, 
jobName));
+
+               this.parent = checkNotNull(parent);
                this.jobId = checkNotNull(jobId);
-               this.formats.put(SCOPE_JOB_ID, jobId.toString());
-               this.formats.put(SCOPE_JOB_NAME, checkNotNull(jobName));
+               this.jobName = jobName;
+       }
+
+       public final TaskManagerMetricGroup parent() {
+               return parent;
+       }
+
+       public JobID jobId() {
+               return jobId;
+       }
+
+       @Nullable
+       public String jobName() {
+               return jobName;
        }
 
        // 
------------------------------------------------------------------------
        //  adding / removing tasks
        // 
------------------------------------------------------------------------
 
-       public TaskMetricGroup addTask(AbstractID vertexId, AbstractID 
executionId, int subtaskIndex, String name) {
-               checkNotNull(vertexId);
+       public TaskMetricGroup addTask(
+                       AbstractID vertexId,
+                       AbstractID executionId,
+                       String taskName,
+                       int subtaskIndex,
+                       int attemptNumber) {
+               
                checkNotNull(executionId);
-               checkNotNull(name);
 
                synchronized (this) {
                        if (!isClosed()) {
-                               TaskMetricGroup task = new 
TaskMetricGroup(registry, this, vertexId, executionId, subtaskIndex, name);
+                               TaskMetricGroup task = new 
TaskMetricGroup(registry, this, 
+                                               vertexId, executionId, 
taskName, subtaskIndex, attemptNumber);
                                tasks.put(executionId, task);
                                return task;
                        } else {
@@ -82,7 +117,7 @@ public class JobMetricGroup extends ComponentMetricGroup {
 
        public void removeTaskMetricGroup(AbstractID executionId) {
                checkNotNull(executionId);
-               
+
                boolean removeFromParent = false;
                synchronized (this) {
                        if (!isClosed() && tasks.remove(executionId) != null && 
tasks.isEmpty()) {
@@ -91,25 +126,16 @@ public class JobMetricGroup extends ComponentMetricGroup {
                                close();
                        }
                }
-               
+
                // IMPORTANT: removing from the parent must not happen while 
holding this group's lock,
                //      because it would violate the "first parent then 
subgroup" lock acquisition order
                if (removeFromParent) {
-                       ((TaskManagerMetricGroup) 
parent()).removeJobMetricsGroup(jobId, this);
+                       parent.removeJobMetricsGroup(jobId, this);
                }
        }
 
-       // 
------------------------------------------------------------------------
-       //  component group behavior
-       // 
------------------------------------------------------------------------
-       
        @Override
        protected Iterable<? extends ComponentMetricGroup> subComponents() {
                return tasks.values();
        }
-
-       @Override
-       protected String getScopeFormat(Scope.ScopeFormat format) {
-               return format.getJobFormat();
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/NonRegisteringMetricsGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/NonRegisteringMetricsGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/NonRegisteringMetricsGroup.java
deleted file mode 100644
index 1bfcfe3..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/NonRegisteringMetricsGroup.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-
-/**
- * Metrics group that does not register any metrics.
- */
-@Internal
-public class NonRegisteringMetricsGroup implements MetricGroup {
-
-       // 
------------------------------------------------------------------------
-       //  singleton
-       // 
------------------------------------------------------------------------
-
-       private static final NonRegisteringMetricsGroup INSTANCE = new 
NonRegisteringMetricsGroup();
-
-       public static NonRegisteringMetricsGroup get() {
-               return INSTANCE;
-       }
-
-       /** Private constructor to prevent instantiation */
-       private NonRegisteringMetricsGroup() {}
-
-       // 
------------------------------------------------------------------------
-       //  metrics group
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void close() {}
-
-       @Override
-       public boolean isClosed() {
-               return false;
-       }
-
-       @Override
-       public Counter counter(int name) {
-               return new Counter();
-       }
-
-       @Override
-       public Counter counter(String name) {
-               return new Counter();
-       }
-
-       @Override
-       public <T> Gauge<T> gauge(int name, Gauge<T> gauge) {
-               return gauge;
-       }
-
-       @Override
-       public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
-               return gauge;
-       }
-
-       
-       @Override
-       public MetricGroup addGroup(int name) {
-               return addGroup(String.valueOf(name));
-       }
-
-       @Override
-       public MetricGroup addGroup(String name) {
-               return new NonRegisteringMetricsGroup();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
index 390b55b..6db79ab 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
@@ -15,14 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.groups;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat;
 
 import java.util.Collections;
 
-import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Special {@link org.apache.flink.metrics.MetricGroup} representing an 
Operator.
@@ -30,26 +32,29 @@ import static 
org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB;
 @Internal
 public class OperatorMetricGroup extends ComponentMetricGroup {
 
-       public static final String SCOPE_OPERATOR_DESCRIPTOR = "operator";
-       public static final String SCOPE_OPERATOR_NAME = 
Scope.format("operator_name");
-       public static final String SCOPE_OPERATOR_SUBTASK_INDEX = 
Scope.format("subtask_index");
-       public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT = 
Scope.concat(SCOPE_OPERATOR_NAME, SCOPE_OPERATOR_SUBTASK_INDEX);
-       public static final String DEFAULT_SCOPE_OPERATOR = 
Scope.concat(DEFAULT_SCOPE_JOB, DEFAULT_SCOPE_OPERATOR_COMPONENT);
-
-       protected OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup 
task, String name, int subTaskIndex) {
-               super(registry, task, 
registry.getScopeConfig().getOperatorFormat());
+       /** The task metric group that contains this operator metric group */
+       private final TaskMetricGroup parent;
 
-               this.formats.put(SCOPE_OPERATOR_NAME, name);
-               this.formats.put(SCOPE_OPERATOR_SUBTASK_INDEX, 
String.valueOf(subTaskIndex));
+       public OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup 
parent, String operatorName) {
+               this(registry, parent, 
registry.getScopeFormats().getOperatorFormat(), operatorName);
        }
 
-       // 
------------------------------------------------------------------------
+       public OperatorMetricGroup(
+                       MetricRegistry registry,
+                       TaskMetricGroup parent,
+                       OperatorScopeFormat scopeFormat,
+                       String operatorName) {
 
-       @Override
-       protected String getScopeFormat(Scope.ScopeFormat format) {
-               return format.getOperatorFormat();
+               super(registry, scopeFormat.formatScope(parent, operatorName));
+               this.parent = checkNotNull(parent);
        }
 
+       // 
------------------------------------------------------------------------
+       
+       public final TaskMetricGroup parent() {
+               return parent;
+       }
+       
        @Override
        protected Iterable<? extends ComponentMetricGroup> subComponents() {
                return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java
deleted file mode 100644
index 83013e2..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * This class provides utility-functions for handling scopes.
- */
-@Internal
-public class Scope {
-       public static final String SCOPE_WILDCARD = "*";
-
-       private static final String SCOPE_PREFIX = "<";
-       private static final String SCOPE_SUFFIX = ">";
-       private static final String SCOPE_SPLIT = ".";
-
-       private Scope() {
-       }
-
-       /**
-        * Modifies the given string to resemble a scope variable.
-        *
-        * @param scope string to format
-        * @return formatted string
-        */
-       public static String format(String scope) {
-               return SCOPE_PREFIX + scope + SCOPE_SUFFIX;
-       }
-
-       /**
-        * Joins the given components into a single scope.
-        *
-        * @param components components to join
-        * @return joined scoped
-        */
-       public static String concat(String... components) {
-               StringBuilder sb = new StringBuilder();
-               sb.append(components[0]);
-               for (int x = 1; x < components.length; x++) {
-                       sb.append(SCOPE_SPLIT);
-                       sb.append(components[x]);
-               }
-               return sb.toString();
-       }
-
-       /**
-        * Splits the given scope into it's individual components.
-        *
-        * @param scope scope to split
-        * @return array of components
-        */
-       public static String[] split(String scope) {
-               return scope.split("\\" + SCOPE_SPLIT);
-       }
-
-       /**
-        * Simple container for component scope format strings.
-        */
-       public static class ScopeFormat {
-               
-               private String operatorFormat = 
OperatorMetricGroup.DEFAULT_SCOPE_OPERATOR;
-               private String taskFormat = TaskMetricGroup.DEFAULT_SCOPE_TASK;
-               private String jobFormat = JobMetricGroup.DEFAULT_SCOPE_JOB;
-               private String taskManagerFormat = 
TaskManagerMetricGroup.DEFAULT_SCOPE_TM;
-               
-
-               public ScopeFormat setOperatorFormat(String format) {
-                       this.operatorFormat = format;
-                       return this;
-               }
-
-               public ScopeFormat setTaskFormat(String format) {
-                       this.taskFormat = format;
-                       return this;
-               }
-
-               public ScopeFormat setJobFormat(String format) {
-                       this.jobFormat = format;
-                       return this;
-               }
-
-               public ScopeFormat setTaskManagerFormat(String format) {
-                       this.taskManagerFormat = format;
-                       return this;
-               }
-
-               public String getOperatorFormat() {
-                       return this.operatorFormat;
-               }
-
-               public String getTaskFormat() {
-                       return this.taskFormat;
-               }
-
-               public String getJobFormat() {
-                       return this.jobFormat;
-               }
-
-               public String getTaskManagerFormat() {
-                       return this.taskManagerFormat;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
index bfb9362..3cb3936 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
@@ -21,13 +21,12 @@ package org.apache.flink.metrics.groups;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.MetricRegistry;
+import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
 import org.apache.flink.util.AbstractID;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Special {@link org.apache.flink.metrics.MetricGroup} representing a 
TaskManager.
  *
@@ -37,22 +36,33 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class TaskManagerMetricGroup extends ComponentMetricGroup {
 
-       public static final String SCOPE_HOST_DESCRIPTOR = "host";
-       public static final String SCOPE_TM_DESCRIPTOR = "taskmanager";
-       public static final String SCOPE_TM_HOST = Scope.format("host");
-       public static final String SCOPE_TM_ID = Scope.format("tm_id");
-       public static final String DEFAULT_SCOPE_TM_COMPONENT = 
Scope.concat(SCOPE_TM_HOST, "taskmanager", SCOPE_TM_ID);
-       public static final String DEFAULT_SCOPE_TM = 
DEFAULT_SCOPE_TM_COMPONENT;
-
-       // 
------------------------------------------------------------------------
-       
        private final Map<JobID, JobMetricGroup> jobs = new HashMap<>();
 
-       public TaskManagerMetricGroup(MetricRegistry registry, String host, 
String taskManagerId) {
-               super(registry, null, 
registry.getScopeConfig().getTaskManagerFormat());
+       private final String hostname;
+
+       private final String taskManagerId;
 
-               this.formats.put(SCOPE_TM_HOST, checkNotNull(host));
-               this.formats.put(SCOPE_TM_ID, checkNotNull(taskManagerId));
+
+       public TaskManagerMetricGroup(MetricRegistry registry, String hostname, 
String taskManagerId) {
+               this(registry, 
registry.getScopeFormats().getTaskManagerFormat(), hostname, taskManagerId);
+       }
+
+       public TaskManagerMetricGroup(
+                       MetricRegistry registry,
+                       TaskManagerScopeFormat scopeFormat,
+                       String hostname, String taskManagerId) {
+
+               super(registry, scopeFormat.formatScope(hostname, 
taskManagerId));
+               this.hostname = hostname;
+               this.taskManagerId = taskManagerId;
+       }
+
+       public String hostname() {
+               return hostname;
+       }
+
+       public String taskManagerId() {
+               return taskManagerId;
        }
 
        // 
------------------------------------------------------------------------
@@ -64,9 +74,10 @@ public class TaskManagerMetricGroup extends 
ComponentMetricGroup {
                        String jobName,
                        AbstractID vertexID,
                        AbstractID executionId,
+                       String taskName,
                        int subtaskIndex,
-                       String taskName) {
-               
+                       int attemptNumber) {
+
                // we cannot strictly lock both our map modification and the 
job group modification
                // because it might lead to a deadlock
                while (true) {
@@ -80,28 +91,30 @@ public class TaskManagerMetricGroup extends 
ComponentMetricGroup {
                                        jobs.put(jobId, currentJobGroup);
                                }
                        }
-               
+
                        // try to add another task. this may fail if we found a 
pre-existing job metrics
                        // group and it is closed concurrently
-                       TaskMetricGroup taskGroup = 
currentJobGroup.addTask(vertexID, executionId, subtaskIndex, taskName);
+                       TaskMetricGroup taskGroup = currentJobGroup.addTask(
+                                       vertexID, executionId, taskName, 
subtaskIndex, attemptNumber);
+
                        if (taskGroup != null) {
                                // successfully added the next task
                                return taskGroup;
                        }
-                       
+
                        // else fall through the loop
                }
        }
-       
+
        public void removeJobMetricsGroup(JobID jobId, JobMetricGroup group) {
                if (jobId == null || group == null || !group.isClosed()) {
                        return;
                }
-               
+
                synchronized (this) {
                        // optimistically remove the currently contained group, 
and check later if it was correct
                        JobMetricGroup containedGroup = jobs.remove(jobId);
-                       
+
                        // check if another group was actually contained, and 
restore that one
                        if (containedGroup != null && containedGroup != group) {
                                jobs.put(jobId, containedGroup);
@@ -113,15 +126,6 @@ public class TaskManagerMetricGroup extends 
ComponentMetricGroup {
                return jobs.size();
        }
 
-       // 
------------------------------------------------------------------------
-       //  component group behavior
-       // 
------------------------------------------------------------------------
-
-       @Override
-       protected String getScopeFormat(Scope.ScopeFormat format) {
-               return format.getTaskManagerFormat();
-       }
-
        @Override
        protected Iterable<? extends ComponentMetricGroup> subComponents() {
                return jobs.values();

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
index 316c84f..784578b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
@@ -20,12 +20,13 @@ package org.apache.flink.metrics.groups;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat;
 import org.apache.flink.util.AbstractID;
 
+import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -35,47 +36,109 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 @Internal
 public class TaskMetricGroup extends ComponentMetricGroup {
-       
-       public static final String SCOPE_TASK_DESCRIPTOR = "task";
-       public static final String SCOPE_TASK_ID = Scope.format("task_id");
-       public static final String SCOPE_TASK_NAME = Scope.format("task_name");
-       public static final String SCOPE_TASK_ATTEMPT = 
Scope.format("task_attempt");
-       public static final String SCOPE_TASK_SUBTASK_INDEX = 
Scope.format("subtask_index");
-       public static final String DEFAULT_SCOPE_TASK_COMPONENT = 
SCOPE_TASK_NAME;
-       public static final String DEFAULT_SCOPE_TASK = 
Scope.concat(DEFAULT_SCOPE_JOB, DEFAULT_SCOPE_TASK_COMPONENT);
 
+       /** The job metrics group containing this task metrics group */
+       private final JobMetricGroup parent;
 
        private final Map<String, OperatorMetricGroup> operators = new 
HashMap<>();
 
        private final IOMetricGroup ioMetrics;
-
+       
+       /** The execution Id uniquely identifying the executed task represented 
by this metrics group */
        private final AbstractID executionId;
+
+       @Nullable
+       private final AbstractID vertexId;
        
+       @Nullable
+       private final String taskName;
+
        private final int subtaskIndex;
 
-       protected TaskMetricGroup(
+       private final int attemptNumber;
+
+       // 
------------------------------------------------------------------------
+
+       public TaskMetricGroup(
                        MetricRegistry registry,
                        JobMetricGroup parent,
-                       AbstractID taskId,
+                       @Nullable AbstractID vertexId,
                        AbstractID executionId,
+                       @Nullable String taskName,
                        int subtaskIndex,
-                       String name) {
+                       int attemptNumber) {
+               
+               this(registry, parent, 
registry.getScopeFormats().getTaskFormat(),
+                               vertexId, executionId, taskName, subtaskIndex, 
attemptNumber);
+       }
 
-               super(registry, parent, 
registry.getScopeConfig().getTaskFormat());
+       public TaskMetricGroup(
+                       MetricRegistry registry,
+                       JobMetricGroup parent,
+                       TaskScopeFormat scopeFormat, 
+                       @Nullable AbstractID vertexId,
+                       AbstractID executionId,
+                       @Nullable String taskName,
+                       int subtaskIndex,
+                       int attemptNumber) {
 
-               this.executionId = executionId;
+               super(registry, scopeFormat.formatScope(
+                               parent, vertexId, executionId, taskName, 
subtaskIndex, attemptNumber));
+
+               this.parent = checkNotNull(parent);
+               this.executionId = checkNotNull(executionId);
+               this.vertexId = vertexId;
+               this.taskName = taskName;
                this.subtaskIndex = subtaskIndex;
-               
-               this.formats.put(SCOPE_TASK_ID, taskId.toString());
-               this.formats.put(SCOPE_TASK_ATTEMPT, executionId.toString());
-               this.formats.put(SCOPE_TASK_NAME, checkNotNull(name));
-               this.formats.put(SCOPE_TASK_SUBTASK_INDEX, 
String.valueOf(subtaskIndex));
+               this.attemptNumber = attemptNumber;
 
                this.ioMetrics = new IOMetricGroup(registry, this);
        }
 
+       // 
------------------------------------------------------------------------
+       //  properties
+       // 
------------------------------------------------------------------------
+
+       public final JobMetricGroup parent() {
+               return parent;
+       }
+
+       public AbstractID executionId() {
+               return executionId;
+       }
+
+       @Nullable
+       public AbstractID vertexId() {
+               return vertexId;
+       }
+
+       @Nullable
+       public String taskName() {
+               return taskName;
+       }
+
+       public int subtaskIndex() {
+               return subtaskIndex;
+       }
+
+       public int attemptNumber() {
+               return attemptNumber;
+       }
+
+       /**
+        * Returns the IOMetricGroup for this task.
+        *
+        * @return IOMetricGroup for this task.
+        */
+       public IOMetricGroup getIOMetricGroup() {
+               return ioMetrics;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  operators and cleanup
+       // 
------------------------------------------------------------------------
        public OperatorMetricGroup addOperator(String name) {
-               OperatorMetricGroup operator = new 
OperatorMetricGroup(this.registry, this, name, this.subtaskIndex);
+               OperatorMetricGroup operator = new 
OperatorMetricGroup(this.registry, this, name);
 
                synchronized (this) {
                        OperatorMetricGroup previous = operators.put(name, 
operator);
@@ -93,27 +156,10 @@ public class TaskMetricGroup extends ComponentMetricGroup {
        @Override
        public void close() {
                super.close();
-               parent().removeTaskMetricGroup(executionId);
-       }
-
-       /**
-        * Returns the IOMetricGroup for this task.
-        *
-        * @return IOMetricGroup for this task.
-        */
-       public IOMetricGroup getIOMetricGroup() {
-               return this.ioMetrics;
-       }
-
-       @Override
-       protected JobMetricGroup parent() {
-               return (JobMetricGroup) super.parent();
+               parent.removeTaskMetricGroup(executionId);
        }
 
-       @Override
-       protected String getScopeFormat(Scope.ScopeFormat format) {
-               return format.getTaskFormat();
-       }
+       // 
------------------------------------------------------------------------
 
        @Override
        protected Iterable<? extends ComponentMetricGroup> subComponents() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
new file mode 100644
index 0000000..961bcce
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * A special {@link MetricGroup} that does not register any metrics at the 
metrics registry
+ * or any reporters.
+ * 
+ * <p>This metrics group always appears closed ({@link #isClosed()}).
+ */
+@Internal
+public class UnregisteredMetricsGroup implements MetricGroup {
+
+       @Override
+       public void close() {}
+
+       @Override
+       public boolean isClosed() {
+               return true;
+       }
+
+       @Override
+       public Counter counter(int name) {
+               return new Counter();
+       }
+
+       @Override
+       public Counter counter(String name) {
+               return new Counter();
+       }
+
+       @Override
+       public <T> Gauge<T> gauge(int name, Gauge<T> gauge) {
+               return gauge;
+       }
+
+       @Override
+       public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
+               return gauge;
+       }
+
+       
+       @Override
+       public MetricGroup addGroup(int name) {
+               return addGroup(String.valueOf(name));
+       }
+
+       @Override
+       public MetricGroup addGroup(String name) {
+               return new UnregisteredMetricsGroup();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
new file mode 100644
index 0000000..9637f65
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.groups.scope;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.groups.JobMetricGroup;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.util.AbstractID;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class represents the format after which the "scope" (or namespace) of 
the various
+ * component metric groups is built. Component metric groups
+ * ({@link org.apache.flink.metrics.groups.ComponentMetricGroup}), are for 
example
+ * "TaskManager", "Task", or "Operator".
+ *
+ * <p>User defined scope formats allow users to include or exclude
+ * certain identifiers from the scope. The scope for metrics belonging to the 
"Task"
+ * group could for example include the task attempt number (more fine grained 
identification), or
+ * exclude it (continuity of the namespace across failure and recovery).
+ */
+public abstract class ScopeFormat {
+
+       // 
------------------------------------------------------------------------
+       //  Scope Format Special Characters
+       // 
------------------------------------------------------------------------
+
+       /**
+        * If the scope format starts with this character, then the parent 
component's scope
+        * format will be used as a prefix.
+        * 
+        * <p>For example, if the {@link JobMetricGroup} format is {@code 
"*.<job_name>"}, and the
+        * {@link TaskManagerMetricGroup} format is {@code "<host>"}, then the 
job's metrics
+        * will have {@code "<host>.<job_name>"} as their scope.
+        */
+       public static final String SCOPE_INHERIT_PARENT = "*";
+
+       public static final String SCOPE_SEPARATOR = ".";
+
+       private static final String SCOPE_VARIABLE_PREFIX = "<";
+       private static final String SCOPE_VARIABLE_SUFFIX = ">";
+
+       // 
------------------------------------------------------------------------
+       //  Scope Variables
+       // 
------------------------------------------------------------------------
+
+       // ----- Task Manager ----
+
+       public static final String SCOPE_TASKMANAGER_HOST = asVariable("host");
+       public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id");
+
+       /** The default scope format of the TaskManager component: {@code 
"<host>.taskmanager.<tm_id>"} */
+       public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT =
+                       concat(SCOPE_TASKMANAGER_HOST, "taskmanager", 
SCOPE_TASKMANAGER_ID);
+
+       /** The default scope format of TaskManager metrics: {@code 
"<host>.taskmanager.<tm_id>"} */
+       public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = 
DEFAULT_SCOPE_TASKMANAGER_COMPONENT;
+
+       // ----- Job on Task Manager ----
+
+       public static final String SCOPE_JOB_ID = asVariable("job_id");
+       public static final String SCOPE_JOB_NAME = asVariable("job_name");
+
+       /** The default scope format for the job component: {@code 
"<job_name>"} */
+       public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_COMPONENT = 
SCOPE_JOB_NAME;
+
+       /** The default scope format for all job metrics: {@code 
"<host>.taskmanager.<tm_id>.<job_name>"} */
+       public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP =
+                       concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, 
DEFAULT_SCOPE_TASKMANAGER_JOB_COMPONENT);
+
+       // ----- Task ----
+
+       public static final String SCOPE_TASK_VERTEX_ID = asVariable("task_id");
+       public static final String SCOPE_TASK_NAME = asVariable("task_name");
+       public static final String SCOPE_TASK_ATTEMPT_ID = 
asVariable("task_attempt_id");
+       public static final String SCOPE_TASK_ATTEMPT_NUM = 
asVariable("task_attempt_num");
+       public static final String SCOPE_TASK_SUBTASK_INDEX = 
asVariable("subtask_index");
+
+       /** Default scope of the task component: {@code 
"<task_name>.<subtask_index>"} */
+       public static final String DEFAULT_SCOPE_TASK_COMPONENT =
+                       concat(SCOPE_TASK_NAME, SCOPE_TASK_SUBTASK_INDEX);
+
+       /** The default scope format for all task metrics:
+        * {@code 
"<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>"} */
+       public static final String DEFAULT_SCOPE_TASK_GROUP =
+                       concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, 
DEFAULT_SCOPE_TASK_COMPONENT);
+
+       // ----- Operator ----
+
+       public static final String SCOPE_OPERATOR_NAME = 
asVariable("operator_name");
+
+       /** The default scope added by the operator component: 
{@code "<operator_name>.<subtask_index>"} */
+       public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT =
+                       concat(SCOPE_OPERATOR_NAME, SCOPE_TASK_SUBTASK_INDEX);
+
+       /** The default scope format for all operator metrics:
+        * {@code 
"<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"} */
+       public static final String DEFAULT_SCOPE_OPERATOR_GROUP =
+                       concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, 
DEFAULT_SCOPE_OPERATOR_COMPONENT);
+
+       // 
------------------------------------------------------------------------
+       //  Formatters for the individual component types
+       // 
------------------------------------------------------------------------
+
+       /**
+        * The scope format for the {@link TaskManagerMetricGroup}.
+        */
+       public static class TaskManagerScopeFormat extends ScopeFormat {
+
+               public TaskManagerScopeFormat(String format) {
+                       super(format, null, new String[] {
+                                       SCOPE_TASKMANAGER_HOST,
+                                       SCOPE_TASKMANAGER_ID
+                       });
+               }
+
+               public String[] formatScope(String hostname, String 
taskManagerId) {
+                       final String[] template = copyTemplate();
+                       final String[] values = { hostname, taskManagerId };
+                       return bindVariables(template, values);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Scope format used by the {@link JobMetricGroup}. In addition to the
+        * TaskManager variables it can bind the job's ID and the job's name.
+        */
+       public static class TaskManagerJobScopeFormat extends ScopeFormat {
+
+               /**
+                * Creates the format from its pattern.
+                *
+                * @param format The scope format pattern
+                * @param parentFormat The TaskManager-level format, handed to the base class as the parent format
+                */
+               public TaskManagerJobScopeFormat(String format, TaskManagerScopeFormat parentFormat) {
+                       super(format, parentFormat, new String[] {
+                                       SCOPE_TASKMANAGER_HOST, SCOPE_TASKMANAGER_ID,
+                                       SCOPE_JOB_ID, SCOPE_JOB_NAME
+                       });
+               }
+
+               /**
+                * Produces the scope components with the variables replaced by concrete values.
+                * The job ID and job name are passed through {@code valueOrNull} first.
+                *
+                * @param parent The TaskManager group supplying host and TaskManager ID
+                * @param jid The job's ID
+                * @param jobName The job's name
+                * @return A fresh array holding the formatted scope components
+                */
+               public String[] formatScope(TaskManagerMetricGroup parent, JobID jid, String jobName) {
+                       return bindVariables(copyTemplate(), new String[] {
+                                       parent.hostname(),
+                                       parent.taskManagerId(),
+                                       valueOrNull(jid),
+                                       valueOrNull(jobName)
+                       });
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Scope format used by the {@link TaskMetricGroup}. In addition to the
+        * TaskManager and job variables it can bind the vertex ID, the attempt ID,
+        * the task name, the subtask index and the attempt number.
+        */
+       public static class TaskScopeFormat extends ScopeFormat {
+
+               /**
+                * Creates the format from its pattern.
+                *
+                * @param format The scope format pattern
+                * @param parentFormat The job-level format, handed to the base class as the parent format
+                */
+               public TaskScopeFormat(String format, TaskManagerJobScopeFormat parentFormat) {
+                       super(format, parentFormat, new String[] {
+                                       SCOPE_TASKMANAGER_HOST, SCOPE_TASKMANAGER_ID,
+                                       SCOPE_JOB_ID, SCOPE_JOB_NAME,
+                                       SCOPE_TASK_VERTEX_ID, SCOPE_TASK_ATTEMPT_ID, SCOPE_TASK_NAME,
+                                       SCOPE_TASK_SUBTASK_INDEX, SCOPE_TASK_ATTEMPT_NUM
+                       });
+               }
+
+               /**
+                * Produces the scope components with the variables replaced by concrete values.
+                * TaskManager and job values are read from the parent groups; the IDs and the
+                * task name are passed through {@code valueOrNull} first.
+                *
+                * @param parent The job group this task belongs to
+                * @param vertexId The ID of the task's vertex
+                * @param attemptId The ID of the execution attempt
+                * @param taskName The task's name
+                * @param subtask The subtask index
+                * @param attemptNumber The attempt number
+                * @return A fresh array holding the formatted scope components
+                */
+               public String[] formatScope(
+                               JobMetricGroup parent,
+                               AbstractID vertexId, AbstractID attemptId,
+                               String taskName, int subtask, int attemptNumber) {
+
+                       return bindVariables(copyTemplate(), new String[] {
+                                       parent.parent().hostname(),
+                                       parent.parent().taskManagerId(),
+                                       valueOrNull(parent.jobId()),
+                                       valueOrNull(parent.jobName()),
+                                       valueOrNull(vertexId),
+                                       valueOrNull(attemptId),
+                                       valueOrNull(taskName),
+                                       String.valueOf(subtask),
+                                       String.valueOf(attemptNumber)
+                       });
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Scope format used by the {@link org.apache.flink.metrics.groups.OperatorMetricGroup}.
+        * In addition to the TaskManager, job and task variables it can bind the operator name.
+        */
+       public static class OperatorScopeFormat extends ScopeFormat {
+
+               /**
+                * Creates the format from its pattern.
+                *
+                * @param format The scope format pattern
+                * @param parentFormat The task-level format, handed to the base class as the parent format
+                */
+               public OperatorScopeFormat(String format, TaskScopeFormat parentFormat) {
+                       super(format, parentFormat, new String[] {
+                                       SCOPE_TASKMANAGER_HOST, SCOPE_TASKMANAGER_ID,
+                                       SCOPE_JOB_ID, SCOPE_JOB_NAME,
+                                       SCOPE_TASK_VERTEX_ID, SCOPE_TASK_ATTEMPT_ID, SCOPE_TASK_NAME,
+                                       SCOPE_TASK_SUBTASK_INDEX, SCOPE_TASK_ATTEMPT_NUM,
+                                       SCOPE_OPERATOR_NAME
+                       });
+               }
+
+               /**
+                * Produces the scope components with the variables replaced by concrete values.
+                * All values except the operator name are read from the task group and its
+                * ancestors.
+                *
+                * @param parent The task group the operator belongs to
+                * @param operatorName The operator's name
+                * @return A fresh array holding the formatted scope components
+                */
+               public String[] formatScope(TaskMetricGroup parent, String operatorName) {
+                       return bindVariables(copyTemplate(), new String[] {
+                                       parent.parent().parent().hostname(),
+                                       parent.parent().parent().taskManagerId(),
+                                       valueOrNull(parent.parent().jobId()),
+                                       valueOrNull(parent.parent().jobName()),
+                                       valueOrNull(parent.vertexId()),
+                                       valueOrNull(parent.executionId()),
+                                       valueOrNull(parent.taskName()),
+                                       String.valueOf(parent.subtaskIndex()),
+                                       String.valueOf(parent.attemptNumber()),
+                                       valueOrNull(operatorName)
+                       });
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Scope Format Base
+       // 
------------------------------------------------------------------------
+
+       /**
+        * The scope format as reported by {@link #format()} and {@link #toString()}.
+        * This is not always the raw constructor argument: a leading inherit-parent
+        * marker is cut off, and an empty remainder is stored as {@code "<empty>"}.
+        */
+       private final String format;
+
+       /**
+        * The format, split into components. When the format inherits the parent
+        * scope, the parent's template components come first.
+        */
+       private final String[] template;
+
+       /**
+        * Indexes into {@link #template} of the components that are variables known
+        * to this format. Parallel to {@link #valuePos}.
+        */
+       private final int[] templatePos;
+
+       /**
+        * For each entry of {@link #templatePos}, the index into the values array
+        * (as passed to {@code bindVariables}) that holds the replacement.
+        */
+       private final int[] valuePos;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Parses the scope format string into the component template and precomputes
+        * which template positions are variables to be replaced when binding values.
+        *
+        * <p>The format is split at the scope separator. If the first component is the
+        * inherit-parent marker, the parent's template is used as a prefix and the marker
+        * itself is dropped. A component of the form {@code <name>} is treated as a
+        * variable only if it is contained in {@code variables}; every other component
+        * stays in the template as literal text.
+        *
+        * @param format The scope format string, checked via {@code checkNotNull}
+        * @param parent The parent's format; only dereferenced if the format inherits the parent scope
+        * @param variables The variables this format can bind. The index of a variable in this
+        *                  array is the index its value must have in the values array used for binding.
+        * @throws IllegalArgumentException If the format inherits the parent scope but {@code parent} is null
+        */
+       protected ScopeFormat(String format, ScopeFormat parent, String[] 
variables) {
+               checkNotNull(format, "format is null");
+
+               final String[] rawComponents = format.split("\\" + 
SCOPE_SEPARATOR);
+
+               // compute the template array
+               final boolean parentAsPrefix = rawComponents.length > 0 && 
rawComponents[0].equals(SCOPE_INHERIT_PARENT);
+               if (parentAsPrefix) {
+                       if (parent == null) {
+                               throw new IllegalArgumentException("Component 
scope format requires parent prefix (starts with '"
+                                       + SCOPE_INHERIT_PARENT + "'), but this 
component has no parent (is root component).");
+                       }
+
+                       // remember only the part behind the inherit marker
+                       // NOTE(review): the offset 2 presumably stands for a one-character marker
+                       // plus a one-character separator - confirm against the constants
+                       this.format = format.length() > 2 ? format.substring(2) 
: "<empty>";
+
+                       String[] parentTemplate = parent.template;
+                       int parentLen = parentTemplate.length;
+                       
+                       // template = parent's components, followed by this format's components without the marker
+                       this.template = new String[parentLen + 
rawComponents.length - 1];
+                       System.arraycopy(parentTemplate, 0, this.template, 0, 
parentLen);
+                       System.arraycopy(rawComponents, 1, this.template, 
parentLen, rawComponents.length - 1);
+               }
+               else {
+                       this.format = format.isEmpty() ? "<empty>" : format;
+                       this.template = rawComponents;
+               }
+
+               // --- compute the replacement matrix ---
+               // a bit of clumsy Java collections code ;-)
+               
+               HashMap<String, Integer> varToValuePos = arrayToMap(variables);
+               List<Integer> templatePos = new ArrayList<>();
+               List<Integer> valuePos = new ArrayList<>();
+
+               for (int i = 0; i < template.length; i++) {
+                       final String component = template[i];
+                       
+                       // check if that is a variable
+                       if (component != null && component.length() >= 3 &&
+                                       component.charAt(0) == '<' && 
component.charAt(component.length() - 1) == '>') {
+
+                               // this is a variable
+                               Integer replacementPos = 
varToValuePos.get(component);
+                               // variables this format does not know are not recorded and so stay literal text
+                               if (replacementPos != null) {
+                                       templatePos.add(i);
+                                       valuePos.add(replacementPos);
+                               }
+                       }
+               }
+
+               this.templatePos = integerListToArray(templatePos);
+               this.valuePos = integerListToArray(valuePos);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets the scope format string held by this object.
+        *
+        * @return The value of the {@code format} field
+        */
+       public String format() {
+               return this.format;
+       }
+
+       /**
+        * Creates a copy of the component template, so that values can be written
+        * into the copy without touching the template held by this format.
+        *
+        * @return A new array with the same length and elements as the template
+        */
+       protected final String[] copyTemplate() {
+               return template.clone();
+       }
+
+       protected final String[] bindVariables(String[] template, String[] 
values) {
+               final int len = templatePos.length;
+               for (int i = 0; i < len; i++) {
+                       template[templatePos[i]] = values[valuePos[i]];
+               }
+               return template;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Renders this object as {@code ScopeFormat '...'}, with the format string
+        * between the single quotes.
+        */
+       @Override
+       public String toString() {
+               final StringBuilder text = new StringBuilder("ScopeFormat '");
+               text.append(format);
+               text.append('\'');
+               return text.toString();
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Wraps the given string in the scope variable prefix and suffix, so that
+        * it has the shape of a scope variable.
+        *
+        * @param scope The string to wrap
+        * @return The prefix, followed by the string, followed by the suffix
+        */
+       public static String asVariable(String scope) {
+               final StringBuilder variable = new StringBuilder();
+               variable.append(SCOPE_VARIABLE_PREFIX);
+               variable.append(scope);
+               variable.append(SCOPE_VARIABLE_SUFFIX);
+               return variable.toString();
+       }
+
+       /**
+        * Joins the given components into one scope string, placing the scope
+        * separator between consecutive components.
+        *
+        * <p>A call without any components (or with a null array) yields the empty
+        * string. Null elements are appended the way {@link StringBuilder} appends
+        * a null string.
+        *
+        * @param components The components to join, in order
+        * @return The joined scope string; empty if there are no components
+        */
+       public static String concat(String... components) {
+               // a varargs call without arguments passes a zero-length array,
+               // so the first element must not be read unconditionally
+               if (components == null || components.length == 0) {
+                       return "";
+               }
+
+               final StringBuilder sb = new StringBuilder();
+               sb.append(components[0]);
+               for (int x = 1; x < components.length; x++) {
+                       sb.append(SCOPE_SEPARATOR);
+                       sb.append(components[x]);
+               }
+               return sb.toString();
+       }
+       
+       /**
+        * Converts a value to the string used for it in a scope. A null reference
+        * and an empty string both become the literal text {@code "null"}; anything
+        * else becomes its {@code toString()} representation.
+        *
+        * @param value The value to convert, may be null
+        * @return The string representation, never null for a well-behaved {@code toString()}
+        */
+       static String valueOrNull(Object value) {
+               if (value == null) {
+                       return "null";
+               }
+               if (value instanceof String && ((String) value).isEmpty()) {
+                       return "null";
+               }
+               return value.toString();
+       }
+
+       static HashMap<String, Integer> arrayToMap(String[] array) {
+               HashMap<String, Integer> map = new HashMap<>(array.length);
+               for (int i = 0; i < array.length; i++) {
+                       map.put(array[i], i);
+               }
+               return map;
+       }
+
+       private static int[] integerListToArray(List<Integer> list) {
+               int[] array = new int[list.size()];
+               int pos = 0;
+               for (Integer i : list) {
+                       array[pos++] = i;
+               }
+               return array;
+       }
+}

Reply via email to