http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
new file mode 100644
index 0000000..1451637
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.groups.scope;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat;
+import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
+import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A container for component scope formats.
+ */
+@Internal
+public class ScopeFormats {
+
+       private final TaskManagerScopeFormat taskManagerFormat;
+       private final TaskManagerJobScopeFormat taskManagerJobFormat;
+       private final TaskScopeFormat taskFormat;
+       private final OperatorScopeFormat operatorFormat;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates all default scope formats.
+        */
+       public ScopeFormats() {
+               this.taskManagerFormat = new 
TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT);
+
+               this.taskManagerJobFormat = new TaskManagerJobScopeFormat(
+                               
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, this.taskManagerFormat);
+               
+               this.taskFormat = new TaskScopeFormat(
+                               ScopeFormat.DEFAULT_SCOPE_TASK_GROUP, 
this.taskManagerJobFormat);
+               
+               this.operatorFormat = new OperatorScopeFormat(
+                               ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP, 
this.taskFormat);
+       }
+
+       /**
+        * Creates all scope formats, based on the given scope format strings.
+        */
+       public ScopeFormats(
+                       String taskManagerFormat,
+                       String taskManagerJobFormat,
+                       String taskFormat,
+                       String operatorFormat)
+       {
+               this.taskManagerFormat = new 
TaskManagerScopeFormat(taskManagerFormat);
+               this.taskManagerJobFormat = new 
TaskManagerJobScopeFormat(taskManagerJobFormat, this.taskManagerFormat);
+               this.taskFormat = new TaskScopeFormat(taskFormat, 
this.taskManagerJobFormat);
+               this.operatorFormat = new OperatorScopeFormat(operatorFormat, 
this.taskFormat);
+       }
+
+       /**
+        * Creates a {@code ScopeFormats} with the given scope formats.
+        */
+       public ScopeFormats(
+                       TaskManagerScopeFormat taskManagerFormat,
+                       TaskManagerJobScopeFormat taskManagerJobFormat,
+                       TaskScopeFormat taskFormat,
+                       OperatorScopeFormat operatorFormat)
+       {
+               this.taskManagerFormat = checkNotNull(taskManagerFormat);
+               this.taskManagerJobFormat = checkNotNull(taskManagerJobFormat);
+               this.taskFormat = checkNotNull(taskFormat);
+               this.operatorFormat = checkNotNull(operatorFormat);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       public TaskManagerScopeFormat getTaskManagerFormat() {
+               return this.taskManagerFormat;
+       }
+
+       public TaskManagerJobScopeFormat getJobFormat() {
+               return this.taskManagerJobFormat;
+       }
+
+       public TaskScopeFormat getTaskFormat() {
+               return this.taskFormat;
+       }
+
+       public OperatorScopeFormat getOperatorFormat() {
+               return this.operatorFormat;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
index 271e91a..f2e78bf 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
@@ -18,33 +18,45 @@
 
 package org.apache.flink.metrics.reporter;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.AbstractMetricGroup;
 
+import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
+/**
+ * Base interface for custom metric reporters.
+ */
+@PublicEvolving
 public abstract class AbstractReporter implements MetricReporter {
-       
-       protected Map<String, Gauge<?>> gauges = new ConcurrentHashMap<>();
-       protected Map<String, Counter> counters = new ConcurrentHashMap<>();
+
+       protected final Map<Gauge<?>, String> gauges = new HashMap<>();
+       protected final Map<Counter, String> counters = new HashMap<>();
 
        @Override
-       public void notifyOfAddedMetric(Metric metric, String name) {
-               if (metric instanceof Counter) {
-                       counters.put(name, (Counter) metric);
-               } else if (metric instanceof Gauge) {
-                       gauges.put(name, (Gauge<?>) metric);
+       public void notifyOfAddedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {
+               final String name = group.getScopeString() + '.' + metricName;
+
+               synchronized (this) {
+                       if (metric instanceof Counter) {
+                               counters.put((Counter) metric, name);
+                       } else if (metric instanceof Gauge) {
+                               gauges.put((Gauge<?>) metric, name);
+                       }
                }
        }
 
        @Override
-       public void notifyOfRemovedMetric(Metric metric, String name) {
-               if (metric instanceof Counter) {
-                       counters.remove(name);
-               } else if (metric instanceof Gauge) {
-                       gauges.remove(name);
+       public void notifyOfRemovedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {
+               synchronized (this) {
+                       if (metric instanceof Counter) {
+                               counters.remove(metric);
+                       } else if (metric instanceof Gauge) {
+                               gauges.remove(metric);
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
index 69fa11a..71c80de 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.reporter;
 
 import org.apache.flink.annotation.Internal;
@@ -22,46 +23,76 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
+
+import org.apache.flink.metrics.groups.AbstractMetricGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.management.InstanceAlreadyExistsException;
 import javax.management.InstanceNotFoundException;
-import javax.management.MBeanRegistrationException;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
- * {@link org.apache.flink.metrics.reporter.MetricReporter} that exports 
{@link org.apache.flink.metrics.Metric}s via JMX.
+ * {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
  *
  * Largely based on the JmxReporter class of the dropwizard metrics library
  * 
https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics/JmxReporter.java
  */
 @Internal
 public class JMXReporter implements MetricReporter {
-       private static final Logger LOG = 
LoggerFactory.getLogger(JMXReporter.class);
-
-       private MBeanServer mBeanServer;
 
        private static final String PREFIX = "org.apache.flink.metrics:";
        private static final String KEY_PREFIX = "key";
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(JMXReporter.class);
+
+       // 
------------------------------------------------------------------------
+
+       /** The server where the management beans are registered and 
deregistered */
+       private final MBeanServer mBeanServer;
+
+       /** The names under which the registered metrics have been added to the 
MBeanServer */ 
+       private final Map<Metric, ObjectName> registeredMetrics;
+
+       /**
+        * Creates a new JMXReporter
+        */
        public JMXReporter() {
                this.mBeanServer = ManagementFactory.getPlatformMBeanServer();
+               this.registeredMetrics = new HashMap<>();
        }
 
+       // 
------------------------------------------------------------------------
+       //  life cycle
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void open(Configuration config) {}
+
+       @Override
+       public void close() {}
+
+       // 
------------------------------------------------------------------------
+       //  adding / removing metrics
+       // 
------------------------------------------------------------------------
+
        @Override
-       public void notifyOfAddedMetric(Metric metric, String name) {
+       public void notifyOfAddedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {
+               final String name = generateJmxName(metricName, 
group.getScopeComponents());
+
                AbstractBean jmxMetric;
                ObjectName jmxName;
                try {
                        jmxName = new ObjectName(name);
                } catch (MalformedObjectNameException e) {
-                       throw new IllegalArgumentException("Metric name did not 
conform to JMX ObjectName rules: " + name, e);
+                       LOG.error("Metric name did not conform to JMX 
ObjectName rules: " + name, e);
+                       return;
                }
 
                if (metric instanceof Gauge) {
@@ -69,68 +100,137 @@ public class JMXReporter implements MetricReporter {
                } else if (metric instanceof Counter) {
                        jmxMetric = new JmxCounter((Counter) metric);
                } else {
-                       throw new IllegalArgumentException("Unknown metric 
type: " + metric.getClass());
+                       LOG.error("Unknown metric type: " + 
metric.getClass().getName());
+                       return;
                }
 
                try {
-                       mBeanServer.registerMBean(jmxMetric, jmxName);
-               } catch (NotCompliantMBeanException e) { //implementation error 
on our side
+                       synchronized (this) {
+                               mBeanServer.registerMBean(jmxMetric, jmxName);
+                               registeredMetrics.put(metric, jmxName);
+                       }
+               } catch (NotCompliantMBeanException e) {
+                       // implementation error on our side
                        LOG.error("Metric did not comply with JMX MBean naming 
rules.", e);
                } catch (InstanceAlreadyExistsException e) {
                        LOG.error("A metric with the name " + jmxName + " was 
already registered.", e);
-               } catch (MBeanRegistrationException e) {
-                       LOG.error("Failed to register metric.", e);
+               } catch (Throwable t) {
+                       LOG.error("Failed to register metric", t);
                }
        }
 
        @Override
-       public void notifyOfRemovedMetric(Metric metric, String name) {
+       public void notifyOfRemovedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {
                try {
-                       mBeanServer.unregisterMBean(new ObjectName(name));
-               } catch (MBeanRegistrationException e) {
-                       LOG.error("Un-registering metric failed.", e);
-               } catch (MalformedObjectNameException e) {
-                       LOG.error("Un-registering metric failed due to invalid 
name.", e);
+                       synchronized (this) {
+                               final ObjectName jmxName = 
registeredMetrics.remove(metric);
+
+                               // remove the metric if it is known. if it is 
not known, ignore the request
+                               if (jmxName != null) {
+                                       mBeanServer.unregisterMBean(jmxName);
+                               }
+                       }
                } catch (InstanceNotFoundException e) {
-                       //alright then
+                       // alright then
+               } catch (Throwable t) {
+                       // never propagate exceptions - the metrics reporter 
should not affect the stability
+                       // of the running system
+                       LOG.error("Un-registering metric failed", t);
                }
        }
 
-       @Override
-       public void open(Configuration config) {
-       }
+       // 
------------------------------------------------------------------------
+       //  Utilities 
+       // 
------------------------------------------------------------------------
 
-       @Override
-       public void close() {
-       }
+       static String generateJmxName(String metricName, String[] 
scopeComponents) {
+               final StringBuilder nameBuilder = new StringBuilder(128);
+               nameBuilder.append(PREFIX);
 
-       @Override
-       public String generateName(String name, List<String> origin) {
-               StringBuilder fullName = new StringBuilder();
-
-               fullName.append(PREFIX);
-               for (int x = 0; x < origin.size(); x++) {
-                       fullName.append(KEY_PREFIX);
-                       fullName.append(x);
-                       fullName.append("=");
-                       String value = origin.get(x);
-                       value = value.replaceAll("\"", "");
-                       value = value.replaceAll(" ", "_");
-                       value = value.replaceAll("[,=;:?'*]", "-");
-                       fullName.append(value);
-                       fullName.append(",");
+               for (int x = 0; x < scopeComponents.length; x++) {
+                       // write keyX=
+                       nameBuilder.append(KEY_PREFIX);
+                       nameBuilder.append(x);
+                       nameBuilder.append("=");
+
+                       // write scopeName component
+                       
nameBuilder.append(replaceInvalidChars(scopeComponents[x]));
+                       nameBuilder.append(",");
                }
-               fullName.append("name=").append(name);
 
-               return fullName.toString();
-       }
+               // write the name
+               
nameBuilder.append("name=").append(replaceInvalidChars(metricName));
 
-       public interface MetricMBean {
+               return nameBuilder.toString();
        }
+       
+       /**
+        * Lightweight method to replace unsupported characters.
+        * If the string does not contain any unsupported characters, this 
method creates no
+        * new string (and in fact no new objects at all).
+        * 
+        * <p>Replacements:
+        * 
+        * <ul>
+        *     <li>{@code "} is removed</li>
+        *     <li>{@code space} is replaced by {@code _} (underscore)</li>
+        *     <li>{@code , = ; : ? ' *} are replaced by {@code -} (hyphen)</li>
+        * </ul>
+        */
+       static String replaceInvalidChars(String str) {
+               char[] chars = null;
+               final int strLen = str.length();
+               int pos = 0;
+               
+               for (int i = 0; i < strLen; i++) {
+                       final char c = str.charAt(i);
+                       switch (c) {
+                               case '"':
+                                       // remove character by not moving cursor
+                                       if (chars == null) {
+                                               chars = str.toCharArray();
+                                       }
+                                       break;
 
-       private abstract static class AbstractBean implements MetricMBean {
+                               case ' ':
+                                       if (chars == null) {
+                                               chars = str.toCharArray();
+                                       }
+                                       chars[pos++] = '_';
+                                       break;
+                               
+                               case ',':
+                               case '=':
+                               case ';':
+                               case ':':
+                               case '?':
+                               case '\'':
+                               case '*':
+                                       if (chars == null) {
+                                               chars = str.toCharArray();
+                                       }
+                                       chars[pos++] = '-';
+                                       break;
+
+                               default:
+                                       if (chars != null) {
+                                               chars[pos] = c;
+                                       }
+                                       pos++;
+                       }
+               }
+               
+               return chars == null ? str : new String(chars, 0, pos);
        }
 
+       // 
------------------------------------------------------------------------
+       //  Interfaces and base classes for JMX beans 
+       // 
------------------------------------------------------------------------
+
+       public interface MetricMBean {}
+
+       private abstract static class AbstractBean implements MetricMBean {}
+
        public interface JmxCounterMBean extends MetricMBean {
                long getCount();
        }
@@ -153,7 +253,7 @@ public class JMXReporter implements MetricReporter {
        }
 
        private static class JmxGauge extends AbstractBean implements 
JmxGaugeMBean {
-               
+
                private final Gauge<?> gauge;
 
                public JmxGauge(Gauge<?> gauge) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
index f87f5d3..9458246 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
@@ -15,28 +15,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.reporter;
 
-import com.codahale.metrics.Reporter;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
-
-import java.util.List;
+import org.apache.flink.metrics.groups.AbstractMetricGroup;
 
 /**
- * Reporters are used to export {@link org.apache.flink.metrics.Metric}s to an 
external backend.
- * <p>
- * Reporters are instantiated generically and must have a no-argument 
constructor.
+ * Reporters are used to export {@link Metric Metrics} to an external backend.
+ * 
+ * <p>Reporters are instantiated via reflection and must be public, 
non-abstract, and have a
+ * public no-argument constructor.
  */
 @PublicEvolving
-public interface MetricReporter extends Reporter {
-       
+public interface MetricReporter {
+
+       // 
------------------------------------------------------------------------
+       //  life cycle
+       // 
------------------------------------------------------------------------
+
        /**
         * Configures this reporter. Since reporters are instantiated 
generically and hence parameter-less,
         * this method is the place where the reporters set their basic fields 
based on configuration values.
-        * <p>
-        * This method is always called first on a newly instantiated reporter.
+        * 
+        * <p>This method is always called first on a newly instantiated 
reporter.
         *
         * @param config The configuration with all parameters.
         */
@@ -47,28 +51,25 @@ public interface MetricReporter extends Reporter {
         */
        void close();
 
-       /**
-        * Called when a new {@link org.apache.flink.metrics.Metric} was added.
-        *
-        * @param metric metric that was added
-        * @param name   name of the metric
-        */
-       void notifyOfAddedMetric(Metric metric, String name);
+       // 
------------------------------------------------------------------------
+       //  adding / removing metrics
+       // 
------------------------------------------------------------------------
 
        /**
-        * Called when a {@link org.apache.flink.metrics.Metric} was removed.
+        * Called when a new {@link Metric} was added.
         *
-        * @param metric metric that was removed
-        * @param name   name of the metric
+        * @param metric      the metric that was added
+        * @param metricName  the name of the metric
+        * @param group       the group that contains the metric
         */
-       void notifyOfRemovedMetric(Metric metric, String name);
+       void notifyOfAddedMetric(Metric metric, String metricName, 
AbstractMetricGroup group);
 
        /**
-        * Generates the reported name of a metric based on it's 
hierarchy/scope and associated name.
+        * Called when a {@link Metric} was should be removed.
         *
-        * @param name  name of the metric
-        * @param scope hierarchy/scope of the metric
-        * @return reported name
+        * @param metric      the metric that should be removed
+        * @param metricName  the name of the metric
+        * @param group       the group that contains the metric
         */
-       String generateName(String name, List<String> scope);
+       void notifyOfRemovedMetric(Metric metric, String metricName, 
AbstractMetricGroup group);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
index 3638f7a..cf1fc52 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java
@@ -15,18 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.reporter;
 
 import org.apache.flink.annotation.PublicEvolving;
 
 /**
- * Marker interface for reporters that actively send out data periodically.
+ * Interface for reporters that actively send out data periodically.
  */
 @PublicEvolving
 public interface Scheduled {
+
        /**
-        * Report the current measurements.
-        * This method is called in regular intervals
+        * Report the current measurements. This method is called periodically 
by the
+        * metrics registry that uses the reoprter.
         */
        void report();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
index 83c88cc..8f00cd5 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
@@ -31,7 +31,8 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.util.DummyMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
 import org.junit.Test;
 
 
@@ -42,7 +43,11 @@ public class RuntimeUDFContextTest {
        @Test
        public void testBroadcastVariableNotFound() {
                try {
-                       RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, 
getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, 
Future<Path>>(),new HashMap<String, Accumulator<?, ?>>(), new 
DummyMetricGroup());
+                       RuntimeUDFContext ctx = new RuntimeUDFContext(
+                                       taskInfo, getClass().getClassLoader(), 
new ExecutionConfig(), 
+                                       new HashMap<String, Future<Path>>(),
+                                       new HashMap<String, Accumulator<?, 
?>>(),
+                                       new UnregisteredMetricsGroup());
                        
                        try {
                                ctx.getBroadcastVariable("some name");
@@ -72,7 +77,11 @@ public class RuntimeUDFContextTest {
        @Test
        public void testBroadcastVariableSimple() {
                try {
-                       RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, 
getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, 
Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new 
DummyMetricGroup());
+                       RuntimeUDFContext ctx = new RuntimeUDFContext(
+                                       taskInfo, getClass().getClassLoader(), 
new ExecutionConfig(),
+                                       new HashMap<String, Future<Path>>(),
+                                       new HashMap<String, Accumulator<?, 
?>>(),
+                                       new UnregisteredMetricsGroup());
                        
                        ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 
3, 4));
                        ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 
2.0, 3.0, 4.0));
@@ -106,7 +115,11 @@ public class RuntimeUDFContextTest {
        @Test
        public void testBroadcastVariableWithInitializer() {
                try {
-                       RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, 
getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, 
Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new 
DummyMetricGroup());
+                       RuntimeUDFContext ctx = new RuntimeUDFContext(
+                                       taskInfo, getClass().getClassLoader(), 
new ExecutionConfig(),
+                                       new HashMap<String, Future<Path>>(),
+                                       new HashMap<String, Accumulator<?, 
?>>(),
+                                       new UnregisteredMetricsGroup());
                        
                        ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 
4));
                        
@@ -131,7 +144,11 @@ public class RuntimeUDFContextTest {
        @Test
        public void testResetBroadcastVariableWithInitializer() {
                try {
-                       RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, 
getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, 
Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new 
DummyMetricGroup());
+                       RuntimeUDFContext ctx = new RuntimeUDFContext(
+                                       taskInfo, getClass().getClassLoader(), 
new ExecutionConfig(),
+                                       new HashMap<String, Future<Path>>(),
+                                       new HashMap<String, Accumulator<?, 
?>>(),
+                                       new UnregisteredMetricsGroup());
                        
                        ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 
4));
                        
@@ -154,7 +171,11 @@ public class RuntimeUDFContextTest {
        @Test
        public void testBroadcastVariableWithInitializerAndMismatch() {
                try {
-                       RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, 
getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, 
Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new 
DummyMetricGroup());
+                       RuntimeUDFContext ctx = new RuntimeUDFContext(
+                                       taskInfo, getClass().getClassLoader(), 
new ExecutionConfig(),
+                                       new HashMap<String, Future<Path>>(),
+                                       new HashMap<String, Accumulator<?, 
?>>(),
+                                       new UnregisteredMetricsGroup());
                        
                        ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 
4));
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
index 554820e..ae0f8e5 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.io;
 
 import java.util.HashMap;
@@ -27,11 +26,13 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.util.DummyMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.types.Value;
-import org.junit.Assert;
+
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Tests runtime context access from inside an RichInputFormat class
  */
@@ -41,9 +42,14 @@ public class RichInputFormatTest {
        public void testCheckRuntimeContextAccess() {
                final SerializedInputFormat<Value> inputFormat = new 
SerializedInputFormat<Value>();
                final TaskInfo taskInfo = new TaskInfo("test name", 1, 3, 0);
-               inputFormat.setRuntimeContext(new RuntimeUDFContext(taskInfo, 
getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, 
Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new 
DummyMetricGroup()));
+               inputFormat.setRuntimeContext(
+                               new RuntimeUDFContext(
+                                               taskInfo, 
getClass().getClassLoader(), new ExecutionConfig(),
+                                               new HashMap<String, 
Future<Path>>(),
+                                               new HashMap<String, 
Accumulator<?, ?>>(),
+                                               new 
UnregisteredMetricsGroup()));
 
-               
Assert.assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
-               
Assert.assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3);
+               
assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
+               
assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
index 09db3a9..296af11 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.io;
 
 import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.Future;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -28,11 +26,13 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.util.DummyMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.types.Value;
-import org.junit.Assert;
+
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Tests runtime context access from inside an RichOutputFormat class
  */
@@ -42,9 +42,14 @@ public class RichOutputFormatTest {
        public void testCheckRuntimeContextAccess() {
                final SerializedOutputFormat<Value> inputFormat = new 
SerializedOutputFormat<Value>();
                final TaskInfo taskInfo = new TaskInfo("test name", 1, 3, 0);
-               inputFormat.setRuntimeContext(new RuntimeUDFContext(taskInfo, 
getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, 
Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new 
DummyMetricGroup()));
+               
+               inputFormat.setRuntimeContext(new RuntimeUDFContext(
+                               taskInfo, getClass().getClassLoader(), new 
ExecutionConfig(),
+                               new HashMap<String, Future<Path>>(),
+                               new HashMap<String, Accumulator<?, ?>>(),
+                               new UnregisteredMetricsGroup()));
 
-               
Assert.assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
-               
Assert.assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3);
+               
assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
+               
assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
index 7c905c1..71bb102 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
@@ -28,8 +28,9 @@ import 
org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.common.operators.util.TestRichOutputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.util.DummyMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.types.Nothing;
+
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -95,13 +96,20 @@ public class GenericDataSinkBaseTest implements 
java.io.Serializable {
                        final TaskInfo taskInfo = new TaskInfo("test_sink", 0, 
1, 0);
                        executionConfig.disableObjectReuse();
                        in.reset();
-                       sink.executeOnCollections(asList(TestIOData.NAMES), new 
RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new 
DummyMetricGroup()), executionConfig);
-                       assertEquals(out.output, asList(TestIOData.RICH_NAMES));
+                       
+                       sink.executeOnCollections(asList(TestIOData.NAMES), new 
RuntimeUDFContext(
+                                       taskInfo, null, executionConfig, 
cpTasks, accumulatorMap, new UnregisteredMetricsGroup()),
+                                       executionConfig);
+               
+                               assertEquals(out.output, 
asList(TestIOData.RICH_NAMES));
 
                        executionConfig.enableObjectReuse();
                        out.clear();
                        in.reset();
-                       sink.executeOnCollections(asList(TestIOData.NAMES), new 
RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new 
DummyMetricGroup()), executionConfig);
+                       
+                       sink.executeOnCollections(asList(TestIOData.NAMES), new 
RuntimeUDFContext(
+                                       taskInfo, null, executionConfig, 
cpTasks, accumulatorMap, new UnregisteredMetricsGroup()),
+                                       executionConfig);
                        assertEquals(out.output, asList(TestIOData.RICH_NAMES));
                } catch(Exception e){
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
index c360c62..2dabe48 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
@@ -27,7 +27,8 @@ import 
org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.common.operators.util.TestRichInputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.util.DummyMetricGroup;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -84,7 +85,11 @@ public class GenericDataSourceBaseTest implements 
java.io.Serializable {
                        executionConfig.disableObjectReuse();
                        assertEquals(false, in.hasBeenClosed());
                        assertEquals(false, in.hasBeenOpened());
-                       List<String> resultMutableSafe = 
source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, 
executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), 
executionConfig);
+                       
+                       List<String> resultMutableSafe = 
source.executeOnCollections(
+                                       new RuntimeUDFContext(taskInfo, null, 
executionConfig, cpTasks, accumulatorMap,
+                                                       new 
UnregisteredMetricsGroup()), executionConfig);
+                       
                        assertEquals(true, in.hasBeenClosed());
                        assertEquals(true, in.hasBeenOpened());
 
@@ -92,13 +97,18 @@ public class GenericDataSourceBaseTest implements 
java.io.Serializable {
                        executionConfig.enableObjectReuse();
                        assertEquals(false, in.hasBeenClosed());
                        assertEquals(false, in.hasBeenOpened());
-                       List<String> resultRegular = 
source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, 
executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), 
executionConfig);
+                       
+                       List<String> resultRegular = 
source.executeOnCollections(
+                                       new RuntimeUDFContext(taskInfo, null, 
executionConfig, cpTasks, accumulatorMap,
+                                                       new 
UnregisteredMetricsGroup()), executionConfig);
+                       
                        assertEquals(true, in.hasBeenClosed());
                        assertEquals(true, in.hasBeenOpened());
 
                        assertEquals(asList(TestIOData.RICH_NAMES), 
resultMutableSafe);
                        assertEquals(asList(TestIOData.RICH_NAMES), 
resultRegular);
-               } catch(Exception e){
+               }
+               catch(Exception e){
                        e.printStackTrace();
                        fail(e.getMessage());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
index 9447efd..f125c4b 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
@@ -29,8 +29,9 @@ import 
org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.util.DummyMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -79,7 +80,11 @@ public class FlatMapOperatorCollectionTest implements 
Serializable {
                final TaskInfo taskInfo = new TaskInfo("Test UDF", 0, 4, 0);
                // run on collections
                final List<String> result = getTestFlatMapOperator(udf)
-                               .executeOnCollections(input, new 
RuntimeUDFContext(taskInfo,  null, executionConfig, new HashMap<String, 
Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new 
DummyMetricGroup()), executionConfig);
+                               .executeOnCollections(input,
+                                               new RuntimeUDFContext(
+                                                       taskInfo,  null, 
executionConfig, new HashMap<String, Future<Path>>(),
+                                                       new HashMap<String, 
Accumulator<?, ?>>(), new UnregisteredMetricsGroup()),
+                                               executionConfig);
 
                Assert.assertEquals(input.size(), result.size());
                Assert.assertEquals(input, result);

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
index a610a4d..8befcb9 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
@@ -30,8 +30,9 @@ import 
org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.util.DummyMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.util.Collector;
+
 import org.junit.Test;
 
 import java.io.Serializable;
@@ -125,10 +126,18 @@ public class InnerJoinOperatorBaseTest implements 
Serializable {
                        final HashMap<String, Future<Path>> cpTasks = new 
HashMap<>();
 
                        ExecutionConfig executionConfig = new ExecutionConfig();
+                       
                        executionConfig.disableObjectReuse();
-                       List<Integer> resultSafe = 
base.executeOnCollections(inputData1, inputData2, new 
RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new 
DummyMetricGroup()), executionConfig);
+                       List<Integer> resultSafe = 
base.executeOnCollections(inputData1, inputData2,
+                                       new RuntimeUDFContext(taskInfo, null, 
executionConfig, cpTasks,
+                                                       accumulatorMap, new 
UnregisteredMetricsGroup()),
+                                       executionConfig);
+                       
                        executionConfig.enableObjectReuse();
-                       List<Integer> resultRegular = 
base.executeOnCollections(inputData1, inputData2, new 
RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new 
DummyMetricGroup()), executionConfig);
+                       List<Integer> resultRegular = 
base.executeOnCollections(inputData1, inputData2,
+                                       new RuntimeUDFContext(taskInfo, null, 
executionConfig, cpTasks,
+                                                       accumulatorMap, new 
UnregisteredMetricsGroup()),
+                                       executionConfig);
 
                        assertEquals(expected, resultSafe);
                        assertEquals(expected, resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
index 7ecdefa..d79e2a5 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
@@ -18,15 +18,6 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import static org.junit.Assert.*;
-import static java.util.Arrays.asList;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -35,13 +26,24 @@ import 
org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.util.DummyMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 @SuppressWarnings("serial")
 public class MapOperatorTest implements java.io.Serializable {
 
@@ -113,9 +115,17 @@ public class MapOperatorTest implements 
java.io.Serializable {
                        final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 
0);
                        ExecutionConfig executionConfig = new ExecutionConfig();
                        executionConfig.disableObjectReuse();
-                       List<Integer> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, 
executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), 
executionConfig);
+                       
+                       List<Integer> resultMutableSafe = 
op.executeOnCollections(input,
+                                       new RuntimeUDFContext(taskInfo, null, 
executionConfig, cpTasks,
+                                                       accumulatorMap, new 
UnregisteredMetricsGroup()),
+                                       executionConfig);
+                       
                        executionConfig.enableObjectReuse();
-                       List<Integer> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, 
executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), 
executionConfig);
+                       List<Integer> resultRegular = 
op.executeOnCollections(input,
+                                       new RuntimeUDFContext(taskInfo, null, 
executionConfig, cpTasks,
+                                                       accumulatorMap, new 
UnregisteredMetricsGroup()),
+                                       executionConfig);
                        
                        assertEquals(asList(1, 2, 3, 4, 5, 6), 
resultMutableSafe);
                        assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
index 5012718..83c194a 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.util.DummyMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
@@ -40,6 +40,7 @@ import 
org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+
 import org.junit.Test;
 
 @SuppressWarnings("serial")
@@ -86,9 +87,21 @@ public class PartitionMapOperatorTest implements 
java.io.Serializable {
 
                        ExecutionConfig executionConfig = new ExecutionConfig();
                        executionConfig.disableObjectReuse();
-                       List<Integer> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, 
executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, 
Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig);
+                       
+                       List<Integer> resultMutableSafe = 
op.executeOnCollections(input,
+                                       new RuntimeUDFContext(taskInfo, null, 
executionConfig,
+                                                       new HashMap<String, 
Future<Path>>(),
+                                                       new HashMap<String, 
Accumulator<?, ?>>(),
+                                                       new 
UnregisteredMetricsGroup()),
+                                       executionConfig);
+                       
                        executionConfig.enableObjectReuse();
-                       List<Integer> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, 
executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, 
Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig);
+                       List<Integer> resultRegular = 
op.executeOnCollections(input,
+                                       new RuntimeUDFContext(taskInfo, null, 
executionConfig,
+                                                       new HashMap<String, 
Future<Path>>(),
+                                                       new HashMap<String, 
Accumulator<?, ?>>(),
+                                                       new 
UnregisteredMetricsGroup()),
+                                       executionConfig);
                        
                        assertEquals(asList(1, 2, 3, 4, 5, 6), 
resultMutableSafe);
                        assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java 
b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
index 32cc11c..f8e0bf5 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
@@ -15,22 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.JobMetricGroup;
-import org.apache.flink.metrics.groups.OperatorMetricGroup;
-import org.apache.flink.metrics.groups.Scope;
+import org.apache.flink.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.metrics.groups.scope.ScopeFormats;
 import org.apache.flink.metrics.reporter.Scheduled;
 import org.apache.flink.metrics.util.TestReporter;
+
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.List;
+import static org.junit.Assert.*;
 
 public class MetricRegistryTest {
+       
        /**
         * Verifies that the reporter class argument is correctly used to 
instantiate and open the reporter.
         */
@@ -118,45 +119,11 @@ public class MetricRegistryTest {
        }
 
        /**
-        * Verifies that groups are correctly created, nesting works, and names 
are properly forwarded to generate names.
-        */
-       @Test
-       public void testMetricGroupGeneration() {
-               Configuration config = new Configuration();
-
-               config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, 
TestReporter4.class.getName());
-
-               MetricRegistry registry = new MetricRegistry(config);
-
-               MetricGroup root = new TaskManagerMetricGroup(registry, "host", 
"id");
-               root.counter("rootCounter");
-               root.addGroup("top").counter("topCounter");
-       }
-
-       protected static class TestReporter4 extends TestReporter {
-               @Override
-               public String generateName(String name, List<String> scope) {
-                       if (name.compareTo("rootCounter") == 0) {
-                               Assert.assertEquals("host", scope.get(0));
-                               return "success";
-                       } else if (name.compareTo("topCounter") == 0) {
-                               Assert.assertEquals("host", scope.get(0));
-                               Assert.assertEquals("taskmanager", 
scope.get(1));
-                               return "success";
-                       } else {
-                               Assert.fail();
-                               return null;
-                       }
-               }
-       }
-
-       /**
         * Verifies that reporters implementing the Listener interface are 
notified when Metrics are added or removed.
         */
        @Test
        public void testListener() {
                Configuration config = new Configuration();
-
                config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, 
TestReporter6.class.getName());
 
                MetricRegistry registry = new MetricRegistry(config);
@@ -165,8 +132,8 @@ public class MetricRegistryTest {
                root.counter("rootCounter");
                root.close();
 
-               Assert.assertTrue(TestReporter6.addCalled);
-               Assert.assertTrue(TestReporter6.removeCalled);
+               assertTrue(TestReporter6.addCalled);
+               assertTrue(TestReporter6.removeCalled);
        }
 
        protected static class TestReporter6 extends TestReporter {
@@ -174,22 +141,22 @@ public class MetricRegistryTest {
                public static boolean removeCalled = false;
 
                @Override
-               public void notifyOfAddedMetric(Metric metric, String name) {
+               public void notifyOfAddedMetric(Metric metric, String 
metricName, AbstractMetricGroup group) {
                        addCalled = true;
-                       Assert.assertTrue(metric instanceof Counter);
-                       Assert.assertEquals("rootCounter", name);
+                       assertTrue(metric instanceof Counter);
+                       assertEquals("rootCounter", metricName);
                }
 
                @Override
-               public void notifyOfRemovedMetric(Metric metric, String name) {
+               public void notifyOfRemovedMetric(Metric metric, String 
metricName, AbstractMetricGroup group) {
                        removeCalled = true;
                        Assert.assertTrue(metric instanceof Counter);
-                       Assert.assertEquals("rootCounter", name);
+                       Assert.assertEquals("rootCounter", metricName);
                }
        }
 
        /**
-        * Verifies that the scope configuration is properly extracted.
+        * Verifies that the scopeName configuration is properly extracted.
         */
        @Test
        public void testScopeConfig() {
@@ -200,18 +167,11 @@ public class MetricRegistryTest {
                config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TASK, 
"C");
                
config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_OPERATOR, "D");
 
-               Scope.ScopeFormat scopeConfig = new 
MetricRegistry(config).getScopeConfig();
-
-               Assert.assertEquals("A", scopeConfig.getTaskManagerFormat());
-               Assert.assertEquals("B", scopeConfig.getJobFormat());
-               Assert.assertEquals("C", scopeConfig.getTaskFormat());
-               Assert.assertEquals("D", scopeConfig.getOperatorFormat());
-
-               Scope.ScopeFormat emptyScopeConfig = new MetricRegistry(new 
Configuration()).getScopeConfig();
+               ScopeFormats scopeConfig = 
MetricRegistry.createScopeConfig(config);
 
-               Assert.assertEquals(TaskManagerMetricGroup.DEFAULT_SCOPE_TM, 
emptyScopeConfig.getTaskManagerFormat());
-               Assert.assertEquals(JobMetricGroup.DEFAULT_SCOPE_JOB, 
emptyScopeConfig.getJobFormat());
-               Assert.assertEquals(TaskMetricGroup.DEFAULT_SCOPE_TASK, 
emptyScopeConfig.getTaskFormat());
-               Assert.assertEquals(OperatorMetricGroup.DEFAULT_SCOPE_OPERATOR, 
emptyScopeConfig.getOperatorFormat());
+               assertEquals("A", scopeConfig.getTaskManagerFormat().format());
+               assertEquals("B", scopeConfig.getJobFormat().format());
+               assertEquals("C", scopeConfig.getTaskFormat().format());
+               assertEquals("D", scopeConfig.getOperatorFormat().format());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
index cea11fb..e820762 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
@@ -15,64 +15,77 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.util.AbstractID;
-import org.junit.Test;
+import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
+import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
 
-import java.util.List;
+import org.junit.Test;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 public class JobGroupTest {
+
        @Test
        public void testGenerateScopeDefault() {
                MetricRegistry registry = new MetricRegistry(new 
Configuration());
 
-               TaskMetricGroup tmGroup = new TaskManagerMetricGroup(registry, 
"host", "id")
-                               .addTaskForJob(new JobID(), "job", new 
AbstractID(), new AbstractID(), 0, "task");
-               JobMetricGroup jmGroup = tmGroup.parent();
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+               JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, 
new JobID(), "myJobName");
+
+               assertArrayEquals(
+                               new String[] { "theHostName", "taskmanager", 
"test-tm-id", "myJobName"},
+                               jmGroup.getScopeComponents());
 
-               List<String> scope = jmGroup.generateScope();
-               assertEquals(4, scope.size());
-               assertEquals("job", scope.get(3));
+               assertEquals(
+                               "theHostName.taskmanager.test-tm-id.myJobName",
+                               jmGroup.getScopeString());
        }
 
        @Test
-       public void testGenerateScopeWildcard() {
+       public void testGenerateScopeCustom() {
                MetricRegistry registry = new MetricRegistry(new 
Configuration());
 
-               TaskMetricGroup tmGroup = new TaskManagerMetricGroup(registry, 
"host", "id")
-                               .addTaskForJob(new JobID(), "job", new 
AbstractID(), new AbstractID(), 0, "task");
-               JobMetricGroup jmGroup = tmGroup.parent();
+               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("abc");
+               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("some-constant.<job_name>", tmFormat);
 
-               Scope.ScopeFormat format = new Scope.ScopeFormat();
-               format.setJobFormat(Scope.concat(Scope.SCOPE_WILDCARD, 
"superjob", JobMetricGroup.SCOPE_JOB_NAME));
+               JobID jid = new JobID();
 
-               List<String> scope = jmGroup.generateScope(format);
-               assertEquals(5, scope.size());
-               assertEquals("superjob", scope.get(3));
-               assertEquals("job", scope.get(4));
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+               JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, 
jmFormat, jid, "myJobName");
+
+               assertArrayEquals(
+                               new String[] { "some-constant", "myJobName" },
+                               jmGroup.getScopeComponents());
+
+               assertEquals(
+                               "some-constant.myJobName",
+                               jmGroup.getScopeString());
        }
 
        @Test
-       public void testGenerateScopeCustom() {
+       public void testGenerateScopeCustomWildcard() {
                MetricRegistry registry = new MetricRegistry(new 
Configuration());
 
-               TaskMetricGroup tmGroup = new TaskManagerMetricGroup(registry, 
"host", "id")
-                               .addTaskForJob(new JobID(), "job", new 
AbstractID(), new AbstractID(), 0, "task");
-               JobMetricGroup jmGroup = tmGroup.parent();
+               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("peter.<tm_id>");
+               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat);
+
+               JobID jid = new JobID();
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, tmFormat, "theHostName", "test-tm-id");
+               JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, 
jmFormat, jid, "myJobName");
 
-               Scope.ScopeFormat format = new Scope.ScopeFormat();
-               
format.setJobFormat(Scope.concat(TaskManagerMetricGroup.SCOPE_TM_HOST, 
"superjob", JobMetricGroup.SCOPE_JOB_NAME));
+               assertArrayEquals(
+                               new String[] { "peter", "test-tm-id", 
"some-constant", jid.toString() },
+                               jmGroup.getScopeComponents());
 
-               List<String> scope = jmGroup.generateScope(format);
-               assertEquals(3, scope.size());
-               assertEquals("host", scope.get(0));
-               assertEquals("superjob", scope.get(1));
-               assertEquals("job", scope.get(2));
+               assertEquals(
+                               "peter.test-tm-id.some-constant." + jid,
+                               jmGroup.getScopeString());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
index cb962ea..5645b94 100644
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
@@ -62,13 +62,10 @@ public class MetricGroupRegistrationTest {
                public static String lastPassedName;
 
                @Override
-               public void notifyOfAddedMetric(Metric metric, String name) {
+               public void notifyOfAddedMetric(Metric metric, String 
metricName, AbstractMetricGroup group) {
                        lastPassedMetric = metric;
-                       lastPassedName = name;
+                       lastPassedName = metricName;
                }
-
-               @Override
-               public void notifyOfRemovedMetric(Metric metric, String name) {}
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java
index 31ab2a7..2849bab 100644
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
 
-import org.apache.flink.metrics.util.DummyMetricGroup;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -37,7 +36,8 @@ public class MetricGroupTest {
 
        @Test
        public void sameGroupOnNameCollision() {
-               GenericMetricGroup group = new GenericMetricGroup(registry, new 
DummyMetricGroup(registry), "somegroup");
+               GenericMetricGroup group = new GenericMetricGroup(
+               registry, new DummyAbstractMetricGroup(registry), "somegroup");
 
                String groupName = "sometestname";
                MetricGroup subgroup1 = group.addGroup(groupName);
@@ -51,7 +51,7 @@ public class MetricGroupTest {
        @Test
        public void closedGroupDoesNotRegisterMetrics() {
                GenericMetricGroup group = new GenericMetricGroup(
-                               exceptionOnRegister, new 
DummyMetricGroup(exceptionOnRegister), "testgroup");
+                               exceptionOnRegister, new 
DummyAbstractMetricGroup(exceptionOnRegister), "testgroup");
                assertFalse(group.isClosed());
 
                group.close();
@@ -68,7 +68,7 @@ public class MetricGroupTest {
        @Test
        public void closedGroupCreatesClosedGroups() {
                GenericMetricGroup group = new 
GenericMetricGroup(exceptionOnRegister,
-                               new DummyMetricGroup(exceptionOnRegister), 
"testgroup");
+                               new 
DummyAbstractMetricGroup(exceptionOnRegister), "testgroup");
                assertFalse(group.isClosed());
 
                group.close();
@@ -82,7 +82,7 @@ public class MetricGroupTest {
        public void tolerateMetricNameCollisions() {
                final String name = "abctestname";
                GenericMetricGroup group = new GenericMetricGroup(
-                               registry, new DummyMetricGroup(registry), 
"testgroup");
+                               registry, new 
DummyAbstractMetricGroup(registry), "testgroup");
                
                assertNotNull(group.counter(name));
                assertNotNull(group.counter(name));
@@ -91,7 +91,8 @@ public class MetricGroupTest {
        @Test
        public void tolerateMetricAndGroupNameCollisions() {
                final String name = "abctestname";
-               GenericMetricGroup group = new GenericMetricGroup(registry, new 
DummyMetricGroup(registry), "testgroup");
+               GenericMetricGroup group = new GenericMetricGroup(
+                               registry, new 
DummyAbstractMetricGroup(registry), "testgroup");
                
                assertNotNull(group.addGroup(name));
                assertNotNull(group.counter(name));
@@ -100,7 +101,7 @@ public class MetricGroupTest {
        @Test(expected = IllegalArgumentException.class)
        public void exceptionOnIllegalName() {
                GenericMetricGroup group = new GenericMetricGroup(
-                               exceptionOnRegister, new 
DummyMetricGroup(exceptionOnRegister), "testgroup");
+                               exceptionOnRegister, new 
DummyAbstractMetricGroup(exceptionOnRegister), "testgroup");
                group.counter("ÜberCöunter");
        }
        
@@ -122,4 +123,21 @@ public class MetricGroupTest {
                        fail("Metric should never be un-registered");
                }
        }
+
+       // 
------------------------------------------------------------------------
+       
+       private static class DummyAbstractMetricGroup extends 
AbstractMetricGroup {
+
+               public DummyAbstractMetricGroup(MetricRegistry registry) {
+                       super(registry, new String[0]);
+               }
+
+               @Override
+               protected void addMetric(String name, Metric metric) {}
+
+               @Override
+               public MetricGroup addGroup(String name) {
+                       return new DummyAbstractMetricGroup(registry);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
index 6202834..cb5e082 100644
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
@@ -15,73 +15,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricRegistry;
 import org.apache.flink.util.AbstractID;
-import org.junit.Test;
 
-import java.util.List;
+import org.junit.Test;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 public class OperatorGroupTest {
-       @Test
-       public void testGenerateScopeDefault() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               
-               OperatorMetricGroup operator = new 
TaskManagerMetricGroup(registry, "host", "id")
-                       .addTaskForJob(new JobID(), "job", new AbstractID(), 
new AbstractID(), 0, "task")
-                       .addOperator("operator");
-
-               List<String> scope = operator.generateScope();
-               assertEquals(6, scope.size());
-               assertEquals("host", scope.get(0));
-               assertEquals("taskmanager", scope.get(1));
-               assertEquals("id", scope.get(2));
-               assertEquals("job", scope.get(3));
-               assertEquals("operator", scope.get(4));
-               assertEquals("0", scope.get(5));
-       }
 
        @Test
-       public void testGenerateScopeWildcard() {
+       public void testGenerateScopeDefault() {
                MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               OperatorMetricGroup operator = new 
TaskManagerMetricGroup(registry, "host", "id")
-                       .addTaskForJob(new JobID(), "job", new AbstractID(), 
new AbstractID(), 0, "task")
-                       .addOperator("operator");
 
-               Scope.ScopeFormat format = new Scope.ScopeFormat();
-               format.setOperatorFormat(Scope.concat(Scope.SCOPE_WILDCARD, 
"op", OperatorMetricGroup.SCOPE_OPERATOR_NAME));
-
-               List<String> scope = operator.generateScope(format);
-               assertEquals(7, scope.size());
-               assertEquals("host", scope.get(0));
-               assertEquals("taskmanager", scope.get(1));
-               assertEquals("id", scope.get(2));
-               assertEquals("job", scope.get(3));
-               assertEquals("task", scope.get(4));
-               assertEquals("op", scope.get(5));
-               assertEquals("operator", scope.get(6));
-       }
-
-       @Test
-       public void testGenerateScopeCustom() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               OperatorMetricGroup operator = new 
TaskManagerMetricGroup(registry, "host", "id")
-                       .addTaskForJob(new JobID(), "job", new AbstractID(), 
new AbstractID(), 0, "task")
-                       .addOperator("operator");
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+               JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, 
new JobID(), "myJobName");
+               TaskMetricGroup taskGroup = new TaskMetricGroup(
+                               registry, jmGroup,  new AbstractID(),  new 
AbstractID(), "aTaskName", 11, 0);
+               OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, 
taskGroup, "myOpName");
 
-               Scope.ScopeFormat format = new Scope.ScopeFormat();
-               format.setOperatorFormat(Scope.concat("jobs", 
JobMetricGroup.SCOPE_JOB_NAME, "op", OperatorMetricGroup.SCOPE_OPERATOR_NAME));
+               assertArrayEquals(
+                               new String[] { "theHostName", "taskmanager", 
"test-tm-id", "myJobName", "myOpName", "11" },
+                               opGroup.getScopeComponents());
 
-               List<String> scope = operator.generateScope(format);
-               assertEquals(4, scope.size());
-               assertEquals("jobs", scope.get(0));
-               assertEquals("job", scope.get(1));
-               assertEquals("op", scope.get(2));
-               assertEquals("operator", scope.get(3));
+               assertEquals(
+                               
"theHostName.taskmanager.test-tm-id.myJobName.myOpName.11",
+                               opGroup.getScopeString());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
index 69faee7..4a492d2 100644
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
@@ -15,16 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.scope.ScopeFormat;
+import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
+import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat;
 import org.apache.flink.util.AbstractID;
-import org.junit.Test;
 
-import java.util.List;
+import org.junit.Test;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 public class TaskGroupTest {
@@ -36,48 +41,74 @@ public class TaskGroupTest {
        @Test
        public void testGenerateScopeDefault() {
                MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               
-               TaskMetricGroup operator = new TaskManagerMetricGroup(registry, 
"host", "id")
-                       .addTaskForJob(new JobID(), "job", new AbstractID(), 
new AbstractID(), 0, "task");
 
-               List<String> scope = operator.generateScope();
-               assertEquals(5, scope.size());
-               assertEquals("task", scope.get(4));
+               AbstractID vertexId = new AbstractID();
+               AbstractID executionId = new AbstractID();
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+               JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, 
new JobID(), "myJobName");
+               TaskMetricGroup taskGroup = new TaskMetricGroup(registry, 
jmGroup, vertexId, executionId, "aTaskName", 13, 2);
+
+               assertArrayEquals(
+                               new String[] { "theHostName", "taskmanager", 
"test-tm-id", "myJobName", "aTaskName", "13"},
+                               taskGroup.getScopeComponents());
+
+               assertEquals(
+                               
"theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13",
+                               taskGroup.getScopeString());
        }
 
        @Test
-       public void testGenerateScopeWilcard() {
+       public void testGenerateScopeCustom() {
                MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               TaskMetricGroup operator = new TaskManagerMetricGroup(registry, 
"host", "id")
-                       .addTaskForJob(new JobID(), "job", new AbstractID(), 
new AbstractID(), 0, "task");
-
-               Scope.ScopeFormat format = new Scope.ScopeFormat();
-               format.setTaskFormat(Scope.concat(Scope.SCOPE_WILDCARD, 
"supertask", TaskMetricGroup.SCOPE_TASK_NAME));
-
-               List<String> scope = operator.generateScope(format);
-               assertEquals(6, scope.size());
-               assertEquals("host", scope.get(0));
-               assertEquals("taskmanager", scope.get(1));
-               assertEquals("id", scope.get(2));
-               assertEquals("job", scope.get(3));
-               assertEquals("supertask", scope.get(4));
-               assertEquals("task", scope.get(5));
+
+               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("abc");
+               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("def", tmFormat);
+               TaskScopeFormat taskFormat = new 
TaskScopeFormat("<tm_id>.<job_id>.<task_id>.<task_attempt_id>", jmFormat);
+
+               JobID jid = new JobID();
+               AbstractID vertexId = new AbstractID();
+               AbstractID executionId = new AbstractID();
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+               JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, 
jid, "myJobName");
+               TaskMetricGroup taskGroup = new TaskMetricGroup(
+                               registry, jmGroup, taskFormat, vertexId, 
executionId, "aTaskName", 13, 2);
+
+               assertArrayEquals(
+                               new String[] { "test-tm-id", jid.toString(), 
vertexId.toString(), executionId.toString() },
+                               taskGroup.getScopeComponents());
+
+               assertEquals(
+                               String.format("test-tm-id.%s.%s.%s", jid, 
vertexId, executionId),
+                               taskGroup.getScopeString());
        }
 
        @Test
-       public void testGenerateScopeCustom() {
+       public void testGenerateScopeWilcard() {
                MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               TaskMetricGroup operator = new TaskManagerMetricGroup(registry, 
"host", "id")
-                       .addTaskForJob(new JobID(), "job", new AbstractID(), 
new AbstractID(), 0, "task");
-
-               Scope.ScopeFormat format = new Scope.ScopeFormat();
-               
format.setTaskFormat(Scope.concat(TaskManagerMetricGroup.SCOPE_TM_HOST, 
JobMetricGroup.SCOPE_JOB_NAME, "supertask", TaskMetricGroup.SCOPE_TASK_NAME));
-
-               List<String> scope = operator.generateScope(format);
-               assertEquals(4, scope.size());
-               assertEquals("host", scope.get(0));
-               assertEquals("job", scope.get(1));
-               assertEquals("supertask", scope.get(2));
-               assertEquals("task", scope.get(3));
+
+               TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat(
+                               ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
+               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat(
+                               
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, tmFormat);
+
+               TaskScopeFormat format = new 
TaskScopeFormat("*.<task_attempt_id>.<subtask_index>", jmFormat);
+
+               AbstractID executionId = new AbstractID();
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+               JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, 
new JobID(), "myJobName");
+
+               TaskMetricGroup taskGroup = new TaskMetricGroup(
+                               registry, jmGroup, format, new AbstractID(), 
executionId, "aTaskName", 13, 1);
+
+               assertArrayEquals(
+                               new String[] { "theHostName", "taskmanager", 
"test-tm-id", "myJobName", executionId.toString(), "13" },
+                               taskGroup.getScopeComponents());
+
+               assertEquals(
+                               "theHostName.taskmanager.test-tm-id.myJobName." 
+ executionId + ".13",
+                               taskGroup.getScopeString());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
index 6b9a5fc..9adc1be 100644
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java
@@ -15,15 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricRegistry;
+import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
 import org.apache.flink.util.AbstractID;
-import org.junit.Test;
 
-import java.util.List;
+import org.junit.Test;
 
 import static org.junit.Assert.*;
 
@@ -55,9 +56,9 @@ public class TaskManagerGroupTest {
                final AbstractID execution13 = new AbstractID();
                final AbstractID execution21 = new AbstractID();
                
-               TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, 
vertex11, execution11, 17, "test");
-               TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, 
vertex12, execution12, 13, "test");
-               TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, 
vertex21, execution21, 7, "test");
+               TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, 
vertex11, execution11, "test", 17, 0);
+               TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, 
vertex12, execution12, "test", 13, 1);
+               TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, 
vertex21, execution21, "test", 7, 2);
                
                assertEquals(2, group.numRegisteredJobMetricGroups());
                assertFalse(tmGroup11.parent().isClosed());
@@ -77,7 +78,7 @@ public class TaskManagerGroupTest {
                assertEquals(1, group.numRegisteredJobMetricGroups());
                
                // add one more to job one
-               TaskMetricGroup tmGroup13 = group.addTaskForJob(jid1, jobName1, 
vertex13, execution13, 0, "test");
+               TaskMetricGroup tmGroup13 = group.addTaskForJob(jid1, jobName1, 
vertex13, execution13, "test", 0, 0);
                tmGroup12.close();
                tmGroup13.close();
 
@@ -108,9 +109,9 @@ public class TaskManagerGroupTest {
                final AbstractID execution12 = new AbstractID();
                final AbstractID execution21 = new AbstractID();
 
-               TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, 
vertex11, execution11, 17, "test");
-               TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, 
vertex12, execution12, 13, "test");
-               TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, 
vertex21, execution21, 7, "test");
+               TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, 
vertex11, execution11, "test", 17, 1);
+               TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, 
vertex12, execution12, "test", 13, 2);
+               TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, 
vertex21, execution21, "test", 7, 1);
                
                group.close();
                
@@ -120,48 +121,25 @@ public class TaskManagerGroupTest {
        }
        
        // 
------------------------------------------------------------------------
-       //  scope tests
+       //  scope name tests
        // 
------------------------------------------------------------------------
 
        @Test
        public void testGenerateScopeDefault() {
                MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               TaskManagerMetricGroup operator = new 
TaskManagerMetricGroup(registry, "host", "id");
-
-               List<String> scope = operator.generateScope();
-               assertEquals(3, scope.size());
-               assertEquals("host", scope.get(0));
-               assertEquals("taskmanager", scope.get(1));
-               assertEquals("id", scope.get(2));
-       }
-
-       @Test
-       public void testGenerateScopeWildcard() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               TaskManagerMetricGroup operator = new 
TaskManagerMetricGroup(registry, "host", "id");
+               TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(registry, "localhost", "id");
 
-               Scope.ScopeFormat format = new Scope.ScopeFormat();
-               format.setTaskManagerFormat(Scope.concat(Scope.SCOPE_WILDCARD, 
"superhost", TaskManagerMetricGroup.SCOPE_TM_HOST));
-
-               List<String> scope = operator.generateScope(format);
-               assertEquals(2, scope.size());
-               assertEquals("superhost", scope.get(0));
-               assertEquals("host", scope.get(1));
+               assertArrayEquals(new String[] { "localhost", "taskmanager", 
"id" }, group.getScopeComponents());
+               assertEquals("localhost.taskmanager.id", 
group.getScopeString());
        }
 
        @Test
        public void testGenerateScopeCustom() {
                MetricRegistry registry = new MetricRegistry(new 
Configuration());
-               TaskManagerMetricGroup operator = new 
TaskManagerMetricGroup(registry, "host", "id");
-
-               Scope.ScopeFormat format = new Scope.ScopeFormat();
-               format.setTaskManagerFormat(Scope.concat("h", 
TaskManagerMetricGroup.SCOPE_TM_HOST, "t", TaskManagerMetricGroup.SCOPE_TM_ID));
+               TaskManagerScopeFormat format = new 
TaskManagerScopeFormat("constant.<host>.foo.<host>");
+               TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(registry, format, "host", "id");
 
-               List<String> scope = operator.generateScope(format);
-               assertEquals(4, scope.size());
-               assertEquals("h", scope.get(0));
-               assertEquals("host", scope.get(1));
-               assertEquals("t", scope.get(2));
-               assertEquals("id", scope.get(3));
+               assertArrayEquals(new String[] { "constant", "host", "foo", 
"host" }, group.getScopeComponents());
+               assertEquals("constant.host.foo.host", group.getScopeString());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
index 0d683c2..abe1669 100644
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
@@ -15,29 +15,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.reporter;
 
-import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.List;
+import static org.junit.Assert.*;
 
 public class JMXReporterTest {
+
+       @Test
+       public void testReplaceInvalidChars() {
+               assertEquals("", JMXReporter.replaceInvalidChars(""));
+               assertEquals("abc", JMXReporter.replaceInvalidChars("abc"));
+               assertEquals("abc", JMXReporter.replaceInvalidChars("abc\""));
+               assertEquals("abc", JMXReporter.replaceInvalidChars("\"abc"));
+               assertEquals("abc", JMXReporter.replaceInvalidChars("\"abc\""));
+               assertEquals("abc", 
JMXReporter.replaceInvalidChars("\"a\"b\"c\""));
+               assertEquals("", JMXReporter.replaceInvalidChars("\"\"\"\""));
+               assertEquals("____", JMXReporter.replaceInvalidChars("    "));
+               assertEquals("ab_-(c)-", JMXReporter.replaceInvalidChars("\"ab 
;(c)'"));
+               assertEquals("a_b_c", JMXReporter.replaceInvalidChars("a b c"));
+               assertEquals("a_b_c_", JMXReporter.replaceInvalidChars("a b c 
"));
+               assertEquals("a-b-c-", 
JMXReporter.replaceInvalidChars("a;b'c*"));
+               assertEquals("a------b------c", 
JMXReporter.replaceInvalidChars("a,=;:?'b,=;:?'c"));
+       }
+
        /**
         * Verifies that the JMXReporter properly generates the JMX name.
         */
        @Test
        public void testGenerateName() {
-               String name = "metric";
-
-               List<String> scope = new ArrayList<>();
-               scope.add("value0");
-               scope.add("value1");
-               scope.add("\"value2 (test),=;:?'");
-
-               String jmxName = new JMXReporter().generateName(name, scope);
+               String[] scope = { "value0", "value1", "\"value2 (test),=;:?'" 
};
+               String jmxName = JMXReporter.generateJmxName("TestMetric", 
scope);
 
-               
Assert.assertEquals("org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=metric",
 jmxName);
+               
assertEquals("org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=TestMetric",
 jmxName);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java
deleted file mode 100644
index ab78288..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.util;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.JobMetricGroup;
-import org.apache.flink.util.AbstractID;
-
-public class DummyJobMetricGroup extends JobMetricGroup {
-       
-       public DummyJobMetricGroup() {
-               super(new DummyMetricRegistry(), new 
DummyTaskManagerMetricGroup(), new JobID(), "job");
-       }
-
-       @Override
-       public DummyTaskMetricGroup addTask(AbstractID id, AbstractID 
attemptID, int subtaskIndex, String name) {
-               return new DummyTaskMetricGroup();
-       }
-
-       @Override
-       public void removeTaskMetricGroup(AbstractID executionId) {}
-
-       @Override
-       protected void addMetric(String name, Metric metric) {}
-
-       @Override
-       public MetricGroup addGroup(String name) {
-               return new DummyMetricGroup();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java
deleted file mode 100644
index 77ddd17..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.util;
-
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.metrics.groups.Scope;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class DummyMetricGroup extends AbstractMetricGroup {
-
-       public DummyMetricGroup() {
-               this(new DummyMetricRegistry());
-       }
-       
-       public DummyMetricGroup(MetricRegistry registry) {
-               super(registry);
-       }
-
-       @Override
-       public List<String> generateScope() {
-               return new ArrayList<>();
-       }
-
-       @Override
-       public List<String> generateScope(Scope.ScopeFormat format) {
-               return new ArrayList<>();
-       }
-
-       @Override
-       protected void addMetric(String name, Metric metric) {}
-       
-
-       @Override
-       public MetricGroup addGroup(String name) {
-               return new DummyMetricGroup();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java
deleted file mode 100644
index f8b73a9..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricRegistry;
-
-public class DummyMetricRegistry extends MetricRegistry {
-
-       public DummyMetricRegistry() {
-               super(new Configuration());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java
deleted file mode 100644
index e271d6e..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.util;
-
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.OperatorMetricGroup;
-
-public class DummyOperatorMetricGroup extends OperatorMetricGroup {
-       
-       public DummyOperatorMetricGroup() {
-               super(new DummyMetricRegistry(), new DummyTaskMetricGroup(), 
"operator", 0);
-       }
-
-       @Override
-       protected void addMetric(String name, Metric metric) {}
-
-       @Override
-       public MetricGroup addGroup(String name) {
-               return new DummyMetricGroup();
-       }
-}

Reply via email to