[FLINK-8080][metrics] metrics.reporters now optional include list This closes #5099.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/493c2857 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/493c2857 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/493c2857 Branch: refs/heads/master Commit: 493c28571f22d9dde4edbec0ba38f2761fb51335 Parents: 42b0114 Author: zentol <[email protected]> Authored: Wed Nov 15 12:56:30 2017 +0100 Committer: zentol <[email protected]> Committed: Tue Dec 12 19:09:15 2017 +0100 ---------------------------------------------------------------------- docs/monitoring/metrics.md | 9 +--- .../connectors/kafka/KafkaTestBase.java | 2 - .../flink/configuration/MetricOptions.java | 8 ++-- .../ScheduledDropwizardReporterTest.java | 1 - .../DropwizardFlinkHistogramWrapperTest.java | 2 - .../flink/metrics/jmx/JMXReporterTest.java | 6 --- .../jobmanager/JMXJobManagerMetricTest.java | 1 - .../prometheus/PrometheusReporterTest.java | 2 - .../flink/metrics/slf4j/Slf4jReporterTest.java | 1 - .../metrics/statsd/StatsDReporterTest.java | 3 -- .../metrics/MetricRegistryConfiguration.java | 45 +++++++++++++++++--- .../runtime/metrics/MetricRegistryImpl.java | 2 +- .../runtime/metrics/MetricRegistryImplTest.java | 13 ++---- .../metrics/groups/AbstractMetricGroupTest.java | 1 - .../groups/MetricGroupRegistrationTest.java | 2 - 15 files changed, 49 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/docs/monitoring/metrics.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index e4b5161..2a9ab50 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -560,11 +560,11 @@ counter = getRuntimeContext() Metrics can be exposed to an external system by configuring one or several reporters in `conf/flink-conf.yaml`. These reporters will be instantiated on each job and task manager when they are started. -- `metrics.reporters`: The list of named reporters. - `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the reporter named `<name>`. - `metrics.reporter.<name>.class`: The reporter class to use for the reporter named `<name>`. - `metrics.reporter.<name>.interval`: The reporter interval to use for the reporter named `<name>`. - `metrics.reporter.<name>.scope.delimiter`: The delimiter to use for the identifier (default value use `metrics.scope.delimiter`) for the reporter named `<name>`. +- `metrics.reporters`: (optional) A comma-separated include list of reporter names. By default all configured reporters will be used. All reporters must at least have the `class` property, some allow specifying a reporting `interval`. Below, we will list more settings specific to each reporter. @@ -606,7 +606,6 @@ Example configuration: {% highlight yaml %} -metrics.reporters: jmx metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter metrics.reporter.jmx.port: 8789 @@ -642,7 +641,6 @@ Example configuration: {% highlight yaml %} -metrics.reporters: gang metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter metrics.reporter.gang.host: localhost metrics.reporter.gang.port: 8649 @@ -668,7 +666,6 @@ Example configuration: {% highlight yaml %} -metrics.reporters: grph metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter metrics.reporter.grph.host: localhost metrics.reporter.grph.port: 2003 @@ -689,7 +686,6 @@ Example configuration: {% highlight yaml %} -metrics.reporters: prom metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter {% endhighlight %} @@ -719,7 +715,6 @@ Example configuration: {% highlight yaml %} -metrics.reporters: stsd metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter metrics.reporter.stsd.host: localhost metrics.reporter.stsd.port: 8125 @@ -743,7 +738,6 @@ Example configuration: {% highlight yaml %} -metrics.reporters: dghttp metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter metrics.reporter.dghttp.apikey: xxx metrics.reporter.dghttp.tags: myflinkapp,prod @@ -760,7 +754,6 @@ Example configuration: {% highlight yaml %} -metrics.reporters: slf4j metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter metrics.reporter.slf4j.interval: 60 SECONDS http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 74485b4..f471cd4 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -21,7 +21,6 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.jmx.JMXReporter; import org.apache.flink.runtime.client.JobExecutionException; @@ -136,7 +135,6 @@ public abstract class KafkaTestBase extends TestLogger { flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS); flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); - flinkConfig.setString(MetricOptions.REPORTERS_LIST, "my_reporter"); flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); return flinkConfig; } http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java index 8a328ac..42eb575 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java @@ -25,12 +25,12 @@ import static org.apache.flink.configuration.ConfigOptions.key; public class MetricOptions { /** - * The list of named reporters. Names are defined here and per-reporter configs - * are given with the reporter config prefix and the reporter name. + * An optional list of reporter names. If configured, only reporters whose name matches any of the names in the list + * will be started. Otherwise, all reporters that could be found in the configuration will be started. * - * Example: + * <p>Example: * <pre>{@code - * metrics.reporters = foo, bar + * metrics.reporters = foo,bar * * metrics.reporter.foo.class = org.apache.flink.metrics.reporter.JMXReporter * metrics.reporter.foo.interval = 10 http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java index 3fa0474..4a2ca3a 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java @@ -82,7 +82,6 @@ public class ScheduledDropwizardReporterTest { String taskManagerId = "tas:kMana::ger"; String counterName = "testCounter"; - configuration.setString(MetricOptions.REPORTERS_LIST, "test"); configuration.setString( ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, "org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter"); http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java index 8f70abb..fb21a75b 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java @@ -20,7 +20,6 @@ package org.apache.flink.dropwizard.metrics; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.MetricOptions; import org.apache.flink.dropwizard.ScheduledDropwizardReporter; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.reporter.MetricReporter; @@ -101,7 +100,6 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger { int size = 10; String histogramMetricName = "histogram"; Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "my_reporter"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, reportingInterval + " MILLISECONDS"); http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java index 067b08f..40b7f15 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java @@ -20,7 +20,6 @@ package org.apache.flink.metrics.jmx; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.util.TestHistogram; @@ -100,7 +99,6 @@ public class JMXReporterTest extends TestLogger { @Test public void testPortConflictHandling() throws Exception { Configuration cfg = new Configuration(); - cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2"); cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9020-9035"); @@ -160,8 +158,6 @@ public class JMXReporterTest extends TestLogger { Configuration cfg = new Configuration(); cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); - cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2"); - cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9040-9055"); @@ -236,7 +232,6 @@ public class JMXReporterTest extends TestLogger { try { Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "jmx_test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); @@ -286,7 +281,6 @@ public class JMXReporterTest extends TestLogger { try { Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "jmx_test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java index 7280476..c74db28 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -64,7 +64,6 @@ public class JMXJobManagerMetricTest { Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); Configuration flinkConfiguration = new Configuration(); - flinkConfiguration.setString(MetricOptions.REPORTERS_LIST, "test"); flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "9060-9075"); http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java index d905c25..39cf4be 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java @@ -20,7 +20,6 @@ package org.apache.flink.metrics.prometheus; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; @@ -279,7 +278,6 @@ public class PrometheusReporterTest extends TestLogger { static Configuration createConfigWithOneReporter(String reporterName, String portString) { Configuration cfg = new Configuration(); - cfg.setString(MetricOptions.REPORTERS_LIST, reporterName); cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + reporterName + "." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getName()); cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + reporterName + "." + ARG_PORT, portString); return cfg; http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java index f538bc7..b344f45 100644 --- a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java +++ b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java @@ -63,7 +63,6 @@ public class Slf4jReporterTest extends TestLogger { TestUtils.addTestAppenderForRootLogger(); Configuration configuration = new Configuration(); - configuration.setString(MetricOptions.REPORTERS_LIST, "slf4j"); configuration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "slf4j." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, Slf4jReporter.class.getName()); configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>"); http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java index 275f2e1..08d4998 100644 --- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java +++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java @@ -81,7 +81,6 @@ public class StatsDReporterTest extends TestLogger { String taskManagerId = "tas:kMana::ger"; String counterName = "testCounter"; - configuration.setString(MetricOptions.REPORTERS_LIST, "test"); configuration.setString( ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, "org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter"); @@ -151,7 +150,6 @@ public class StatsDReporterTest extends TestLogger { int port = receiver.getPort(); Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost"); @@ -225,7 +223,6 @@ public class StatsDReporterTest extends TestLogger { int port = receiver.getPort(); Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost"); http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java index d07cb65..7188a59 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java @@ -32,7 +32,11 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * Configuration object for {@link MetricRegistryImpl}. @@ -44,7 +48,14 @@ public class MetricRegistryConfiguration { private static volatile MetricRegistryConfiguration defaultConfiguration; // regex pattern to split the defined reporters - private static final Pattern splitPattern = Pattern.compile("\\s*,\\s*"); + private static final Pattern reporterListPattern = Pattern.compile("\\s*,\\s*"); + + // regex pattern to extract the name from reporter configuration keys, e.g. "rep" from "metrics.reporter.rep.class" + private static final Pattern reporterClassPattern = Pattern.compile( + Pattern.quote(ConfigConstants.METRICS_REPORTER_PREFIX) + + // [\S&&[^.]] = intersection of non-whitespace and non-period character classes + "([\\S&&[^.]]*)\\." + + Pattern.quote(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX)); // scope formats for the different components private final ScopeFormats scopeFormats; @@ -108,15 +119,37 @@ public class MetricRegistryConfiguration { delim = '.'; } - final String definedReporters = configuration.getString(MetricOptions.REPORTERS_LIST); + String includedReportersString = configuration.getString(MetricOptions.REPORTERS_LIST, ""); + Set<String> includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .collect(Collectors.toSet()); + + // use a TreeSet to make the reporter order deterministic, which is useful for testing + Set<String> namedReporters = new TreeSet<>(String::compareTo); + // scan entire configuration for "metric.reporter" keys and parse individual reporter configurations + for (String key : configuration.keySet()) { + if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { + Matcher matcher = reporterClassPattern.matcher(key); + if (matcher.matches()) { + String reporterName = matcher.group(1); + if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { + if (namedReporters.contains(reporterName)) { + LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName); + } else { + namedReporters.add(reporterName); + } + } else { + LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString); + } + } + } + } + List<Tuple2<String, Configuration>> reporterConfigurations; - if (definedReporters == null) { + if (namedReporters.isEmpty()) { reporterConfigurations = Collections.emptyList(); } else { - String[] namedReporters = splitPattern.split(definedReporters); - - reporterConfigurations = new ArrayList<>(namedReporters.length); + reporterConfigurations = new ArrayList<>(namedReporters.size()); for (String namedReporter: namedReporters) { DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java index d36d095..c8f4490 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java @@ -137,7 +137,7 @@ public class MetricRegistryImpl implements MetricRegistry { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - LOG.info("Configuring {} with {}.", reporterClass.getSimpleName(), metricConfig); + LOG.info("Configuring {} with {}.", namedReporter, metricConfig); reporterInstance.open(metricConfig); if (reporterInstance instanceof Scheduled) { http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java index b0b20b2..2eccc0c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java @@ -70,20 +70,22 @@ public class MetricRegistryImplTest extends TestLogger { } /** - * Verifies that the reporter class argument is correctly used to instantiate and open the reporter. + * Verifies that the reporter name list is correctly used to determine which reporters should be instantiated. */ @Test - public void testReporterInstantiation() { + public void testReporterInclusion() { Configuration config = new Configuration(); config.setString(MetricOptions.REPORTERS_LIST, "test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName()); MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); assertTrue(metricRegistry.getReporters().size() == 1); Assert.assertTrue(TestReporter1.wasOpened); + Assert.assertFalse(TestReporter11.wasOpened); metricRegistry.shutdown(); } @@ -107,7 +109,6 @@ public class MetricRegistryImplTest extends TestLogger { public void testMultipleReporterInstantiation() { Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "test1, test2,test3"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName()); @@ -166,7 +167,6 @@ public class MetricRegistryImplTest extends TestLogger { public void testReporterArgumentForwarding() { Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world"); @@ -197,7 +197,6 @@ public class MetricRegistryImplTest extends TestLogger { public void testReporterScheduling() throws InterruptedException { Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS"); @@ -247,7 +246,6 @@ public class MetricRegistryImplTest extends TestLogger { @Test public void testReporterNotifications() { Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "test1,test2"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName()); @@ -356,7 +354,6 @@ public class MetricRegistryImplTest extends TestLogger { @Test public void testConfigurableDelimiterForReporters() { Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-"); @@ -379,7 +376,6 @@ public class MetricRegistryImplTest extends TestLogger { @Test public void testConfigurableDelimiterForReportersInGroup() { Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3,test4"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-"); @@ -459,7 +455,6 @@ public class MetricRegistryImplTest extends TestLogger { public void testExceptionIsolation() throws Exception { Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "test1,test2"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, FailingReporter.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName()); http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java index 8d91b81..325982b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java @@ -84,7 +84,6 @@ public class AbstractMetricGroupTest { public void testScopeCachingForMultipleReporters() throws Exception { Configuration config = new Configuration(); config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D"); - config.setString(MetricOptions.REPORTERS_LIST, "test1,test2"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName()); http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java index 324bb73..bcdcd63 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; @@ -47,7 +46,6 @@ public class MetricGroupRegistrationTest extends TestLogger { @Test public void testMetricInstantiation() { Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
