[FLINK-8080][metrics] metrics.reporters now optional include list

This closes #5099.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/493c2857
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/493c2857
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/493c2857

Branch: refs/heads/master
Commit: 493c28571f22d9dde4edbec0ba38f2761fb51335
Parents: 42b0114
Author: zentol <[email protected]>
Authored: Wed Nov 15 12:56:30 2017 +0100
Committer: zentol <[email protected]>
Committed: Tue Dec 12 19:09:15 2017 +0100

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      |  9 +---
 .../connectors/kafka/KafkaTestBase.java         |  2 -
 .../flink/configuration/MetricOptions.java      |  8 ++--
 .../ScheduledDropwizardReporterTest.java        |  1 -
 .../DropwizardFlinkHistogramWrapperTest.java    |  2 -
 .../flink/metrics/jmx/JMXReporterTest.java      |  6 ---
 .../jobmanager/JMXJobManagerMetricTest.java     |  1 -
 .../prometheus/PrometheusReporterTest.java      |  2 -
 .../flink/metrics/slf4j/Slf4jReporterTest.java  |  1 -
 .../metrics/statsd/StatsDReporterTest.java      |  3 --
 .../metrics/MetricRegistryConfiguration.java    | 45 +++++++++++++++++---
 .../runtime/metrics/MetricRegistryImpl.java     |  2 +-
 .../runtime/metrics/MetricRegistryImplTest.java | 13 ++----
 .../metrics/groups/AbstractMetricGroupTest.java |  1 -
 .../groups/MetricGroupRegistrationTest.java     |  2 -
 15 files changed, 49 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index e4b5161..2a9ab50 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -560,11 +560,11 @@ counter = getRuntimeContext()
 Metrics can be exposed to an external system by configuring one or several 
reporters in `conf/flink-conf.yaml`. These
 reporters will be instantiated on each job and task manager when they are 
started.
 
-- `metrics.reporters`: The list of named reporters.
 - `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the 
reporter named `<name>`.
 - `metrics.reporter.<name>.class`: The reporter class to use for the reporter 
named `<name>`.
 - `metrics.reporter.<name>.interval`: The reporter interval to use for the 
reporter named `<name>`.
 - `metrics.reporter.<name>.scope.delimiter`: The delimiter to use for the 
identifier (default value use `metrics.scope.delimiter`) for the reporter named 
`<name>`.
+- `metrics.reporters`: (optional) A comma-separated include list of reporter 
names. By default all configured reporters will be used.
 
 All reporters must at least have the `class` property, some allow specifying a 
reporting `interval`. Below,
 we will list more settings specific to each reporter.
@@ -606,7 +606,6 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporters: jmx
 metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
 metrics.reporter.jmx.port: 8789
 
@@ -642,7 +641,6 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporters: gang
 metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
 metrics.reporter.gang.host: localhost
 metrics.reporter.gang.port: 8649
@@ -668,7 +666,6 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporters: grph
 metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
 metrics.reporter.grph.host: localhost
 metrics.reporter.grph.port: 2003
@@ -689,7 +686,6 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporters: prom
 metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter
 
 {% endhighlight %}
@@ -719,7 +715,6 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporters: stsd
 metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
 metrics.reporter.stsd.host: localhost
 metrics.reporter.stsd.port: 8125
@@ -743,7 +738,6 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporters: dghttp
 metrics.reporter.dghttp.class: 
org.apache.flink.metrics.datadog.DatadogHttpReporter
 metrics.reporter.dghttp.apikey: xxx
 metrics.reporter.dghttp.tags: myflinkapp,prod
@@ -760,7 +754,6 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporters: slf4j
 metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
 metrics.reporter.slf4j.interval: 60 SECONDS
 

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 74485b4..f471cd4 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -21,7 +21,6 @@ import 
org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -136,7 +135,6 @@ public abstract class KafkaTestBase extends TestLogger {
                
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
                flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
16L);
                
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
-               flinkConfig.setString(MetricOptions.REPORTERS_LIST, 
"my_reporter");
                flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
                return flinkConfig;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 8a328ac..42eb575 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -25,12 +25,12 @@ import static 
org.apache.flink.configuration.ConfigOptions.key;
 public class MetricOptions {
 
        /**
-        * The list of named reporters. Names are defined here and per-reporter 
configs
-        * are given with the reporter config prefix and the reporter name.
+        * An optional list of reporter names. If configured, only reporters 
whose name matches any of the names in the list
+        * will be started. Otherwise, all reporters that could be found in the 
configuration will be started.
         *
-        * Example:
+        * <p>Example:
         * <pre>{@code
-        * metrics.reporters = foo, bar
+        * metrics.reporters = foo,bar
         *
         * metrics.reporter.foo.class = 
org.apache.flink.metrics.reporter.JMXReporter
         * metrics.reporter.foo.interval = 10

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 3fa0474..4a2ca3a 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -82,7 +82,6 @@ public class ScheduledDropwizardReporterTest {
                String taskManagerId = "tas:kMana::ger";
                String counterName = "testCounter";
 
-               configuration.setString(MetricOptions.REPORTERS_LIST, "test");
                configuration.setString(
                                ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
                                
"org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index 8f70abb..fb21a75b 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.dropwizard.metrics;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
@@ -101,7 +100,6 @@ public class DropwizardFlinkHistogramWrapperTest extends 
TestLogger {
                int size = 10;
                String histogramMetricName = "histogram";
                Configuration config = new Configuration();
-               config.setString(MetricOptions.REPORTERS_LIST, "my_reporter");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestingReporter.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, 
reportingInterval + " MILLISECONDS");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 067b08f..40b7f15 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.metrics.jmx;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.util.TestHistogram;
@@ -100,7 +99,6 @@ public class JMXReporterTest extends TestLogger {
        @Test
        public void testPortConflictHandling() throws Exception {
                Configuration cfg = new Configuration();
-               cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
 
                cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
                cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1.port", "9020-9035");
@@ -160,8 +158,6 @@ public class JMXReporterTest extends TestLogger {
                Configuration cfg = new Configuration();
                cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
 
-               cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
-
                cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
                cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1.port", "9040-9055");
 
@@ -236,7 +232,6 @@ public class JMXReporterTest extends TestLogger {
 
                try {
                        Configuration config = new Configuration();
-                       config.setString(MetricOptions.REPORTERS_LIST, 
"jmx_test");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
                        registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
@@ -286,7 +281,6 @@ public class JMXReporterTest extends TestLogger {
 
                try {
                        Configuration config = new Configuration();
-                       config.setString(MetricOptions.REPORTERS_LIST, 
"jmx_test");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
                        registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 7280476..c74db28 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -64,7 +64,6 @@ public class JMXJobManagerMetricTest {
                Deadline deadline = new FiniteDuration(2, 
TimeUnit.MINUTES).fromNow();
                Configuration flinkConfiguration = new Configuration();
 
-               flinkConfiguration.setString(MetricOptions.REPORTERS_LIST, 
"test");
                
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
                
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.port", "9060-9075");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index d905c25..39cf4be 100644
--- 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.metrics.prometheus;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
@@ -279,7 +278,6 @@ public class PrometheusReporterTest extends TestLogger {
 
        static Configuration createConfigWithOneReporter(String reporterName, 
String portString) {
                Configuration cfg = new Configuration();
-               cfg.setString(MetricOptions.REPORTERS_LIST, reporterName);
                cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
reporterName + "." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
PrometheusReporter.class.getName());
                cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
reporterName + "." + ARG_PORT, portString);
                return cfg;

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
 
b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
index f538bc7..b344f45 100644
--- 
a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
+++ 
b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
@@ -63,7 +63,6 @@ public class Slf4jReporterTest extends TestLogger {
                TestUtils.addTestAppenderForRootLogger();
 
                Configuration configuration = new Configuration();
-               configuration.setString(MetricOptions.REPORTERS_LIST, "slf4j");
                configuration.setString(ConfigConstants.METRICS_REPORTER_PREFIX 
+ "slf4j." +
                        ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
Slf4jReporter.class.getName());
                configuration.setString(MetricOptions.SCOPE_NAMING_TASK, 
"<host>.<tm_id>.<job_name>");

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 275f2e1..08d4998 100644
--- 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -81,7 +81,6 @@ public class StatsDReporterTest extends TestLogger {
                String taskManagerId = "tas:kMana::ger";
                String counterName = "testCounter";
 
-               configuration.setString(MetricOptions.REPORTERS_LIST, "test");
                configuration.setString(
                                ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
                                
"org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter");
@@ -151,7 +150,6 @@ public class StatsDReporterTest extends TestLogger {
                        int port = receiver.getPort();
 
                        Configuration config = new Configuration();
-                       config.setString(MetricOptions.REPORTERS_LIST, "test");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + 
ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", 
"localhost");
@@ -225,7 +223,6 @@ public class StatsDReporterTest extends TestLogger {
                        int port = receiver.getPort();
 
                        Configuration config = new Configuration();
-                       config.setString(MetricOptions.REPORTERS_LIST, "test");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + 
ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", 
"localhost");

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index d07cb65..7188a59 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -32,7 +32,11 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * Configuration object for {@link MetricRegistryImpl}.
@@ -44,7 +48,14 @@ public class MetricRegistryConfiguration {
        private static volatile MetricRegistryConfiguration 
defaultConfiguration;
 
        // regex pattern to split the defined reporters
-       private static final Pattern splitPattern = 
Pattern.compile("\\s*,\\s*");
+       private static final Pattern reporterListPattern = 
Pattern.compile("\\s*,\\s*");
+
+       // regex pattern to extract the name from reporter configuration keys, 
e.g. "rep" from "metrics.reporter.rep.class"
+       private static final Pattern reporterClassPattern = Pattern.compile(
+               Pattern.quote(ConfigConstants.METRICS_REPORTER_PREFIX) +
+               // [\S&&[^.]] = intersection of non-whitespace and non-period 
character classes
+               "([\\S&&[^.]]*)\\." +
+               Pattern.quote(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX));
 
        // scope formats for the different components
        private final ScopeFormats scopeFormats;
@@ -108,15 +119,37 @@ public class MetricRegistryConfiguration {
                        delim = '.';
                }
 
-               final String definedReporters = 
configuration.getString(MetricOptions.REPORTERS_LIST);
+               String includedReportersString = 
configuration.getString(MetricOptions.REPORTERS_LIST, "");
+               Set<String> includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+                       .collect(Collectors.toSet());
+
+               // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+               Set<String> namedReporters = new TreeSet<>(String::compareTo);
+               // scan entire configuration for "metrics.reporter" keys and 
parse individual reporter configurations
+               for (String key : configuration.keySet()) {
+                       if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+                               Matcher matcher = 
reporterClassPattern.matcher(key);
+                               if (matcher.matches()) {
+                                       String reporterName = matcher.group(1);
+                                       if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+                                               if 
(namedReporters.contains(reporterName)) {
+                                                       LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+                                               } else {
+                                                       
namedReporters.add(reporterName);
+                                               }
+                                       } else {
+                                               LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
+                                       }
+                               }
+                       }
+               }
+
                List<Tuple2<String, Configuration>> reporterConfigurations;
 
-               if (definedReporters == null) {
+               if (namedReporters.isEmpty()) {
                        reporterConfigurations = Collections.emptyList();
                } else {
-                       String[] namedReporters = 
splitPattern.split(definedReporters);
-
-                       reporterConfigurations = new 
ArrayList<>(namedReporters.length);
+                       reporterConfigurations = new 
ArrayList<>(namedReporters.size());
 
                        for (String namedReporter: namedReporters) {
                                DelegatingConfiguration delegatingConfiguration 
= new DelegatingConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index d36d095..c8f4490 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -137,7 +137,7 @@ public class MetricRegistryImpl implements MetricRegistry {
 
                                        MetricConfig metricConfig = new 
MetricConfig();
                                        
reporterConfig.addAllToProperties(metricConfig);
-                                       LOG.info("Configuring {} with {}.", 
reporterClass.getSimpleName(), metricConfig);
+                                       LOG.info("Configuring {} with {}.", 
namedReporter, metricConfig);
                                        reporterInstance.open(metricConfig);
 
                                        if (reporterInstance instanceof 
Scheduled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
index b0b20b2..2eccc0c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
@@ -70,20 +70,22 @@ public class MetricRegistryImplTest extends TestLogger {
        }
 
        /**
-        * Verifies that the reporter class argument is correctly used to 
instantiate and open the reporter.
+        * Verifies that the reporter name list is correctly used to determine 
which reporters should be instantiated.
         */
        @Test
-       public void testReporterInstantiation() {
+       public void testReporterInclusion() {
                Configuration config = new Configuration();
 
                config.setString(MetricOptions.REPORTERS_LIST, "test");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter11.class.getName());
 
                MetricRegistryImpl metricRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 
                assertTrue(metricRegistry.getReporters().size() == 1);
 
                Assert.assertTrue(TestReporter1.wasOpened);
+               Assert.assertFalse(TestReporter11.wasOpened);
 
                metricRegistry.shutdown();
        }
@@ -107,7 +109,6 @@ public class MetricRegistryImplTest extends TestLogger {
        public void testMultipleReporterInstantiation() {
                Configuration config = new Configuration();
 
-               config.setString(MetricOptions.REPORTERS_LIST, "test1, 
test2,test3");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter11.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter12.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter13.class.getName());
@@ -166,7 +167,6 @@ public class MetricRegistryImplTest extends TestLogger {
        public void testReporterArgumentForwarding() {
                Configuration config = new Configuration();
 
-               config.setString(MetricOptions.REPORTERS_LIST, "test");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter2.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg1", "hello");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg2", "world");
@@ -197,7 +197,6 @@ public class MetricRegistryImplTest extends TestLogger {
        public void testReporterScheduling() throws InterruptedException {
                Configuration config = new Configuration();
 
-               config.setString(MetricOptions.REPORTERS_LIST, "test");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter3.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg1", "hello");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
@@ -247,7 +246,6 @@ public class MetricRegistryImplTest extends TestLogger {
        @Test
        public void testReporterNotifications() {
                Configuration config = new Configuration();
-               config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter6.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter7.class.getName());
 
@@ -356,7 +354,6 @@ public class MetricRegistryImplTest extends TestLogger {
        @Test
        public void testConfigurableDelimiterForReporters() {
                Configuration config = new Configuration();
-               config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
@@ -379,7 +376,6 @@ public class MetricRegistryImplTest extends TestLogger {
        @Test
        public void testConfigurableDelimiterForReportersInGroup() {
                Configuration config = new Configuration();
-               config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3,test4");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
@@ -459,7 +455,6 @@ public class MetricRegistryImplTest extends TestLogger {
        public void testExceptionIsolation() throws Exception {
 
                Configuration config = new Configuration();
-               config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
FailingReporter.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter7.class.getName());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index 8d91b81..325982b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -84,7 +84,6 @@ public class AbstractMetricGroupTest {
        public void testScopeCachingForMultipleReporters() throws Exception {
                Configuration config = new Configuration();
                config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
-               config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter2.class.getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index 324bb73..bcdcd63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
@@ -47,7 +46,6 @@ public class MetricGroupRegistrationTest extends TestLogger {
        @Test
        public void testMetricInstantiation() {
                Configuration config = new Configuration();
-               config.setString(MetricOptions.REPORTERS_LIST, "test");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
 
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));

Reply via email to