Repository: flink
Updated Branches:
  refs/heads/master cd232e683 -> 9bcbcf4a5


[FLINK-4246] Allow Specifying Multiple Metrics Reporters

This also updates documentation and tests.

Reporters can now be specified like this:

metrics.reporters: foo,bar

metrics.reporter.foo.class: JMXReporter.class
metrics.reporter.foo.port: 10

metrics.reporter.bar.class: GangliaReporter.class
metrics.reporter.bar.port: 11
metrics.reporter.bar.something: 42


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9bcbcf4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9bcbcf4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9bcbcf4a

Branch: refs/heads/master
Commit: 9bcbcf4a5d75de65671bf4cc4e1e3129d386a29b
Parents: cd232e6
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Jul 22 15:25:01 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue Jul 26 17:05:21 2016 +0200

----------------------------------------------------------------------
 docs/apis/metrics.md                            |  31 ++-
 docs/setup/config.md                            |  12 +-
 .../flink/configuration/ConfigConstants.java    |  34 ++-
 .../flink/configuration/Configuration.java      |  14 +-
 .../configuration/DelegatingConfiguration.java  | 222 +++++++++++++++++++
 .../UnmodifiableConfiguration.java              |   9 +
 .../DelegatingConfigurationTest.java            |  91 ++++++++
 .../ScheduledDropwizardReporterTest.java        |  18 +-
 .../DropwizardFlinkHistogramWrapperTest.java    |  10 +-
 .../flink/metrics/jmx/JMXReporterTest.java      |  59 +++--
 .../jobmanager/JMXJobManagerMetricTest.java     |   8 +-
 .../metrics/statsd/StatsDReporterTest.java      |  24 +-
 .../flink/runtime/metrics/MetricRegistry.java   | 145 ++++++------
 .../runtime/operators/util/TaskConfig.java      | 184 +--------------
 .../ExecutionGraphMetricsTest.java              |   7 +-
 .../runtime/metrics/MetricRegistryTest.java     |  96 +++++++-
 .../groups/MetricGroupRegistrationTest.java     |   3 +-
 .../util/DelegatingConfigurationTest.java       |  93 --------
 .../connectors/kafka/KafkaTestBase.java         |   3 +-
 19 files changed, 643 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/docs/apis/metrics.md
----------------------------------------------------------------------
diff --git a/docs/apis/metrics.md b/docs/apis/metrics.md
index e8c2772..af6ef64 100644
--- a/docs/apis/metrics.md
+++ b/docs/apis/metrics.md
@@ -227,14 +227,29 @@ or by assigning unique names to jobs and operators.
 
 ## Reporter
 
-Metrics can be exposed to an external system by configuring a reporter in 
`conf/flink-conf.yaml`.
-
-- `metrics.reporter.class`: The class of the reporter to use.
-  - Example: org.apache.flink.metrics.jmx.JMXReporter
-- `metrics.reporter.arguments`: A list of named parameters that are passed to 
the reporter.
-  - Example: --host localhost --port 9010
-- `metrics.reporter.interval`: The interval between reports.
-  - Example: 10 SECONDS
+Metrics can be exposed to an external system by configuring one or several 
reporters in `conf/flink-conf.yaml`.
+
+- `metrics.reporters`: The list of named reporters.
+- `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the 
reporter named `<name>`.
+- `metrics.reporter.<name>.class`: The reporter class to use for the reporter 
named `<name>`.
+- `metrics.reporter.<name>.interval`: The reporter interval to use for the 
reporter named `<name>`.
+
+All reporters must at least have the `class` property, some allow specifying a 
reporting `interval`. Below,
+we will list more settings specific to each reporter.
+
+Example reporter configuration that specifies multiple reporters:
+
+```
+metrics.reporters: my_jmx_reporter,my_other_reporter
+
+metrics.reporter.my_jmx_reporter.class: 
org.apache.flink.metrics.jmx.JMXReporter
+metrics.reporter.my_jmx_reporter.port: 9020-9040
+
+metrics.reporter.my_other_reporter.class: 
org.apache.flink.metrics.graphite.GraphiteReporter
+metrics.reporter.my_other_reporter.host: 192.168.1.1
+metrics.reporter.my_other_reporter.port: 10000
+
+```
 
 You can write your own `Reporter` by implementing the 
`org.apache.flink.metrics.reporter.MetricReporter` interface.
 If the Reporter should send out reports regularly you have to implement the 
`Scheduled` interface as well.

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 616173d..1ce6d56 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -150,7 +150,7 @@ Default value is 1.
 - `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used 
if the default restart strategy is set to "fixed-delay".
 Default value is the `akka.ask.timeout`.
 
-- `restart-strategy.failure-rate.max-failures-per-interval`: Maximum number of 
restarts in given time interval before failing a job in "failure-rate" 
strategy. 
+- `restart-strategy.failure-rate.max-failures-per-interval`: Maximum number of 
restarts in given time interval before failing a job in "failure-rate" strategy.
 Default value is 1.
 
 - `restart-strategy.failure-rate.failure-rate-interval`: Time interval for 
measuring failure rate in "failure-rate" strategy.
@@ -288,7 +288,7 @@ of the JobManager, because the same ActorSystem is used. 
Its not possible to use
 
 - `recovery.zookeeper.path.root`: (Default '/flink') Defines the root dir 
under which the ZooKeeper recovery mode will create namespace directories.
 
-- `recovery.zookeeper.path.namespace`: (Default '/default_ns' in standalone 
mode, or the <yarn-application-id> under Yarn) Defines the subdirectory under 
the root dir where the ZooKeeper recovery mode will create znodes. This allows 
to isolate multiple applications on the same ZooKeeper. 
+- `recovery.zookeeper.path.namespace`: (Default '/default_ns' in standalone 
mode, or the <yarn-application-id> under Yarn) Defines the subdirectory under 
the root dir where the ZooKeeper recovery mode will create znodes. This allows 
to isolate multiple applications on the same ZooKeeper.
 
 - `recovery.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode 
of the leader latch which is used to elect the leader.
 
@@ -312,7 +312,13 @@ of the JobManager, because the same ActorSystem is used. 
Its not possible to use
 
 ## Metrics
 
-- `metrics.jmx.port`: (Default: 9010-9025) Defines the port used by JMX.
+- `metrics.reporters`: The list of named reporters, i.e. "foo,bar".
+
+- `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the 
reporter named `<name>`.
+
+- `metrics.reporter.<name>.class`: The reporter class to use for the reporter 
named `<name>`.
+
+- `metrics.reporter.<name>.interval`: The reporter interval to use for the 
reporter named `<name>`.
 
 - `metrics.scope.jm`: (Default: &lt;host&gt;.jobmanager) Defines the scope 
format string that is applied to all metrics scoped to a JobManager.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 028732a..5deed4e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -668,14 +668,34 @@ public final class ConfigConstants {
 
        // ---------------------------- Metrics 
-----------------------------------
 
-       /** The class of the reporter to use. */
-       public static final String METRICS_REPORTER_CLASS = 
"metrics.reporter.class";
-       
-       /** A list of named parameters that are passed to the reporter. */
-       public static final String METRICS_REPORTER_ARGUMENTS = 
"metrics.reporter.arguments";
+       /**
+        * The list of named reporters. Names are defined here and per-reporter 
configs
+        * are given with the reporter config prefix and the reporter name.
+        *
+        * Example:
+        * <pre>{@code
+        * metrics.reporters = foo, bar
+        *
+        * metrics.reporter.foo.class = 
org.apache.flink.metrics.reporter.JMXReporter
+        * metrics.reporter.foo.interval = 10
+        *
+        * metrics.reporter.bar.class = 
org.apache.flink.metrics.graphite.GraphiteReporter
+        * metrics.reporter.bar.port = 1337
+        * }</pre>
+        */
+       public static final String METRICS_REPORTERS_LIST = "metrics.reporters";
+
+       /**
+        * The prefix for per-reporter configs. Has to be combined with a 
reporter name and
+        * the configs mentioned below.
+        */
+       public static final String METRICS_REPORTER_PREFIX = 
"metrics.reporter.";
+
+       /** The class of the reporter to use. This is used as a suffix in an 
actual reporter config */
+       public static final String METRICS_REPORTER_CLASS_SUFFIX = "class";
        
-       /** The interval between reports. */
-       public static final String METRICS_REPORTER_INTERVAL = 
"metrics.reporter.interval";
+       /** The interval between reports. This is used as a suffix in an actual 
reporter config */
+       public static final String METRICS_REPORTER_INTERVAL_SUFFIX = 
"interval";
 
        /** The delimiter used to assemble the metric identifier. */
        public static final String METRICS_SCOPE_DELIMITER = 
"metrics.scope.delimiter";

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java 
b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 6b05053..8ca5d07 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.flink.annotation.Public;
@@ -56,7 +57,7 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
        
 
        /** Stores the concrete key/value pairs of this configuration object. */
-       private final HashMap<String, Object> confData;
+       protected final HashMap<String, Object> confData;
        
        // 
--------------------------------------------------------------------------------------------
 
@@ -420,6 +421,17 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
                }
        }
 
+       /**
+        * Adds all entries in this {@code Configuration} to the given {@link 
Properties}.
+        */
+       public void addAllToProperties(Properties props) {
+               synchronized (this.confData) {
+                       for (Map.Entry<String, Object> entry : 
this.confData.entrySet()) {
+                               props.put(entry.getKey(), entry.getValue());
+                       }
+               }
+       }
+
        public void addAll(Configuration other) {
                synchronized (this.confData) {
                        synchronized (other.confData) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
new file mode 100644
index 0000000..91f81d4
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.configuration;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * A configuration that manages a subset of keys with a common prefix from a 
given configuration.
+ */
+public final class DelegatingConfiguration extends Configuration {
+
+       private static final long serialVersionUID = 1L;
+
+       private final Configuration backingConfig;              // the 
configuration actually storing the data
+
+       private String prefix;                                                  
// the prefix key by which keys for this config are marked
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Default constructor for serialization. Creates an empty delegating 
configuration.
+        */
+       public DelegatingConfiguration() {
+               this.backingConfig = new Configuration();
+               this.prefix = "";
+       }
+
+       /**
+        * Creates a new delegating configuration which stores its key/value 
pairs in the given
+        * configuration using the specifies key prefix.
+        *
+        * @param backingConfig The configuration holding the actual config 
data.
+        * @param prefix The prefix prepended to all config keys.
+        */
+       public DelegatingConfiguration(Configuration backingConfig, String 
prefix)
+       {
+               this.backingConfig = backingConfig;
+               this.prefix = prefix;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public String getString(String key, String defaultValue) {
+               return this.backingConfig.getString(this.prefix + key, 
defaultValue);
+       }
+
+       @Override
+       public void setString(String key, String value) {
+               this.backingConfig.setString(this.prefix + key, value);
+       }
+
+       @Override
+       public <T> Class<T> getClass(String key, Class<? extends T> 
defaultValue, ClassLoader classLoader) throws ClassNotFoundException {
+               return this.backingConfig.getClass(this.prefix + key, 
defaultValue, classLoader);
+       }
+
+       @Override
+       public void setClass(String key, Class<?> klazz) {
+               this.backingConfig.setClass(this.prefix + key, klazz);
+       }
+
+       @Override
+       public int getInteger(String key, int defaultValue) {
+               return this.backingConfig.getInteger(this.prefix + key, 
defaultValue);
+       }
+
+       @Override
+       public void setInteger(String key, int value) {
+               this.backingConfig.setInteger(this.prefix + key, value);
+       }
+
+       @Override
+       public long getLong(String key, long defaultValue) {
+               return this.backingConfig.getLong(this.prefix + key, 
defaultValue);
+       }
+
+       @Override
+       public void setLong(String key, long value) {
+               this.backingConfig.setLong(this.prefix + key, value);
+       }
+
+       @Override
+       public boolean getBoolean(String key, boolean defaultValue) {
+               return this.backingConfig.getBoolean(this.prefix + key, 
defaultValue);
+       }
+
+       @Override
+       public void setBoolean(String key, boolean value) {
+               this.backingConfig.setBoolean(this.prefix + key, value);
+       }
+
+       @Override
+       public float getFloat(String key, float defaultValue) {
+               return this.backingConfig.getFloat(this.prefix + key, 
defaultValue);
+       }
+
+       @Override
+       public void setFloat(String key, float value) {
+               this.backingConfig.setFloat(this.prefix + key, value);
+       }
+
+       @Override
+       public double getDouble(String key, double defaultValue) {
+               return this.backingConfig.getDouble(this.prefix + key, 
defaultValue);
+       }
+
+       @Override
+       public void setDouble(String key, double value) {
+               this.backingConfig.setDouble(this.prefix + key, value);
+       }
+
+       @Override
+       public byte[] getBytes(final String key, final byte[] defaultValue) {
+               return this.backingConfig.getBytes(this.prefix + key, 
defaultValue);
+       }
+
+       @Override
+       public void setBytes(final String key, final byte[] bytes) {
+               this.backingConfig.setBytes(this.prefix + key, bytes);
+       }
+
+       @Override
+       public void addAllToProperties(Properties props) {
+               // only add keys with our prefix
+               synchronized (backingConfig.confData) {
+                       for (Map.Entry<String, Object> entry : 
backingConfig.confData.entrySet()) {
+                               if (entry.getKey().startsWith(prefix)) {
+                                       String keyWithoutPrefix =
+                                                       
entry.getKey().substring(prefix.length(),
+                                                                       
entry.getKey().length());
+
+                                       props.put(keyWithoutPrefix, 
entry.getValue());
+                               } else {
+                                       // don't add stuff that doesn't have 
our prefix
+                               }
+                       }
+               }
+
+       }
+
+       @Override
+       public void addAll(Configuration other) {
+               this.addAll(other, "");
+       }
+
+       @Override
+       public void addAll(Configuration other, String prefix) {
+               this.backingConfig.addAll(other, this.prefix + prefix);
+       }
+
+       @Override
+       public String toString() {
+               return backingConfig.toString();
+       }
+
+       @Override
+       public Set<String> keySet() {
+               final HashSet<String> set = new HashSet<String>();
+               final int prefixLen = this.prefix == null ? 0 : 
this.prefix.length();
+
+               for (String key : this.backingConfig.keySet()) {
+                       if (key.startsWith(this.prefix)) {
+                               set.add(key.substring(prefixLen));
+                       }
+               }
+               return set;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.prefix = in.readUTF();
+               this.backingConfig.read(in);
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeUTF(this.prefix);
+               this.backingConfig.write(out);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               return this.prefix.hashCode() ^ this.backingConfig.hashCode();
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof DelegatingConfiguration) {
+                       DelegatingConfiguration other = 
(DelegatingConfiguration) obj;
+                       return this.prefix.equals(other.prefix) && 
this.backingConfig.equals(other.backingConfig);
+               } else {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
index 5514caf..24ad61e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
@@ -20,6 +20,8 @@ package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.Public;
 
+import java.util.Properties;
+
 /**
  * Unmodifiable version of the Configuration class.
  */
@@ -42,6 +44,13 @@ public class UnmodifiableConfiguration extends Configuration 
{
        //  All mutating methods must fail
        // 
--------------------------------------------------------------------------------------------
 
+
+       @Override
+       public void addAllToProperties(Properties props) {
+               // override to make the UnmodifiableConfigurationTest happy
+               super.addAllToProperties(props);
+       }
+
        @Override
        public final void addAll(Configuration other) {
                error();

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
new file mode 100644
index 0000000..6d42129
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.configuration;
+
+import org.junit.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class DelegatingConfigurationTest {
+
+       /**
+        * 
http://stackoverflow.com/questions/22225663/checking-in-a-unit-test-whether-all-methods-are-delegated
+        */
+       @Test
+       public void testIfDelegatesImplementAllMethods() throws 
IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+
+               Comparator<Method> methodComparator = new Comparator<Method>() {
+                       @Override
+                       public int compare(Method o1, Method o2) {
+                               String o1Str = o1.getName() + 
typeParamToString(o1.getParameterTypes());
+                               String o2Str = o2.getName() + 
typeParamToString(o2.getParameterTypes());
+                               return o1Str.compareTo( o2Str ); 
+                       }
+
+                       private String typeParamToString(Class<?>[] classes) {
+                               String str = "";
+                               for(Object t : classes) {
+                                       str += t.toString();
+                               }
+                               return str;
+                       }
+               };
+               
+               // For each method in the Configuration class...
+               Method[] confMethods = Configuration.class.getDeclaredMethods();
+               Method[] delegateMethods = 
DelegatingConfiguration.class.getDeclaredMethods();
+               Arrays.sort(confMethods, methodComparator);
+               Arrays.sort(delegateMethods, methodComparator);
+               match : for (Method configurationMethod : confMethods) {
+                       boolean hasMethod = false;
+                       
if(!Modifier.isPublic(configurationMethod.getModifiers()) ) {
+                               continue;
+                       }
+                       // Find matching method in wrapper class and call it
+                       mismatch: for (Method wrapperMethod : delegateMethods) {
+                               if 
(configurationMethod.getName().equals(wrapperMethod.getName())) {
+                                       
+                                       // Get parameters for method
+                                       Class<?>[] wrapperMethodParams = 
wrapperMethod.getParameterTypes();
+                                       Class<?>[] configMethodParams = 
configurationMethod.getParameterTypes();
+                                       if(wrapperMethodParams.length != 
configMethodParams.length) {
+                                               System.err.println("Length");
+                                               break mismatch;
+                                       }
+                                       for(int i = 0; i < 
wrapperMethodParams.length; i++) {
+                                               if(wrapperMethodParams[i] != 
configMethodParams[i]) {
+                                                       break mismatch;
+                                               }
+                                       }
+                                       hasMethod = true;
+                                       break match;
+                               }
+                       }
+                       assertTrue("Foo method '" + 
configurationMethod.getName() + "' has not been wrapped correctly in 
DelegatingConfiguration wrapper", hasMethod);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 9b04c3b..5979c43 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -31,10 +31,10 @@ import 
org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.util.AbstractID;
-
 import org.junit.Test;
 
-import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -43,7 +43,7 @@ import static org.junit.Assert.assertTrue;
 public class ScheduledDropwizardReporterTest {
 
        @Test
-       public void testInvalidCharacterReplacement() {
+       public void testInvalidCharacterReplacement() throws 
NoSuchMethodException, InvocationTargetException, IllegalAccessException {
                ScheduledDropwizardReporter reporter = new 
ScheduledDropwizardReporter() {
                        @Override
                        public ScheduledReporter getReporter(MetricConfig 
config) {
@@ -68,7 +68,11 @@ public class ScheduledDropwizardReporterTest {
                String taskManagerId = "tas:kMana::ger";
                String counterName = "testCounter";
 
-               configuration.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
"org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");
+               configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test");
+               configuration.setString(
+                               ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
+                               
"org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");
+
                
configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
"<host>.<tm_id>.<job_name>");
                
configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
 
@@ -84,10 +88,10 @@ public class ScheduledDropwizardReporterTest {
 
                taskMetricGroup.counter(counterName, myCounter);
 
-               Field reporterField = 
MetricRegistry.class.getDeclaredField("reporter");
-               reporterField.setAccessible(true);
+               List<MetricReporter> reporters = metricRegistry.getReporters();
 
-               MetricReporter metricReporter = (MetricReporter) 
reporterField.get(metricRegistry);
+               assertTrue(reporters.size() == 1);
+               MetricReporter metricReporter = reporters.get(0);
 
                assertTrue("Reporter should be of type 
ScheduledDropwizardReporter", metricReporter instanceof 
ScheduledDropwizardReporter);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index 7b27867..9c2ce41 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -96,8 +96,9 @@ public class DropwizardFlinkHistogramWrapperTest extends 
TestLogger {
                int size = 10;
                String histogramMetricName = "histogram";
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestingReporter.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_INTERVAL, 
reportingInterval + " MILLISECONDS");
+               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"my_reporter");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestingReporter.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, 
reportingInterval + " MILLISECONDS");
 
                MetricRegistry registry = null;
 
@@ -112,10 +113,9 @@ public class DropwizardFlinkHistogramWrapperTest extends 
TestLogger {
 
                        String fullMetricName = 
metricGroup.getMetricIdentifier(histogramMetricName);
 
-                       Field f = 
registry.getClass().getDeclaredField("reporter");
-                       f.setAccessible(true);
+                       assertTrue(registry.getReporters().size() == 1);
 
-                       MetricReporter reporter = (MetricReporter) 
f.get(registry);
+                       MetricReporter reporter = 
registry.getReporters().get(0);
 
                        assertTrue(reporter instanceof TestingReporter);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index efb38b2..14ba5ec 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -23,9 +23,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
-import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestReporter;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -38,8 +39,10 @@ import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
 import java.lang.management.ManagementFactory;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class JMXReporterTest extends TestLogger {
 
@@ -79,18 +82,24 @@ public class JMXReporterTest extends TestLogger {
        @Test
        public void testPortConflictHandling() throws Exception {
                Configuration cfg = new Configuration();
+               cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2");
+
+               cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
+               cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1.port", "9020-9035");
+
+               cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
+               cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2.port", "9020-9035");
+
                MetricRegistry reg = new MetricRegistry(cfg);
 
                TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, 
"host", "tm");
 
-               JMXReporter rep1 = new JMXReporter();
-               JMXReporter rep2 = new JMXReporter();
+               List<MetricReporter> reporters = reg.getReporters();
 
-               MetricConfig cfg1 = new MetricConfig();
-               cfg1.setProperty("port", "9020-9035");
+               assertTrue(reporters.size() == 2);
 
-               rep1.open(cfg1);
-               rep2.open(cfg1);
+               MetricReporter rep1 = reporters.get(0);
+               MetricReporter rep2 = reporters.get(1);
 
                rep1.notifyOfAddedMetric(new Gauge<Integer>() {
                        @Override
@@ -114,8 +123,6 @@ public class JMXReporterTest extends TestLogger {
                assertEquals(1, mBeanServer.getAttribute(objectName1, "Value"));
                assertEquals(2, mBeanServer.getAttribute(objectName2, "Value"));
 
-               rep1.close();
-               rep2.close();
                reg.shutdown();
        }
 
@@ -127,17 +134,26 @@ public class JMXReporterTest extends TestLogger {
        @Test
        public void testJMXAvailability() throws Exception {
                Configuration cfg = new Configuration();
+               cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+
+               cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2");
+
+               cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
+               cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1.port", "9040-9055");
+
+               cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
+               cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2.port", "9040-9055");
+
                MetricRegistry reg = new MetricRegistry(cfg);
 
                TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, 
"host", "tm");
 
-               JMXReporter rep1 = new JMXReporter();
-               JMXReporter rep2 = new JMXReporter();
+               List<MetricReporter> reporters = reg.getReporters();
+
+               assertTrue(reporters.size() == 2);
 
-               MetricConfig cfg1 = new MetricConfig();
-               cfg1.setProperty("port", "9040-9055");
-               rep1.open(cfg1);
-               rep2.open(cfg1);
+               MetricReporter rep1 = reporters.get(0);
+               MetricReporter rep2 = reporters.get(1);
 
                rep1.notifyOfAddedMetric(new Gauge<Integer>() {
                        @Override
@@ -156,29 +172,23 @@ public class JMXReporterTest extends TestLogger {
                ObjectName objectName1 = new 
ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
                ObjectName objectName2 = new 
ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
 
-               JMXServiceURL url1 = new 
JMXServiceURL("service:jmx:rmi://localhost:" + rep1.getPort() + 
"/jndi/rmi://localhost:" + rep1.getPort() + "/jmxrmi");
+               JMXServiceURL url1 = new 
JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter)rep1).getPort() + 
"/jndi/rmi://localhost:" + ((JMXReporter)rep1).getPort() + "/jmxrmi");
                JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
                MBeanServerConnection mCon1 = 
jmxCon1.getMBeanServerConnection();
 
                assertEquals(1, mCon1.getAttribute(objectName1, "Value"));
                assertEquals(2, mCon1.getAttribute(objectName2, "Value"));
 
-               url1 = null;
                jmxCon1.close();
-               jmxCon1 = null;
-               mCon1 = null;
 
-               JMXServiceURL url2 = new 
JMXServiceURL("service:jmx:rmi://localhost:" + rep2.getPort() + 
"/jndi/rmi://localhost:" + rep2.getPort() + "/jmxrmi");
+               JMXServiceURL url2 = new 
JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter)rep2).getPort() + 
"/jndi/rmi://localhost:" + ((JMXReporter)rep2).getPort() + "/jmxrmi");
                JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2);
                MBeanServerConnection mCon2 = 
jmxCon2.getMBeanServerConnection();
 
                assertEquals(1, mCon2.getAttribute(objectName1, "Value"));
                assertEquals(2, mCon2.getAttribute(objectName2, "Value"));
 
-               url2 = null;
                jmxCon2.close();
-               jmxCon2 = null;
-               mCon2 = null;
 
                rep1.close();
                rep2.close();
@@ -195,7 +205,8 @@ public class JMXReporterTest extends TestLogger {
 
                try {
                        Configuration config = new Configuration();
-                       
config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
JMXReporter.class.getName());
+                       
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test");
+                       
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
                        registry = new MetricRegistry(config);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 8b0f672..fa1f9f8 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -19,12 +19,12 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.junit.Test;
@@ -52,9 +52,11 @@ public class JMXJobManagerMetricTest {
                Deadline deadline = new FiniteDuration(2, 
TimeUnit.MINUTES).fromNow();
                Configuration flinkConfiguration = new Configuration();
 
-               
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
JMXReporter.class.getName());
+               
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+               
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
+               
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.port", "9060-9075");
+
                
flinkConfiguration.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, 
"jobmanager.<job_name>");
-               
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, 
"--port 9060-9075");
 
                TestingCluster flink = new TestingCluster(flinkConfiguration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index d7ae04f..8c1af0e 100644
--- 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -36,12 +36,12 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.SocketException;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -73,7 +73,11 @@ public class StatsDReporterTest extends TestLogger {
                String taskManagerId = "tas:kMana::ger";
                String counterName = "testCounter";
 
-               configuration.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
"org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter");
+               configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test");
+               configuration.setString(
+                               ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
+                               
"org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter");
+
                
configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
"<host>.<tm_id>.<job_name>");
                
configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
 
@@ -89,10 +93,11 @@ public class StatsDReporterTest extends TestLogger {
 
                taskMetricGroup.counter(counterName, myCounter);
 
-               Field reporterField = 
MetricRegistry.class.getDeclaredField("reporter");
-               reporterField.setAccessible(true);
+               List<MetricReporter> reporters = metricRegistry.getReporters();
+
+               assertTrue(reporters.size() == 1);
 
-               MetricReporter metricReporter = (MetricReporter) 
reporterField.get(metricRegistry);
+               MetricReporter metricReporter = reporters.get(0);
 
                assertTrue("Reporter should be of type StatsDReporter", 
metricReporter instanceof StatsDReporter);
 
@@ -138,9 +143,11 @@ public class StatsDReporterTest extends TestLogger {
                        int port = receiver.getPort();
 
                        Configuration config = new Configuration();
-                       
config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
StatsDReporter.class.getName());
-                       
config.setString(ConfigConstants.METRICS_REPORTER_INTERVAL, "1 SECONDS");
-                       
config.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "--host localhost 
--port " + port);
+                       
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+                       
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
+                       
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + 
ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
+                       
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", 
"localhost");
+                       
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "" + 
port);
 
                        registry = new MetricRegistry(config);
 
@@ -308,6 +315,7 @@ public class StatsDReporterTest extends TestLogger {
                                        byte[] buffer = new byte[1024];
 
                                        DatagramPacket packet = new 
DatagramPacket(buffer, buffer.length);
+
                                        socket.receive(packet);
 
                                        String line = new 
String(packet.getData(), 0, packet.getLength());

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 9c4da6c..65b5b9b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
@@ -30,6 +31,8 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.TimerTask;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -41,9 +44,9 @@ import java.util.concurrent.TimeUnit;
  */
 public class MetricRegistry {
        static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
-
-       private final MetricReporter reporter;
-       private final ScheduledExecutorService executor;
+       
+       private List<MetricReporter> reporters;
+       private ScheduledExecutorService executor;
 
        private final ScopeFormats scopeFormats;
 
@@ -74,56 +77,68 @@ public class MetricRegistry {
                this.delimiter = delim;
 
                // second, instantiate any custom configured reporters
+               this.reporters = new ArrayList<>();
+
+               final String definedReporters = 
config.getString(ConfigConstants.METRICS_REPORTERS_LIST, null);
 
-               final String className = 
config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null);
-               if (className == null) {
+               if (definedReporters == null) {
+                       // no reporters defined
                        // by default, don't report anything
                        LOG.info("No metrics reporter configured, no metrics 
will be exposed/reported.");
-                       this.reporter = null;
                        this.executor = null;
-               }
-               else {
-                       MetricReporter reporter;
-                       ScheduledExecutorService executor = null;
-                       try {
-                               String configuredPeriod = 
config.getString(ConfigConstants.METRICS_REPORTER_INTERVAL, null);
-                               TimeUnit timeunit = TimeUnit.SECONDS;
-                               long period = 10;
-
-                               if (configuredPeriod != null) {
-                                       try {
-                                               String[] interval = 
configuredPeriod.split(" ");
-                                               period = 
Long.parseLong(interval[0]);
-                                               timeunit = 
TimeUnit.valueOf(interval[1]);
-                                       }
-                                       catch (Exception e) {
-                                               LOG.error("Cannot parse report 
interval from config: " + configuredPeriod +
-                                                       " - please use values 
like '10 SECONDS' or '500 MILLISECONDS'. " +
-                                                       "Using default 
reporting interval.");
-                                       }
+               } else {
+                       // we have some reporters so
+                       String[] namedReporters = 
definedReporters.split("\\s*,\\s*");
+                       for (String namedReporter : namedReporters) {
+
+                               DelegatingConfiguration reporterConfig = new 
DelegatingConfiguration(config, ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + ".");
+                               final String className = 
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
+                               if (className == null) {
+                                       LOG.error("No reporter class set for 
reporter " + namedReporter + ". Metrics might not be exposed/reported.");
+                                       continue;
                                }
 
-                               MetricConfig reporterConfig = 
createReporterConfig(config);
+                               try {
+                                       String configuredPeriod = 
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, 
null);
+                                       TimeUnit timeunit = TimeUnit.SECONDS;
+                                       long period = 10;
+
+                                       if (configuredPeriod != null) {
+                                               try {
+                                                       String[] interval = 
configuredPeriod.split(" ");
+                                                       period = 
Long.parseLong(interval[0]);
+                                                       timeunit = 
TimeUnit.valueOf(interval[1]);
+                                               }
+                                               catch (Exception e) {
+                                                       LOG.error("Cannot parse 
report interval from config: " + configuredPeriod +
+                                                                       " - 
please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
+                                                                       "Using 
default reporting interval.");
+                                               }
+                                       }
 
-                               Class<?> reporterClass = 
Class.forName(className);
-                               reporter = (MetricReporter) 
reporterClass.newInstance();
-                               reporter.open(reporterConfig);
+                                       Class<?> reporterClass = 
Class.forName(className);
+                                       MetricReporter reporterInstance = 
(MetricReporter) reporterClass.newInstance();
 
-                               if (reporter instanceof Scheduled) {
-                                       executor = 
Executors.newSingleThreadScheduledExecutor();
-                                       LOG.info("Periodically reporting 
metrics in intervals of {} {}", period, timeunit.name());
+                                       MetricConfig metricConfig = new 
MetricConfig();
+                                       
reporterConfig.addAllToProperties(metricConfig);
+                                       reporterInstance.open(metricConfig);
 
-                                       executor.scheduleWithFixedDelay(new 
ReporterTask((Scheduled) reporter), period, period, timeunit);
+                                       if (reporterInstance instanceof 
Scheduled) {
+                                               if (this.executor == null) {
+                                                       executor = 
Executors.newSingleThreadScheduledExecutor();
+                                               }
+                                               LOG.info("Periodically 
reporting metrics in intervals of {} {} for reporter {} of type {}.", period, 
timeunit.name(), namedReporter, className);
+
+                                               executor.scheduleWithFixedDelay(
+                                                               new 
ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
+                                       }
+                                       reporters.add(reporterInstance);
+                               }
+                               catch (Throwable t) {
+                                       shutdownExecutor();
+                                       LOG.error("Could not instantiate 
metrics reporter" + namedReporter + ". Metrics might not be exposed/reported.", 
t);
                                }
                        }
-                       catch (Throwable t) {
-                               shutdownExecutor();
-                               LOG.info("Could not instantiate metrics 
reporter. No metrics will be exposed/reported.", t);
-                               reporter = null;
-                       }
-
-                       this.reporter = reporter;
-                       this.executor = executor;
                }
        }
 
@@ -131,24 +146,27 @@ public class MetricRegistry {
                return this.delimiter;
        }
 
-       public MetricReporter getReporter() {
-               return reporter;
+       public List<MetricReporter> getReporters() {
+               return reporters;
        }
 
        /**
         * Shuts down this registry and the associated {@link 
org.apache.flink.metrics.reporter.MetricReporter}.
         */
        public void shutdown() {
-               if (reporter != null) {
-                       try {
-                               reporter.close();
-                       } catch (Throwable t) {
-                               LOG.warn("Metrics reporter did not shut down 
cleanly", t);
+               if (reporters != null) {
+                       for (MetricReporter reporter : reporters) {
+                               try {
+                                       reporter.close();
+                               } catch (Throwable t) {
+                                       LOG.warn("Metrics reporter did not shut 
down cleanly", t);
+                               }
                        }
+                       reporters = null;
                }
                shutdownExecutor();
        }
-
+       
        private void shutdownExecutor() {
                if (executor != null) {
                        executor.shutdown();
@@ -180,8 +198,12 @@ public class MetricRegistry {
         */
        public void register(Metric metric, String metricName, MetricGroup 
group) {
                try {
-                       if (reporter != null) {
-                               reporter.notifyOfAddedMetric(metric, 
metricName, group);
+                       if (reporters != null) {
+                               for (MetricReporter reporter : reporters) {
+                                       if (reporter != null) {
+                                               
reporter.notifyOfAddedMetric(metric, metricName, group);
+                                       }
+                               }
                        }
                } catch (Exception e) {
                        LOG.error("Error while registering metric.", e);
@@ -197,8 +219,12 @@ public class MetricRegistry {
         */
        public void unregister(Metric metric, String metricName, MetricGroup 
group) {
                try {
-                       if (reporter != null) {
-                               reporter.notifyOfRemovedMetric(metric, 
metricName, group);
+                       if (reporters != null) {
+                               for (MetricReporter reporter : reporters) {
+                                       if (reporter != null) {
+                                               
reporter.notifyOfRemovedMetric(metric, metricName, group);
+                                       }
+                               }
                        }
                } catch (Exception e) {
                        LOG.error("Error while registering metric.", e);
@@ -208,17 +234,6 @@ public class MetricRegistry {
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
-       static MetricConfig createReporterConfig(Configuration config) {
-               MetricConfig reporterConfig = new MetricConfig();
-
-               String[] arguments = 
config.getString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "").split(" ");
-               if (arguments.length > 1) {
-                       for (int x = 0; x < arguments.length; x += 2) {
-                               
reporterConfig.setProperty(arguments[x].replace("--", ""), arguments[x + 1]);
-                       }
-               }
-               return reporterConfig;
-       }
 
        static ScopeFormats createScopeConfig(Configuration config) {
                String jmFormat = config.getString(

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
index 03a8910..b598523 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
@@ -25,9 +25,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorWithName;
@@ -40,9 +38,8 @@ import 
org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.Driver;
@@ -1162,184 +1159,5 @@ public class TaskConfig implements Serializable {
        public boolean isSolutionSetUnmanaged() {
                return config.getBoolean(SOLUTION_SET_OBJECTS, false);
        }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                          Utility class for nested Configurations
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * A configuration that manages a subset of keys with a common prefix 
from a given configuration.
-        */
-       public static final class DelegatingConfiguration extends Configuration 
{
-               
-               private static final long serialVersionUID = 1L;
-
-               private final Configuration backingConfig;              // the 
configuration actually storing the data
-               
-               private String prefix;                                          
        // the prefix key by which keys for this config are marked
-               
-               // 
--------------------------------------------------------------------------------------------
-               
-               /**
-                * Default constructor for serialization. Creates an empty 
delegating configuration.
-                */
-               public DelegatingConfiguration() {
-                       this.backingConfig = new Configuration();
-                       this.prefix = "";
-               }
-               
-               /**
-                * Creates a new delegating configuration which stores its 
key/value pairs in the given
-                * configuration using the specifies key prefix.
-                * 
-                * @param backingConfig The configuration holding the actual 
config data.
-                * @param prefix The prefix prepended to all config keys.
-                */
-               public DelegatingConfiguration(Configuration backingConfig, 
String prefix)
-               {
-                       this.backingConfig = backingConfig;
-                       this.prefix = prefix;
-               }
-
-               // 
--------------------------------------------------------------------------------------------
-               
-               @Override
-               public String getString(String key, String defaultValue) {
-                       return this.backingConfig.getString(this.prefix + key, 
defaultValue);
-               }
-               
-               @Override
-               public void setString(String key, String value) {
-                       this.backingConfig.setString(this.prefix + key, value);
-               }
-
-               @Override
-               public <T> Class<T> getClass(String key, Class<? extends T> 
defaultValue, ClassLoader classLoader) throws ClassNotFoundException {
-                       return this.backingConfig.getClass(this.prefix + key, 
defaultValue, classLoader);
-               }
-
-               @Override
-               public void setClass(String key, Class<?> klazz) {
-                       this.backingConfig.setClass(this.prefix + key, klazz);
-               }
-
-               @Override
-               public int getInteger(String key, int defaultValue) {
-                       return this.backingConfig.getInteger(this.prefix + key, 
defaultValue);
-               }
-
-               @Override
-               public void setInteger(String key, int value) {
-                       this.backingConfig.setInteger(this.prefix + key, value);
-               }
-
-               @Override
-               public long getLong(String key, long defaultValue) {
-                       return this.backingConfig.getLong(this.prefix + key, 
defaultValue);
-               }
-
-               @Override
-               public void setLong(String key, long value) {
-                       this.backingConfig.setLong(this.prefix + key, value);
-               }
-
-               @Override
-               public boolean getBoolean(String key, boolean defaultValue) {
-                       return this.backingConfig.getBoolean(this.prefix + key, 
defaultValue);
-               }
-
-               @Override
-               public void setBoolean(String key, boolean value) {
-                       this.backingConfig.setBoolean(this.prefix + key, value);
-               }
-
-               @Override
-               public float getFloat(String key, float defaultValue) {
-                       return this.backingConfig.getFloat(this.prefix + key, 
defaultValue);
-               }
-
-               @Override
-               public void setFloat(String key, float value) {
-                       this.backingConfig.setFloat(this.prefix + key, value);
-               }
-
-               @Override
-               public double getDouble(String key, double defaultValue) {
-                       return this.backingConfig.getDouble(this.prefix + key, 
defaultValue);
-               }
-
-               @Override
-               public void setDouble(String key, double value) {
-                       this.backingConfig.setDouble(this.prefix + key, value);
-               }
-               
-               @Override
-               public byte[] getBytes(final String key, final byte[] 
defaultValue) {
-                       return this.backingConfig.getBytes(this.prefix + key, 
defaultValue);
-               }
-               
-               @Override
-               public void setBytes(final String key, final byte[] bytes) {
-                       this.backingConfig.setBytes(this.prefix + key, bytes);
-               }
-               
-               @Override
-               public void addAll(Configuration other) {
-                       this.addAll(other, "");
-               }
-               
-               @Override
-               public void addAll(Configuration other, String prefix) {
-                       this.backingConfig.addAll(other, this.prefix + prefix);
-               }
-               
-               @Override
-               public String toString() {
-                       return backingConfig.toString();
-               }
-               
-               @Override
-               public Set<String> keySet() {
-                       final HashSet<String> set = new HashSet<String>();
-                       final int prefixLen = this.prefix == null ? 0 : 
this.prefix.length();
-                       
-                       for (String key : this.backingConfig.keySet()) {
-                               if (key.startsWith(this.prefix)) {
-                                       set.add(key.substring(prefixLen));
-                               }
-                       }
-                       return set;
-               }
-               
-               // 
--------------------------------------------------------------------------------------------
-
-               @Override
-               public void read(DataInputView in) throws IOException {
-                       this.prefix = in.readUTF();
-                       this.backingConfig.read(in);
-               }
 
-               @Override
-               public void write(DataOutputView out) throws IOException {
-                       out.writeUTF(this.prefix);
-                       this.backingConfig.write(out);
-               }
-               
-               // 
--------------------------------------------------------------------------------------------
-
-               @Override
-               public int hashCode() {
-                       return this.prefix.hashCode() ^ 
this.backingConfig.hashCode();
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj instanceof DelegatingConfiguration) {
-                               DelegatingConfiguration other = 
(DelegatingConfiguration) obj;
-                               return this.prefix.equals(other.prefix) && 
this.backingConfig.equals(other.backingConfig);
-                       } else {
-                               return false;
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 630335b..2b8b867 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -85,7 +85,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
 
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestingReporter.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestingReporter.class.getName());
 
                Configuration jobConfig = new Configuration();
 
@@ -93,7 +94,9 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
                MetricRegistry metricRegistry = new MetricRegistry(config);
 
-               MetricReporter reporter = metricRegistry.getReporter();
+               assertTrue(metricRegistry.getReporters().size() == 1);
+
+               MetricReporter reporter = metricRegistry.getReporters().get(0);
 
                assertTrue(reporter instanceof TestingReporter);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index 3252e3d..d75ef57 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -45,9 +45,12 @@ public class MetricRegistryTest extends TestLogger {
        public void testReporterInstantiation() {
                Configuration config = new Configuration();
 
-               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter1.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
 
-               new MetricRegistry(config);
+               MetricRegistry metricRegistry = new MetricRegistry(config);
+
+               assertTrue(metricRegistry.getReporters().size() == 1);
 
                Assert.assertTrue(TestReporter1.wasOpened);
        }
@@ -62,14 +65,64 @@ public class MetricRegistryTest extends TestLogger {
        }
 
        /**
+        * Verifies that multiple reporters are instantiated correctly.
+        */
+       @Test
+       public void testMultipleReporterInstantiation() {
+               Configuration config = new Configuration();
+
+               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1, test2,test3");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter11.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter12.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter13.class.getName());
+
+               MetricRegistry metricRegistry = new MetricRegistry(config);
+
+               assertTrue(metricRegistry.getReporters().size() == 3);
+
+               Assert.assertTrue(TestReporter11.wasOpened);
+               Assert.assertTrue(TestReporter12.wasOpened);
+               Assert.assertTrue(TestReporter13.wasOpened);
+       }
+
+       protected static class TestReporter11 extends TestReporter {
+               public static boolean wasOpened = false;
+
+               @Override
+               public void open(MetricConfig config) {
+                       wasOpened = true;
+               }
+       }
+
+       protected static class TestReporter12 extends TestReporter {
+               public static boolean wasOpened = false;
+
+               @Override
+               public void open(MetricConfig config) {
+                       wasOpened = true;
+               }
+       }
+
+       protected static class TestReporter13 extends TestReporter {
+               public static boolean wasOpened = false;
+
+               @Override
+               public void open(MetricConfig config) {
+                       wasOpened = true;
+               }
+       }
+
+       /**
         * Verifies that configured arguments are properly forwarded to the 
reporter.
         */
        @Test
        public void testReporterArgumentForwarding() {
                Configuration config = new Configuration();
 
-               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter2.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, 
"--arg1 hello --arg2 world");
+               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter2.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg1", "hello");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg2", "world");
 
                new MetricRegistry(config);
        }
@@ -91,8 +144,10 @@ public class MetricRegistryTest extends TestLogger {
        public void testReporterScheduling() throws InterruptedException {
                Configuration config = new Configuration();
 
-               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter3.class.getName());
-               config.setString(ConfigConstants.METRICS_REPORTER_INTERVAL, "50 
MILLISECONDS");
+               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter3.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg1", "hello");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
 
                new MetricRegistry(config);
 
@@ -125,12 +180,14 @@ public class MetricRegistryTest extends TestLogger {
        }
 
        /**
-        * Verifies that reporters implementing the Listener interface are 
notified when Metrics are added or removed.
+        * Verifies that reporters are notified of added/removed metrics.
         */
        @Test
-       public void testListener() {
+       public void testReporterNotifications() {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter6.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter6.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter7.class.getName());
 
                MetricRegistry registry = new MetricRegistry(config);
 
@@ -140,6 +197,8 @@ public class MetricRegistryTest extends TestLogger {
 
                assertTrue(TestReporter6.addCalled);
                assertTrue(TestReporter6.removeCalled);
+               assertTrue(TestReporter7.addCalled);
+               assertTrue(TestReporter7.removeCalled);
        }
 
        protected static class TestReporter6 extends TestReporter {
@@ -161,6 +220,25 @@ public class MetricRegistryTest extends TestLogger {
                }
        }
 
+       protected static class TestReporter7 extends TestReporter {
+               public static boolean addCalled = false;
+               public static boolean removeCalled = false;
+
+               @Override
+               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       addCalled = true;
+                       assertTrue(metric instanceof Counter);
+                       assertEquals("rootCounter", metricName);
+               }
+
+               @Override
+               public void notifyOfRemovedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       removeCalled = true;
+                       Assert.assertTrue(metric instanceof Counter);
+                       Assert.assertEquals("rootCounter", metricName);
+               }
+       }
+
        /**
         * Verifies that the scope configuration is properly extracted.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index 39485dc..9936631 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -40,7 +40,8 @@ public class MetricGroupRegistrationTest {
        @Test
        public void testMetricInstantiation() {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter1.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
 
                MetricRegistry registry = new MetricRegistry(config);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java
deleted file mode 100644
index 09812f2..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.util;
-
-import static org.junit.Assert.assertTrue;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.Comparator;
-
-import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.runtime.operators.util.TaskConfig.DelegatingConfiguration;
-import org.junit.Test;
-
-
-public class DelegatingConfigurationTest {
-
-       /**
-        * 
http://stackoverflow.com/questions/22225663/checking-in-a-unit-test-whether-all-methods-are-delegated
-        */
-       @Test
-       public void testIfDelegatesImplementAllMethods() throws 
IllegalAccessException, IllegalArgumentException, InvocationTargetException {
-
-               Comparator<Method> methodComparator = new Comparator<Method>() {
-                       @Override
-                       public int compare(Method o1, Method o2) {
-                               String o1Str = o1.getName() + 
typeParamToString(o1.getParameterTypes());
-                               String o2Str = o2.getName() + 
typeParamToString(o2.getParameterTypes());
-                               return o1Str.compareTo( o2Str ); 
-                       }
-
-                       private String typeParamToString(Class<?>[] classes) {
-                               String str = "";
-                               for(Object t : classes) {
-                                       str += t.toString();
-                               }
-                               return str;
-                       }
-               };
-               
-               // For each method in the Configuration class...
-               Method[] confMethods = Configuration.class.getDeclaredMethods();
-               Method[] delegateMethods = 
DelegatingConfiguration.class.getDeclaredMethods();
-               Arrays.sort(confMethods, methodComparator);
-               Arrays.sort(delegateMethods, methodComparator);
-               match : for (Method configurationMethod : confMethods) {
-                       boolean hasMethod = false;
-                       
if(!Modifier.isPublic(configurationMethod.getModifiers()) ) {
-                               continue;
-                       }
-                       // Find matching method in wrapper class and call it
-                       mismatch: for (Method wrapperMethod : delegateMethods) {
-                               if 
(configurationMethod.getName().equals(wrapperMethod.getName())) {
-                                       
-                                       // Get parameters for method
-                                       Class<?>[] wrapperMethodParams = 
wrapperMethod.getParameterTypes();
-                                       Class<?>[] configMethodParams = 
configurationMethod.getParameterTypes();
-                                       if(wrapperMethodParams.length != 
configMethodParams.length) {
-                                               System.err.println("Length");
-                                               break mismatch;
-                                       }
-                                       for(int i = 0; i < 
wrapperMethodParams.length; i++) {
-                                               if(wrapperMethodParams[i] != 
configMethodParams[i]) {
-                                                       break mismatch;
-                                               }
-                                       }
-                                       hasMethod = true;
-                                       break match;
-                               }
-                       }
-                       assertTrue("Foo method '" + 
configurationMethod.getName() + "' has not been wrapped correctly in 
DelegatingConfiguration wrapper", hasMethod);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 4450e94..eddb57c 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -102,7 +102,8 @@ public abstract class KafkaTestBase extends TestLogger {
                
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
                
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
                
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
-               flinkConfig.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
JMXReporter.class.getName());
+               flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"my_reporter");
+               flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
 
                flink = new ForkableFlinkMiniCluster(flinkConfig, false);
                flink.start();

Reply via email to