http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java
deleted file mode 100644
index 23a7768..0000000
--- a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.reporter.MetricReporter;
-
-import java.util.List;
-
-public class DummyReporter implements MetricReporter {
-       @Override
-       public void open(Configuration config) {
-       }
-
-       @Override
-       public void close() {
-       }
-
-       @Override
-       public void notifyOfAddedMetric(Metric metric, String name) {
-       }
-
-       @Override
-       public void notifyOfRemovedMetric(Metric metric, String name) {
-       }
-
-       @Override
-       public String generateName(String name, List<String> scope) {
-               return "";
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java
deleted file mode 100644
index 8b7714f..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.util;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
-
-public class DummyTaskManagerMetricGroup extends TaskManagerMetricGroup {
-       
-       public DummyTaskManagerMetricGroup() {
-               super(new DummyMetricRegistry(), "host", "id");
-       }
-
-       public DummyJobMetricGroup addJob(JobID id, String name) {
-               return new DummyJobMetricGroup();
-       }
-
-       @Override
-       protected void addMetric(String name, Metric metric) {}
-
-       @Override
-       public MetricGroup addGroup(String name) {
-               return new DummyMetricGroup();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java
deleted file mode 100644
index db2c557..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.util;
-
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
-import org.apache.flink.util.AbstractID;
-
-public class DummyTaskMetricGroup extends TaskMetricGroup {
-       
-       public DummyTaskMetricGroup() {
-               super(new DummyMetricRegistry(), new DummyJobMetricGroup(), new 
AbstractID(), new AbstractID(), 0, "task");
-       }
-
-       public DummyOperatorMetricGroup addOperator(String name) {
-               return new DummyOperatorMetricGroup();
-       }
-
-       @Override
-       protected void addMetric(String name, Metric metric) {}
-
-       @Override
-       public MetricGroup addGroup(String name) {
-               return new DummyMetricGroup();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java 
b/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java
index 5d8a8e0..0c139f0 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java
@@ -19,10 +19,10 @@
 package org.apache.flink.metrics.util;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.metrics.reporter.AbstractReporter;
 
-import java.util.List;
-
 public class TestReporter extends AbstractReporter {
 
        @Override
@@ -32,7 +32,8 @@ public class TestReporter extends AbstractReporter {
        public void close() {}
 
        @Override
-       public String generateName(String name, List<String> scope) {
-               return name;
-       }
+       public void notifyOfAddedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {}
+
+       @Override
+       public void notifyOfRemovedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
index 31b3ba2..2682584 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
@@ -31,8 +31,9 @@ import org.apache.flink.api.java.tuple.builder.Tuple2Builder;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.util.DummyMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -77,7 +78,8 @@ public class CoGroupOperatorCollectionTest implements 
Serializable {
                        final HashMap<String, Accumulator<?, ?>> accumulators = 
new HashMap<String, Accumulator<?, ?>>();
                        final HashMap<String, Future<Path>> cpTasks = new 
HashMap<>();
                        final TaskInfo taskInfo = new TaskInfo("Test UDF", 0, 
4, 0);
-                       final RuntimeContext ctx = new 
RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulators, new 
DummyMetricGroup());
+                       final RuntimeContext ctx = new RuntimeUDFContext(
+                                       taskInfo, null, executionConfig, 
cpTasks, accumulators, new UnregisteredMetricsGroup());
 
                        {
                                SumCoGroup udf1 = new SumCoGroup();

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
index 1b627c4..c5a247a 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.util.DummyMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -169,9 +169,20 @@ public class GroupReduceOperatorTest implements 
java.io.Serializable {
 
                        ExecutionConfig executionConfig = new ExecutionConfig();
                        executionConfig.disableObjectReuse();
-                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, 
executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, 
Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig);
+                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input,
+                                       new RuntimeUDFContext(taskInfo, null, 
executionConfig,
+                                                       new HashMap<String, 
Future<Path>>(),
+                                                       new HashMap<String, 
Accumulator<?, ?>>(),
+                                                       new 
UnregisteredMetricsGroup()),
+                                       executionConfig);
+                       
                        executionConfig.enableObjectReuse();
-                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, 
executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, 
Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig);
+                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input,
+                                       new RuntimeUDFContext(taskInfo, null, 
executionConfig,
+                                                       new HashMap<String, 
Future<Path>>(),
+                                                       new HashMap<String, 
Accumulator<?, ?>>(),
+                                                       new 
UnregisteredMetricsGroup()),
+                                       executionConfig);
                        
                        
                        Set<Tuple2<String, Integer>> resultSetMutableSafe = new 
HashSet<Tuple2<String, Integer>>(resultMutableSafe);

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
index 1d5668b..89574a8 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -30,8 +28,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.util.DummyMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.util.Collector;
+
 import org.junit.Test;
 
 import java.io.Serializable;
@@ -43,6 +42,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Future;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 @SuppressWarnings({ "unchecked", "serial" })
 public class InnerJoinOperatorBaseTest implements Serializable {
 
@@ -107,10 +109,22 @@ public class InnerJoinOperatorBaseTest implements 
Serializable {
                try {
                        final TaskInfo taskInfo = new TaskInfo("op", 0, 1, 0);
                        ExecutionConfig executionConfig = new ExecutionConfig();
+                       
                        executionConfig.disableObjectReuse();
-                       List<Tuple2<Double, String>> resultSafe = 
base.executeOnCollections(inputData1, inputData2, new 
RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, 
Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new 
DummyMetricGroup()), executionConfig);
+                       List<Tuple2<Double, String>> resultSafe = 
base.executeOnCollections(inputData1, inputData2,
+                                       new RuntimeUDFContext(taskInfo, null, 
executionConfig,
+                                                       new HashMap<String, 
Future<Path>>(),
+                                                       new HashMap<String, 
Accumulator<?, ?>>(),
+                                                       new 
UnregisteredMetricsGroup()),
+                                       executionConfig);
+                       
                        executionConfig.enableObjectReuse();
-                       List<Tuple2<Double, String>> resultRegular = 
base.executeOnCollections(inputData1, inputData2, new 
RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, 
Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new 
DummyMetricGroup()), executionConfig);
+                       List<Tuple2<Double, String>> resultRegular = 
base.executeOnCollections(inputData1, inputData2,
+                                       new RuntimeUDFContext(taskInfo, null, 
executionConfig,
+                                                       new HashMap<String, 
Future<Path>>(),
+                                                       new HashMap<String, 
Accumulator<?, ?>>(),
+                                                       new 
UnregisteredMetricsGroup()),
+                                       executionConfig);
 
                        assertEquals(expected, new HashSet<>(resultSafe));
                        assertEquals(expected, new HashSet<>(resultRegular));

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
index 4317c03..150854d 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
@@ -30,7 +30,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.util.DummyMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -42,7 +43,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Arrays.asList;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings({"serial", "unchecked"})
 public class ReduceOperatorTest implements java.io.Serializable {
@@ -145,10 +148,22 @@ public class ReduceOperatorTest implements 
java.io.Serializable {
                        final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 
0);
 
                        ExecutionConfig executionConfig = new ExecutionConfig();
+                       
                        executionConfig.disableObjectReuse();
-                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, 
executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, 
Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig);
+                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input,
+                                       new RuntimeUDFContext(taskInfo, null, 
executionConfig,
+                                                       new HashMap<String, 
Future<Path>>(),
+                                                       new HashMap<String, 
Accumulator<?, ?>>(),
+                                                       new 
UnregisteredMetricsGroup()),
+                                       executionConfig);
+                       
                        executionConfig.enableObjectReuse();
-                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, 
executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, 
Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig);
+                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input,
+                                       new RuntimeUDFContext(taskInfo, null, 
executionConfig,
+                                                       new HashMap<String, 
Future<Path>>(),
+                                                       new HashMap<String, 
Accumulator<?, ?>>(),
+                                                       new 
UnregisteredMetricsGroup()),
+                                       executionConfig);
 
                        Set<Tuple2<String, Integer>> resultSetMutableSafe = new 
HashSet<Tuple2<String, Integer>>(resultMutableSafe);
                        Set<Tuple2<String, Integer>> resultSetRegular = new 
HashSet<Tuple2<String, Integer>>(resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
 
b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
index 059704d..74fdb85 100644
--- 
a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
+++ 
b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
@@ -15,10 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.dropwizard;
 
 import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
 import com.codahale.metrics.ScheduledReporter;
+
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.dropwizard.metrics.CounterWrapper;
@@ -26,19 +29,19 @@ import org.apache.flink.dropwizard.metrics.GaugeWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
 
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} 
that wraps a
  * Dropwizard {@link com.codahale.metrics.Reporter}.
  */
 @PublicEvolving
-public abstract class ScheduledDropwizardReporter implements MetricReporter, 
Scheduled {
-       protected MetricRegistry registry;
-       protected ScheduledReporter reporter;
+public abstract class ScheduledDropwizardReporter implements MetricReporter, 
Scheduled, Reporter {
 
        public static final String ARG_HOST = "host";
        public static final String ARG_PORT = "port";
@@ -46,25 +49,24 @@ public abstract class ScheduledDropwizardReporter 
implements MetricReporter, Sch
        public static final String ARG_CONVERSION_RATE = "rateConversion";
        public static final String ARG_CONVERSION_DURATION = 
"durationConversion";
 
-       protected ScheduledDropwizardReporter() {
-               this.registry = new MetricRegistry();
-       }
+       // 
------------------------------------------------------------------------
 
-       @Override
-       public synchronized void notifyOfAddedMetric(Metric metric, String 
name) {
-               if (metric instanceof Counter) {
-                       registry.register(name, new CounterWrapper((Counter) 
metric));
-               } else if (metric instanceof Gauge) {
-                       registry.register(name, new GaugeWrapper((Gauge<?>) 
metric));
-               }
-       }
+       protected final MetricRegistry registry;
 
-       @Override
-       public synchronized void notifyOfRemovedMetric(Metric metric, String 
name) {
-               registry.remove(name);
+       protected ScheduledReporter reporter;
+
+       private final Map<Gauge<?>, String> gauges = new HashMap<>();
+       private final Map<Counter, String> counters = new HashMap<>();
+
+       // 
------------------------------------------------------------------------
+
+       protected ScheduledDropwizardReporter() {
+               this.registry = new MetricRegistry();
        }
 
-       public abstract ScheduledReporter getReporter(Configuration config);
+       // 
------------------------------------------------------------------------
+       //  life cycle
+       // 
------------------------------------------------------------------------
 
        @Override
        public void open(Configuration config) {
@@ -76,24 +78,60 @@ public abstract class ScheduledDropwizardReporter 
implements MetricReporter, Sch
                this.reporter.stop();
        }
 
+       // 
------------------------------------------------------------------------
+       //  adding / removing metrics
+       // 
------------------------------------------------------------------------
+
        @Override
-       public String generateName(String name, List<String> scope) {
-               StringBuilder sb = new StringBuilder();
-               for (String s : scope) {
-                       sb.append(s);
-                       sb.append('.');
+       public void notifyOfAddedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {
+               final String fullName = group.getScopeString() + '.' + 
metricName;
+
+               synchronized (this) {
+                       if (metric instanceof Counter) {
+                               counters.put((Counter) metric, fullName);
+                               registry.register(fullName, new 
CounterWrapper((Counter) metric));
+                       }
+                       else if (metric instanceof Gauge) {
+                               gauges.put((Gauge<?>) metric, fullName);
+                               registry.register(fullName, 
GaugeWrapper.fromGauge((Gauge<?>) metric));
+                       }
                }
-               sb.append(name);
-               return sb.toString();
        }
 
        @Override
-       public synchronized void report() {
-               this.reporter.report(
-                       this.registry.getGauges(),
-                       this.registry.getCounters(),
-                       this.registry.getHistograms(),
-                       this.registry.getMeters(),
-                       this.registry.getTimers());
+       public void notifyOfRemovedMetric(Metric metric, String metricName, 
AbstractMetricGroup group) {
+               synchronized (this) {
+                       String fullName;
+                       
+                       if (metric instanceof Counter) {
+                               fullName = counters.remove(metric);
+                       } else if (metric instanceof Gauge) {
+                               fullName = gauges.remove(metric);
+                       } else {
+                               fullName = null;
+                       }
+                       
+                       if (fullName != null) {
+                               registry.remove(fullName);
+                       }
+               }
        }
+
+       // 
------------------------------------------------------------------------
+       //  scheduled reporting
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void report() {
+               synchronized (this) {
+                       this.reporter.report(
+                               this.registry.getGauges(),
+                               this.registry.getCounters(),
+                               this.registry.getHistograms(),
+                               this.registry.getMeters(),
+                               this.registry.getTimers());
+               }
+       }
+
+       public abstract ScheduledReporter getReporter(Configuration config);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
 
b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
index fcb629a..655cd60 100644
--- 
a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
+++ 
b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
@@ -15,11 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.dropwizard.metrics;
 
 import org.apache.flink.metrics.Gauge;
 
 public class GaugeWrapper<T> implements com.codahale.metrics.Gauge<T> {
+       
        private final Gauge<T> gauge;
 
        public GaugeWrapper(Gauge<T> gauge) {
@@ -30,4 +32,10 @@ public class GaugeWrapper<T> implements 
com.codahale.metrics.Gauge<T> {
        public T getValue() {
                return this.gauge.getValue();
        }
+
+       public static <T> GaugeWrapper<T> fromGauge(Gauge<?> gauge) {
+               @SuppressWarnings("unchecked")
+               Gauge<T> typedGauge = (Gauge<T>) gauge;
+               return new GaugeWrapper<>(typedGauge); 
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
 
b/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
new file mode 100644
index 0000000..adf9394
--- /dev/null
+++ 
b/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.ganglia;
+
+import com.codahale.metrics.ScheduledReporter;
+
+import info.ganglia.gmetric4j.gmetric.GMetric;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+@PublicEvolving
+public class GangliaReporter extends ScheduledDropwizardReporter {
+       
+       public static final String ARG_DMAX = "dmax";
+       public static final String ARG_TMAX = "tmax";
+       public static final String ARG_TTL = "ttl";
+       public static final String ARG_MODE_ADDRESSING = "addressingMode";
+
+       @Override
+       public ScheduledReporter getReporter(Configuration config) {
+
+               try {
+                       String host = config.getString(ARG_HOST, null);
+                       int port = config.getInteger(ARG_PORT, -1);
+                       if (host == null || host.length() == 0 || port < 1) {
+                               throw new IllegalArgumentException("Invalid 
host/port configuration. Host: " + host + " Port: " + port);
+                       }
+                       String addressingMode = 
config.getString(ARG_MODE_ADDRESSING, "MULTICAST");
+                       int ttl = config.getInteger(ARG_TTL, -1);
+                       GMetric gMetric = new GMetric(host, port, 
GMetric.UDPAddressingMode.valueOf(addressingMode), ttl);
+
+                       String prefix = config.getString(ARG_PREFIX, null);
+                       String conversionRate = 
config.getString(ARG_CONVERSION_RATE, null);
+                       String conversionDuration = 
config.getString(ARG_CONVERSION_DURATION, null);
+                       int dMax = config.getInteger(ARG_DMAX, 0);
+                       int tMax = config.getInteger(ARG_TMAX, 60);
+
+                       com.codahale.metrics.ganglia.GangliaReporter.Builder 
builder =
+                               
com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry);
+
+                       if (prefix != null) {
+                               builder.prefixedWith(prefix);
+                       }
+                       if (conversionRate != null) {
+                               
builder.convertRatesTo(TimeUnit.valueOf(conversionRate));
+                       }
+                       if (conversionDuration != null) {
+                               
builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration));
+                       }
+                       builder.withDMax(dMax);
+                       builder.withTMax(tMax);
+
+                       return builder.build(gMetric);
+               } catch (IOException e) {
+                       throw new RuntimeException("Error while instantiating 
GangliaReporter.", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java
 
b/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java
deleted file mode 100644
index a1dafc9..0000000
--- 
a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.graphite;
-
-import com.codahale.metrics.ScheduledReporter;
-import info.ganglia.gmetric4j.gmetric.GMetric;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-public class GangliaReporter extends ScheduledDropwizardReporter {
-       public static final String ARG_DMAX = "dmax";
-       public static final String ARG_TMAX = "tmax";
-       public static final String ARG_TTL = "ttl";
-       public static final String ARG_MODE_ADDRESSING = "addressingMode";
-
-       @Override
-       public ScheduledReporter getReporter(Configuration config) {
-
-               try {
-                       String host = config.getString(ARG_HOST, null);
-                       int port = config.getInteger(ARG_PORT, -1);
-                       if (host == null || host.length() == 0 || port < 1) {
-                               throw new IllegalArgumentException("Invalid 
host/port configuration. Host: " + host + " Port: " + port);
-                       }
-                       String addressingMode = 
config.getString(ARG_MODE_ADDRESSING, "MULTICAST");
-                       int ttl = config.getInteger(ARG_TTL, -1);
-                       GMetric gMetric = new GMetric(host, port, 
GMetric.UDPAddressingMode.valueOf(addressingMode), ttl);
-
-                       String prefix = config.getString(ARG_PREFIX, null);
-                       String conversionRate = 
config.getString(ARG_CONVERSION_RATE, null);
-                       String conversionDuration = 
config.getString(ARG_CONVERSION_DURATION, null);
-                       int dMax = config.getInteger(ARG_DMAX, 0);
-                       int tMax = config.getInteger(ARG_TMAX, 60);
-
-                       com.codahale.metrics.ganglia.GangliaReporter.Builder 
builder =
-                               
com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry);
-
-                       if (prefix != null) {
-                               builder.prefixedWith(prefix);
-                       }
-                       if (conversionRate != null) {
-                               
builder.convertRatesTo(TimeUnit.valueOf(conversionRate));
-                       }
-                       if (conversionDuration != null) {
-                               
builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration));
-                       }
-                       builder.withDMax(dMax);
-                       builder.withTMax(tMax);
-
-                       return builder.build(gMetric);
-               } catch (IOException e) {
-                       throw new RuntimeException("Error while instantiating 
GangliaReporter.", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
 
b/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
index b28d7a4..16be830 100644
--- 
a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
+++ 
b/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
@@ -15,17 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.graphite;
 
 import com.codahale.metrics.ScheduledReporter;
 import com.codahale.metrics.graphite.Graphite;
+
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
 
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+@PublicEvolving
 public class GraphiteReporter extends ScheduledDropwizardReporter {
+
        @Override
        public ScheduledReporter getReporter(Configuration config) {
                String host = config.getString(ARG_HOST, null);
@@ -56,15 +60,4 @@ public class GraphiteReporter extends 
ScheduledDropwizardReporter {
 
                return builder.build(new Graphite(host, port));
        }
-
-       @Override
-       public String generateName(String name, List<String> scope) {
-               StringBuilder sb = new StringBuilder();
-               for (String s : scope) {
-                       sb.append(s.replace(".", "_").replace("\"", ""));
-                       sb.append(".");
-               }
-               sb.append(name);
-               return sb.toString();
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 
b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index e57001f..124b21d 100644
--- 
a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ 
b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -15,13 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.statsd;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.reporter.AbstractReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,9 +33,7 @@ import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.net.SocketException;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Largely based on the StatsDReporter class by ReadyTalk
@@ -40,20 +41,19 @@ import java.util.concurrent.TimeUnit;
  *
  * Ported since it was not present in maven central.
  */
+@PublicEvolving
 public class StatsDReporter extends AbstractReporter implements Scheduled {
+       
        private static final Logger LOG = 
LoggerFactory.getLogger(StatsDReporter.class);
 
        public static final String ARG_HOST = "host";
        public static final String ARG_PORT = "port";
-       public static final String ARG_CONVERSION_RATE = "rateConversion";
-       public static final String ARG_CONVERSION_DURATION = 
"durationConversion";
+//     public static final String ARG_CONVERSION_RATE = "rateConversion";
+//     public static final String ARG_CONVERSION_DURATION = 
"durationConversion";
 
        private DatagramSocket socket;
        private InetSocketAddress address;
 
-       private double durationFactor;
-       private double rateFactor;
-
        @Override
        public void open(Configuration config) {
                String host = config.getString(ARG_HOST, null);
@@ -63,16 +63,17 @@ public class StatsDReporter extends AbstractReporter 
implements Scheduled {
                        throw new IllegalArgumentException("Invalid host/port 
configuration. Host: " + host + " Port: " + port);
                }
 
-               String conversionRate = config.getString(ARG_CONVERSION_RATE, 
"SECONDS");
-               String conversionDuration = 
config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS");
-
                this.address = new InetSocketAddress(host, port);
-               this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1);
-               this.durationFactor = 1.0 / 
TimeUnit.valueOf(conversionDuration).toNanos(1);
+
+//             String conversionRate = config.getString(ARG_CONVERSION_RATE, 
"SECONDS");
+//             String conversionDuration = 
config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS");
+//             this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1);
+//             this.durationFactor = 1.0 / 
TimeUnit.valueOf(conversionDuration).toNanos(1);
+
                try {
                        this.socket = new DatagramSocket(0);
                } catch (SocketException e) {
-                       throw new RuntimeException("Failure while creating 
socket. ", e);
+                       throw new RuntimeException("Could not create datagram 
socket. ", e);
                }
        }
 
@@ -83,50 +84,40 @@ public class StatsDReporter extends AbstractReporter 
implements Scheduled {
                }
        }
 
-       @Override
-       public String generateName(String name, List<String> scope) {
-               StringBuilder sb = new StringBuilder();
-               for (String s : scope) {
-                       sb.append(s);
-                       sb.append('.');
-               }
-               sb.append(name);
-               return sb.toString();
-       }
-
-       public void send(final String name, final double value) {
-               send(name, "" + value);
-       }
-
-       public void send(final String name, final String value) {
-               try {
-                       String formatted = String.format("%s:%s|g", name, 
value);
-                       byte[] data = formatted.getBytes();
-                       socket.send(new DatagramPacket(data, data.length, 
this.address));
-               } catch (IOException e) {
-                       LOG.error("unable to send packet to statsd at '{}:{}'", 
address.getHostName(), address.getPort());
-               }
-       }
+       // 
------------------------------------------------------------------------
 
        @Override
        public void report() {
-               for (Map.Entry<String, Gauge<?>> entry : gauges.entrySet()) {
-                       reportGauge(entry.getKey(), entry.getValue());
+               for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
+                       reportGauge(entry.getValue(), entry.getKey());
                }
 
-               for (Map.Entry<String, Counter> entry : counters.entrySet()) {
-                       reportCounter(entry.getKey(), entry.getValue());
+               for (Map.Entry<Counter, String> entry : counters.entrySet()) {
+                       reportCounter(entry.getValue(), entry.getKey());
                }
        }
 
+       // 
------------------------------------------------------------------------
+       
        private void reportCounter(final String name, final Counter counter) {
-               send(name, counter.getCount());
+               send(name, String.valueOf(counter.getCount()));
        }
 
        private void reportGauge(final String name, final Gauge<?> gauge) {
-               final String value = gauge.getValue().toString();
+               Object value = gauge.getValue();
                if (value != null) {
-                       send((name), value);
+                       send(name, value.toString());
+               }
+       }
+
+       private void send(final String name, final String value) {
+               try {
+                       String formatted = String.format("%s:%s|g", name, 
value);
+                       byte[] data = formatted.getBytes();
+                       socket.send(new DatagramPacket(data, data.length, 
this.address));
+               }
+               catch (IOException e) {
+                       LOG.error("unable to send packet to statsd at '{}:{}'", 
address.getHostName(), address.getPort());
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index d711c47..ecf8d1a 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1092,7 +1092,8 @@ class TaskManager(
       val taskMetricGroup = taskManagerMetricGroup
           .addTaskForJob(
             tdd.getJobID, jobName,
-            tdd.getVertexID, tdd.getExecutionId, tdd.getIndexInSubtaskGroup, 
tdd.getTaskName)
+            tdd.getVertexID, tdd.getExecutionId, tdd.getTaskName,
+            tdd.getIndexInSubtaskGroup, tdd.getAttemptNumber)
 
       val task = new Task(
         tdd,

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 15ad353..89cde95 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.util.DummyMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -227,6 +227,6 @@ public class TestTaskContext<S, T> implements 
TaskContext<S, T> {
 
        @Override
        public MetricGroup getMetricGroup() {
-               return new DummyMetricGroup();
+               return new UnregisteredMetricsGroup();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index 6e9b817..766123f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -27,14 +27,14 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.util.DummyMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.Driver;
-import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.ResettableDriver;
+import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -371,7 +371,7 @@ public class BinaryOperatorTestBase<S extends Function, IN, 
OUT> extends TestLog
        
        @Override
        public MetricGroup getMetricGroup() {
-               return new DummyMetricGroup();
+               return new UnregisteredMetricsGroup();
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index eb2a3a7..3a69fab 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -18,40 +18,41 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.util.DummyMetricGroup;
-import org.apache.flink.runtime.operators.Driver;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
-import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.TaskContext;
+import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.ResettableDriver;
+import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
 @RunWith(Parameterized.class)
 public class DriverTestBase<S extends Function> extends TestLogger implements 
TaskContext<S, Record> {
        
@@ -368,7 +369,7 @@ public class DriverTestBase<S extends Function> extends 
TestLogger implements Ta
        
        @Override
        public MetricGroup getMetricGroup() {
-               return new DummyMetricGroup();
+               return new UnregisteredMetricsGroup();
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 9b54383..78fb422 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -18,17 +18,12 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.Future;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.TaskMetricGroup;
-import org.apache.flink.metrics.util.DummyTaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
@@ -42,12 +37,17 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Future;
+
 public class DummyEnvironment implements Environment {
 
-       private final TaskInfo taskInfo;
        private final JobID jobId = new JobID();
        private final JobVertexID jobVertexId = new JobVertexID();
+       private final ExecutionAttemptID executionId = new ExecutionAttemptID();
        private final ExecutionConfig executionConfig = new ExecutionConfig();
+       private final TaskInfo taskInfo;
 
        public DummyEnvironment(String taskName, int numSubTasks, int 
subTaskIndex) {
                this.taskInfo = new TaskInfo(taskName, subTaskIndex, 
numSubTasks, 0);
@@ -70,12 +70,12 @@ public class DummyEnvironment implements Environment {
 
        @Override
        public ExecutionAttemptID getExecutionId() {
-               return null;
+               return executionId;
        }
 
        @Override
        public Configuration getTaskConfiguration() {
-               return null;
+               return new Configuration();
        }
 
        @Override
@@ -85,12 +85,12 @@ public class DummyEnvironment implements Environment {
 
        @Override
        public TaskMetricGroup getMetricGroup() {
-               return new DummyTaskMetricGroup();
+               return new UnregisteredTaskMetricsGroup();
        }
 
        @Override
        public Configuration getJobConfiguration() {
-               return null;
+               return new Configuration();
        }
 
        @Override
@@ -115,7 +115,7 @@ public class DummyEnvironment implements Environment {
 
        @Override
        public ClassLoader getUserClassLoader() {
-               return null;
+               return getClass().getClassLoader();
        }
 
        @Override
@@ -134,12 +134,10 @@ public class DummyEnvironment implements Environment {
        }
 
        @Override
-       public void acknowledgeCheckpoint(long checkpointId) {
-       }
+       public void acknowledgeCheckpoint(long checkpointId) {}
 
        @Override
-       public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> 
state) {
-       }
+       public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> 
state) {}
 
        @Override
        public ResultPartitionWriter getWriter(int index) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index b774b48..0220149 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.metrics.groups.TaskMetricGroup;
-import org.apache.flink.metrics.util.DummyTaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
@@ -216,7 +215,7 @@ public class MockEnvironment implements Environment {
 
        @Override
        public TaskMetricGroup getMetricGroup() {
-               return new DummyTaskMetricGroup();
+               return new UnregisteredTaskMetricsGroup();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
new file mode 100644
index 0000000..a2edce2
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.JobMetricGroup;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.UUID;
+
+public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
+       
+       private static final MetricRegistry EMPTY_REGISTRY = new 
MetricRegistry(new Configuration());
+
+       
+       public UnregisteredTaskMetricsGroup() {
+               super(EMPTY_REGISTRY, new DummyJobMetricGroup(),
+                               new JobVertexID(), new ExecutionAttemptID(), 
"testtask", 0, 0);
+       }
+
+       @Override
+       protected void addMetric(String name, Metric metric) {}
+
+       @Override
+       public MetricGroup addGroup(String name) {
+               return new UnregisteredMetricsGroup();
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       private static class DummyTaskManagerMetricsGroup extends 
TaskManagerMetricGroup {
+
+               public DummyTaskManagerMetricsGroup() {
+                       super(EMPTY_REGISTRY, "localhost", 
UUID.randomUUID().toString());
+               }
+       }
+
+       private static class DummyJobMetricGroup extends JobMetricGroup {
+               
+               public DummyJobMetricGroup() {
+                       super(EMPTY_REGISTRY, new 
DummyTaskManagerMetricsGroup(), new JobID(), "testjob");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 812507f..eb087c6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -29,14 +29,14 @@ import 
org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.util.DummyTaskMetricGroup;
 import org.apache.flink.runtime.execution.Environment;
+import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -46,6 +46,7 @@ import org.apache.flink.util.Collector;
 
 import org.junit.After;
 import org.junit.Test;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -61,8 +62,15 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 @SuppressWarnings({"serial", 
"SynchronizationOnLocalVariableOrMethodParameter"})
 public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@@ -795,7 +803,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                final Environment env = mock(Environment.class);
                when(env.getTaskInfo()).thenReturn(new TaskInfo("Test task 
name", 0, 1, 0));
                
when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
-               when(env.getMetricGroup()).thenReturn(new 
DummyTaskMetricGroup());
+               when(env.getMetricGroup()).thenReturn(new 
UnregisteredTaskMetricsGroup());
 
                when(task.getEnvironment()).thenReturn(env);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index c89ac50..af46513 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
@@ -34,12 +33,12 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.util.DummyTaskMetricGroup;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -47,6 +46,7 @@ import 
org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
 import org.junit.After;
 import org.junit.Test;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -964,11 +964,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                when(task.getName()).thenReturn("Test task name");
                when(task.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
 
-               final Environment env = mock(Environment.class);
-               when(env.getTaskInfo()).thenReturn(new TaskInfo("Test task 
name", 0, 1, 0));
-               
when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
-               when(env.getMetricGroup()).thenReturn(new 
DummyTaskMetricGroup());
-               
+               final Environment env = new DummyEnvironment("Test task name", 
1, 0);
                when(task.getEnvironment()).thenReturn(env);
 
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index b2d0196..f8c36de 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.metrics.groups.TaskMetricGroup;
-import org.apache.flink.metrics.util.DummyTaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -44,6 +43,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
+import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
@@ -308,7 +308,7 @@ public class StreamMockEnvironment implements Environment {
 
        @Override
        public TaskMetricGroup getMetricGroup() {
-               return new DummyTaskMetricGroup();
+               return new UnregisteredTaskMetricsGroup();
        }
 }
 

Reply via email to