Repository: flink Updated Branches: refs/heads/master cd232e683 -> 9bcbcf4a5
[FLINK-4246] Allow Specifying Multiple Metrics Reporters This also updates documentation and tests. Reporters can now be specified like this: metrics.reporters: foo,bar metrics.reporter.foo.class: JMXReporter.class metrics.reporter.foo.port: 10 metrics.reporter.bar.class: GangliaReporter.class metrics.reporter.bar.port: 11 metrics.reporter.bar.something: 42 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9bcbcf4a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9bcbcf4a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9bcbcf4a Branch: refs/heads/master Commit: 9bcbcf4a5d75de65671bf4cc4e1e3129d386a29b Parents: cd232e6 Author: Aljoscha Krettek <[email protected]> Authored: Fri Jul 22 15:25:01 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Jul 26 17:05:21 2016 +0200 ---------------------------------------------------------------------- docs/apis/metrics.md | 31 ++- docs/setup/config.md | 12 +- .../flink/configuration/ConfigConstants.java | 34 ++- .../flink/configuration/Configuration.java | 14 +- .../configuration/DelegatingConfiguration.java | 222 +++++++++++++++++++ .../UnmodifiableConfiguration.java | 9 + .../DelegatingConfigurationTest.java | 91 ++++++++ .../ScheduledDropwizardReporterTest.java | 18 +- .../DropwizardFlinkHistogramWrapperTest.java | 10 +- .../flink/metrics/jmx/JMXReporterTest.java | 59 +++-- .../jobmanager/JMXJobManagerMetricTest.java | 8 +- .../metrics/statsd/StatsDReporterTest.java | 24 +- .../flink/runtime/metrics/MetricRegistry.java | 145 ++++++------ .../runtime/operators/util/TaskConfig.java | 184 +-------------- .../ExecutionGraphMetricsTest.java | 7 +- .../runtime/metrics/MetricRegistryTest.java | 96 +++++++- .../groups/MetricGroupRegistrationTest.java | 3 +- .../util/DelegatingConfigurationTest.java | 93 -------- .../connectors/kafka/KafkaTestBase.java | 3 +- 19 files changed, 643 insertions(+), 420 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/docs/apis/metrics.md ---------------------------------------------------------------------- diff --git a/docs/apis/metrics.md b/docs/apis/metrics.md index e8c2772..af6ef64 100644 --- a/docs/apis/metrics.md +++ b/docs/apis/metrics.md @@ -227,14 +227,29 @@ or by assigning unique names to jobs and operators. ## Reporter -Metrics can be exposed to an external system by configuring a reporter in `conf/flink-conf.yaml`. - -- `metrics.reporter.class`: The class of the reporter to use. - - Example: org.apache.flink.metrics.jmx.JMXReporter -- `metrics.reporter.arguments`: A list of named parameters that are passed to the reporter. - - Example: --host localhost --port 9010 -- `metrics.reporter.interval`: The interval between reports. - - Example: 10 SECONDS +Metrics can be exposed to an external system by configuring one or several reporters in `conf/flink-conf.yaml`. + +- `metrics.reporters`: The list of named reporters. +- `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the reporter named `<name>`. +- `metrics.reporter.<name>.class`: The reporter class to use for the reporter named `<name>`. +- `metrics.reporter.<name>.interval`: The reporter interval to use for the reporter named `<name>`. + +All reporters must at least have the `class` property, some allow specifying a reporting `interval`. Below, +we will list more settings specific to each reporter. + +Example reporter configuration that specifies multiple reporters: + +``` +metrics.reporters: my_jmx_reporter,my_other_reporter + +metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter +metrics.reporter.my_jmx_reporter.port: 9020-9040 + +metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter +metrics.reporter.my_other_reporter.host: 192.168.1.1 +metrics.reporter.my_other_reporter.port: 10000 + +``` You can write your own `Reporter` by implementing the `org.apache.flink.metrics.reporter.MetricReporter` interface. If the Reporter should send out reports regularly you have to implement the `Scheduled` interface as well. http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/docs/setup/config.md ---------------------------------------------------------------------- diff --git a/docs/setup/config.md b/docs/setup/config.md index 616173d..1ce6d56 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -150,7 +150,7 @@ Default value is 1. - `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used if the default restart strategy is set to "fixed-delay". Default value is the `akka.ask.timeout`. -- `restart-strategy.failure-rate.max-failures-per-interval`: Maximum number of restarts in given time interval before failing a job in "failure-rate" strategy. +- `restart-strategy.failure-rate.max-failures-per-interval`: Maximum number of restarts in given time interval before failing a job in "failure-rate" strategy. Default value is 1. - `restart-strategy.failure-rate.failure-rate-interval`: Time interval for measuring failure rate in "failure-rate" strategy. @@ -288,7 +288,7 @@ of the JobManager, because the same ActorSystem is used. Its not possible to use - `recovery.zookeeper.path.root`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create namespace directories. -- `recovery.zookeeper.path.namespace`: (Default '/default_ns' in standalone mode, or the <yarn-application-id> under Yarn) Defines the subdirectory under the root dir where the ZooKeeper recovery mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. +- `recovery.zookeeper.path.namespace`: (Default '/default_ns' in standalone mode, or the <yarn-application-id> under Yarn) Defines the subdirectory under the root dir where the ZooKeeper recovery mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. - `recovery.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader. @@ -312,7 +312,13 @@ of the JobManager, because the same ActorSystem is used. Its not possible to use ## Metrics -- `metrics.jmx.port`: (Default: 9010-9025) Defines the port used by JMX. +- `metrics.reporters`: The list of named reporters, i.e. "foo,bar". + +- `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the reporter named `<name>`. + +- `metrics.reporter.<name>.class`: The reporter class to use for the reporter named `<name>`. + +- `metrics.reporter.<name>.interval`: The reporter interval to use for the reporter named `<name>`. - `metrics.scope.jm`: (Default: <host>.jobmanager) Defines the scope format string that is applied to all metrics scoped to a JobManager. http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 028732a..5deed4e 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -668,14 +668,34 @@ public final class ConfigConstants { // ---------------------------- Metrics ----------------------------------- - /** The class of the reporter to use. */ - public static final String METRICS_REPORTER_CLASS = "metrics.reporter.class"; - - /** A list of named parameters that are passed to the reporter. */ - public static final String METRICS_REPORTER_ARGUMENTS = "metrics.reporter.arguments"; + /** + * The list of named reporters. Names are defined here and per-reporter configs + * are given with the reporter config prefix and the reporter name. + * + * Example: + * <pre>{@code + * metrics.reporters = foo, bar + * + * metrics.reporter.foo.class = org.apache.flink.metrics.reporter.JMXReporter + * metrics.reporter.foo.interval = 10 + * + * metrics.reporter.bar.class = org.apache.flink.metrics.graphite.GraphiteReporter + * metrics.reporter.bar.port = 1337 + * }</pre> + */ + public static final String METRICS_REPORTERS_LIST = "metrics.reporters"; + + /** + * The prefix for per-reporter configs. Has to be combined with a reporter name and + * the configs mentioned below. + */ + public static final String METRICS_REPORTER_PREFIX = "metrics.reporter."; + + /** The class of the reporter to use. This is used as a suffix in an actual reporter config */ + public static final String METRICS_REPORTER_CLASS_SUFFIX = "class"; - /** The interval between reports. */ - public static final String METRICS_REPORTER_INTERVAL = "metrics.reporter.interval"; + /** The interval between reports. This is used as a suffix in an actual reporter config */ + public static final String METRICS_REPORTER_INTERVAL_SUFFIX = "interval"; /** The delimiter used to assemble the metric identifier. */ public static final String METRICS_SCOPE_DELIMITER = "metrics.scope.delimiter"; http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index 6b05053..8ca5d07 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Properties; import java.util.Set; import org.apache.flink.annotation.Public; @@ -56,7 +57,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters /** Stores the concrete key/value pairs of this configuration object. */ - private final HashMap<String, Object> confData; + protected final HashMap<String, Object> confData; // -------------------------------------------------------------------------------------------- @@ -420,6 +421,17 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters } } + /** + * Adds all entries in this {@code Configuration} to the given {@link Properties}. + */ + public void addAllToProperties(Properties props) { + synchronized (this.confData) { + for (Map.Entry<String, Object> entry : this.confData.entrySet()) { + props.put(entry.getKey(), entry.getValue()); + } + } + } + public void addAll(Configuration other) { synchronized (this.confData) { synchronized (other.confData) { http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java new file mode 100644 index 0000000..91f81d4 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * A configuration that manages a subset of keys with a common prefix from a given configuration. + */ +public final class DelegatingConfiguration extends Configuration { + + private static final long serialVersionUID = 1L; + + private final Configuration backingConfig; // the configuration actually storing the data + + private String prefix; // the prefix key by which keys for this config are marked + + // -------------------------------------------------------------------------------------------- + + /** + * Default constructor for serialization. Creates an empty delegating configuration. + */ + public DelegatingConfiguration() { + this.backingConfig = new Configuration(); + this.prefix = ""; + } + + /** + * Creates a new delegating configuration which stores its key/value pairs in the given + * configuration using the specifies key prefix. + * + * @param backingConfig The configuration holding the actual config data. + * @param prefix The prefix prepended to all config keys. + */ + public DelegatingConfiguration(Configuration backingConfig, String prefix) + { + this.backingConfig = backingConfig; + this.prefix = prefix; + } + + // -------------------------------------------------------------------------------------------- + + @Override + public String getString(String key, String defaultValue) { + return this.backingConfig.getString(this.prefix + key, defaultValue); + } + + @Override + public void setString(String key, String value) { + this.backingConfig.setString(this.prefix + key, value); + } + + @Override + public <T> Class<T> getClass(String key, Class<? extends T> defaultValue, ClassLoader classLoader) throws ClassNotFoundException { + return this.backingConfig.getClass(this.prefix + key, defaultValue, classLoader); + } + + @Override + public void setClass(String key, Class<?> klazz) { + this.backingConfig.setClass(this.prefix + key, klazz); + } + + @Override + public int getInteger(String key, int defaultValue) { + return this.backingConfig.getInteger(this.prefix + key, defaultValue); + } + + @Override + public void setInteger(String key, int value) { + this.backingConfig.setInteger(this.prefix + key, value); + } + + @Override + public long getLong(String key, long defaultValue) { + return this.backingConfig.getLong(this.prefix + key, defaultValue); + } + + @Override + public void setLong(String key, long value) { + this.backingConfig.setLong(this.prefix + key, value); + } + + @Override + public boolean getBoolean(String key, boolean defaultValue) { + return this.backingConfig.getBoolean(this.prefix + key, defaultValue); + } + + @Override + public void setBoolean(String key, boolean value) { + this.backingConfig.setBoolean(this.prefix + key, value); + } + + @Override + public float getFloat(String key, float defaultValue) { + return this.backingConfig.getFloat(this.prefix + key, defaultValue); + } + + @Override + public void setFloat(String key, float value) { + this.backingConfig.setFloat(this.prefix + key, value); + } + + @Override + public double getDouble(String key, double defaultValue) { + return this.backingConfig.getDouble(this.prefix + key, defaultValue); + } + + @Override + public void setDouble(String key, double value) { + this.backingConfig.setDouble(this.prefix + key, value); + } + + @Override + public byte[] getBytes(final String key, final byte[] defaultValue) { + return this.backingConfig.getBytes(this.prefix + key, defaultValue); + } + + @Override + public void setBytes(final String key, final byte[] bytes) { + this.backingConfig.setBytes(this.prefix + key, bytes); + } + + @Override + public void addAllToProperties(Properties props) { + // only add keys with our prefix + synchronized (backingConfig.confData) { + for (Map.Entry<String, Object> entry : backingConfig.confData.entrySet()) { + if (entry.getKey().startsWith(prefix)) { + String keyWithoutPrefix = + entry.getKey().substring(prefix.length(), + entry.getKey().length()); + + props.put(keyWithoutPrefix, entry.getValue()); + } else { + // don't add stuff that doesn't have our prefix + } + } + } + + } + + @Override + public void addAll(Configuration other) { + this.addAll(other, ""); + } + + @Override + public void addAll(Configuration other, String prefix) { + this.backingConfig.addAll(other, this.prefix + prefix); + } + + @Override + public String toString() { + return backingConfig.toString(); + } + + @Override + public Set<String> keySet() { + final HashSet<String> set = new HashSet<String>(); + final int prefixLen = this.prefix == null ? 0 : this.prefix.length(); + + for (String key : this.backingConfig.keySet()) { + if (key.startsWith(this.prefix)) { + set.add(key.substring(prefixLen)); + } + } + return set; + } + + // -------------------------------------------------------------------------------------------- + + @Override + public void read(DataInputView in) throws IOException { + this.prefix = in.readUTF(); + this.backingConfig.read(in); + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeUTF(this.prefix); + this.backingConfig.write(out); + } + + // -------------------------------------------------------------------------------------------- + + @Override + public int hashCode() { + return this.prefix.hashCode() ^ this.backingConfig.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof DelegatingConfiguration) { + DelegatingConfiguration other = (DelegatingConfiguration) obj; + return this.prefix.equals(other.prefix) && this.backingConfig.equals(other.backingConfig); + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java index 5514caf..24ad61e 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java @@ -20,6 +20,8 @@ package org.apache.flink.configuration; import org.apache.flink.annotation.Public; +import java.util.Properties; + /** * Unmodifiable version of the Configuration class. */ @@ -42,6 +44,13 @@ public class UnmodifiableConfiguration extends Configuration { // All mutating methods must fail // -------------------------------------------------------------------------------------------- + + @Override + public void addAllToProperties(Properties props) { + // override to make the UnmodifiableConfigurationTest happy + super.addAllToProperties(props); + } + @Override public final void addAll(Configuration other) { error(); http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java new file mode 100644 index 0000000..6d42129 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.configuration; + +import org.junit.Test; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.Comparator; + +import static org.junit.Assert.assertTrue; + + +public class DelegatingConfigurationTest { + + /** + * http://stackoverflow.com/questions/22225663/checking-in-a-unit-test-whether-all-methods-are-delegated + */ + @Test + public void testIfDelegatesImplementAllMethods() throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { + + Comparator<Method> methodComparator = new Comparator<Method>() { + @Override + public int compare(Method o1, Method o2) { + String o1Str = o1.getName() + typeParamToString(o1.getParameterTypes()); + String o2Str = o2.getName() + typeParamToString(o2.getParameterTypes()); + return o1Str.compareTo( o2Str ); + } + + private String typeParamToString(Class<?>[] classes) { + String str = ""; + for(Object t : classes) { + str += t.toString(); + } + return str; + } + }; + + // For each method in the Configuration class... + Method[] confMethods = Configuration.class.getDeclaredMethods(); + Method[] delegateMethods = DelegatingConfiguration.class.getDeclaredMethods(); + Arrays.sort(confMethods, methodComparator); + Arrays.sort(delegateMethods, methodComparator); + match : for (Method configurationMethod : confMethods) { + boolean hasMethod = false; + if(!Modifier.isPublic(configurationMethod.getModifiers()) ) { + continue; + } + // Find matching method in wrapper class and call it + mismatch: for (Method wrapperMethod : delegateMethods) { + if (configurationMethod.getName().equals(wrapperMethod.getName())) { + + // Get parameters for method + Class<?>[] wrapperMethodParams = wrapperMethod.getParameterTypes(); + Class<?>[] configMethodParams = configurationMethod.getParameterTypes(); + if(wrapperMethodParams.length != configMethodParams.length) { + System.err.println("Length"); + break mismatch; + } + for(int i = 0; i < wrapperMethodParams.length; i++) { + if(wrapperMethodParams[i] != configMethodParams[i]) { + break mismatch; + } + } + hasMethod = true; + break match; + } + } + assertTrue("Foo method '" + configurationMethod.getName() + "' has not been wrapped correctly in DelegatingConfiguration wrapper", hasMethod); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java index 9b04c3b..5979c43 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java @@ -31,10 +31,10 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.util.AbstractID; - import org.junit.Test; -import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -43,7 +43,7 @@ import static org.junit.Assert.assertTrue; public class ScheduledDropwizardReporterTest { @Test - public void testInvalidCharacterReplacement() { + public void testInvalidCharacterReplacement() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { ScheduledDropwizardReporter reporter = new ScheduledDropwizardReporter() { @Override public ScheduledReporter getReporter(MetricConfig config) { @@ -68,7 +68,11 @@ public class ScheduledDropwizardReporterTest { String taskManagerId = "tas:kMana::ger"; String counterName = "testCounter"; - configuration.setString(ConfigConstants.METRICS_REPORTER_CLASS, "org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter"); + configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + configuration.setString( + ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, + "org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter"); + configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>"); configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_"); @@ -84,10 +88,10 @@ public class ScheduledDropwizardReporterTest { taskMetricGroup.counter(counterName, myCounter); - Field reporterField = MetricRegistry.class.getDeclaredField("reporter"); - reporterField.setAccessible(true); + List<MetricReporter> reporters = metricRegistry.getReporters(); - MetricReporter metricReporter = (MetricReporter) reporterField.get(metricRegistry); + assertTrue(reporters.size() == 1); + MetricReporter metricReporter = reporters.get(0); assertTrue("Reporter should be of type ScheduledDropwizardReporter", metricReporter instanceof ScheduledDropwizardReporter); http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java index 7b27867..9c2ce41 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java @@ -96,8 +96,9 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger { int size = 10; String histogramMetricName = "histogram"; Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestingReporter.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_INTERVAL, reportingInterval + " MILLISECONDS"); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, reportingInterval + " MILLISECONDS"); MetricRegistry registry = null; @@ -112,10 +113,9 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger { String fullMetricName = metricGroup.getMetricIdentifier(histogramMetricName); - Field f = registry.getClass().getDeclaredField("reporter"); - f.setAccessible(true); + assertTrue(registry.getReporters().size() == 1); - MetricReporter reporter = (MetricReporter) f.get(registry); + MetricReporter reporter = registry.getReporters().get(0); assertTrue(reporter instanceof TestingReporter); http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java index efb38b2..14ba5ec 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java @@ -23,9 +23,10 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.HistogramStatistics; -import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.util.TestReporter; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -38,8 +39,10 @@ import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; import java.lang.management.ManagementFactory; +import java.util.List; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class JMXReporterTest extends TestLogger { @@ -79,18 +82,24 @@ public class JMXReporterTest extends TestLogger { @Test public void testPortConflictHandling() throws Exception { Configuration cfg = new Configuration(); + cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2"); + + cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); + cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9020-9035"); + + cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); + cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9020-9035"); + MetricRegistry reg = new MetricRegistry(cfg); TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm"); - JMXReporter rep1 = new JMXReporter(); - JMXReporter rep2 = new JMXReporter(); + List<MetricReporter> reporters = reg.getReporters(); - MetricConfig cfg1 = new MetricConfig(); - cfg1.setProperty("port", "9020-9035"); + assertTrue(reporters.size() == 2); - rep1.open(cfg1); - rep2.open(cfg1); + MetricReporter rep1 = reporters.get(0); + MetricReporter rep2 = reporters.get(1); rep1.notifyOfAddedMetric(new Gauge<Integer>() { @Override @@ -114,8 +123,6 @@ public class JMXReporterTest extends TestLogger { assertEquals(1, mBeanServer.getAttribute(objectName1, "Value")); assertEquals(2, mBeanServer.getAttribute(objectName2, "Value")); - rep1.close(); - rep2.close(); reg.shutdown(); } @@ -127,17 +134,26 @@ public class JMXReporterTest extends TestLogger { @Test public void testJMXAvailability() throws Exception { Configuration cfg = new Configuration(); + cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); + + cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2"); + + cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); + cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9040-9055"); + + cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); + cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9040-9055"); + MetricRegistry reg = new MetricRegistry(cfg); TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm"); - JMXReporter rep1 = new JMXReporter(); - JMXReporter rep2 = new JMXReporter(); + List<MetricReporter> reporters = reg.getReporters(); + + assertTrue(reporters.size() == 2); - MetricConfig cfg1 = new MetricConfig(); - cfg1.setProperty("port", "9040-9055"); - rep1.open(cfg1); - rep2.open(cfg1); + MetricReporter rep1 = reporters.get(0); + MetricReporter rep2 = reporters.get(1); rep1.notifyOfAddedMetric(new Gauge<Integer>() { @Override @@ -156,29 +172,23 @@ public class JMXReporterTest extends TestLogger { ObjectName objectName1 = new ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents())); ObjectName objectName2 = new ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents())); - JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + rep1.getPort() + "/jndi/rmi://localhost:" + rep1.getPort() + "/jmxrmi"); + JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter)rep1).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter)rep1).getPort() + "/jmxrmi"); JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1); MBeanServerConnection mCon1 = jmxCon1.getMBeanServerConnection(); assertEquals(1, mCon1.getAttribute(objectName1, "Value")); assertEquals(2, mCon1.getAttribute(objectName2, "Value")); - url1 = null; jmxCon1.close(); - jmxCon1 = null; - mCon1 = null; - JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + rep2.getPort() + "/jndi/rmi://localhost:" + rep2.getPort() + "/jmxrmi"); + JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter)rep2).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter)rep2).getPort() + "/jmxrmi"); JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2); MBeanServerConnection mCon2 = jmxCon2.getMBeanServerConnection(); assertEquals(1, mCon2.getAttribute(objectName1, "Value")); assertEquals(2, mCon2.getAttribute(objectName2, "Value")); - url2 = null; jmxCon2.close(); - jmxCon2 = null; - mCon2 = null; rep1.close(); rep2.close(); @@ -195,7 +205,8 @@ public class JMXReporterTest extends TestLogger { try { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTER_CLASS, JMXReporter.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); registry = new MetricRegistry(config); http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java index 8b0f672..fa1f9f8 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -19,12 +19,12 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.jmx.JMXReporter; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; -import org.apache.flink.metrics.jmx.JMXReporter; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.junit.Test; @@ -52,9 +52,11 @@ public class JMXJobManagerMetricTest { Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); Configuration flinkConfiguration = new Configuration(); - flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_CLASS, JMXReporter.class.getName()); + flinkConfiguration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); + flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "9060-9075"); + flinkConfiguration.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>"); - flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "--port 9060-9075"); TestingCluster flink = new TestingCluster(flinkConfiguration); http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java index d7ae04f..8c1af0e 100644 --- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java +++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java @@ -36,12 +36,12 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; import java.io.IOException; -import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.SocketException; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -73,7 +73,11 @@ public class StatsDReporterTest extends TestLogger { String taskManagerId = "tas:kMana::ger"; String counterName = "testCounter"; - configuration.setString(ConfigConstants.METRICS_REPORTER_CLASS, "org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter"); + configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + configuration.setString( + ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, + "org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter"); + configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>"); configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_"); @@ -89,10 +93,11 @@ public class StatsDReporterTest extends TestLogger { taskMetricGroup.counter(counterName, myCounter); - Field reporterField = MetricRegistry.class.getDeclaredField("reporter"); - reporterField.setAccessible(true); + List<MetricReporter> reporters = metricRegistry.getReporters(); + + assertTrue(reporters.size() == 1); - MetricReporter metricReporter = (MetricReporter) reporterField.get(metricRegistry); + MetricReporter metricReporter = reporters.get(0); assertTrue("Reporter should be of type StatsDReporter", metricReporter instanceof StatsDReporter); @@ -138,9 +143,11 @@ public class StatsDReporterTest extends TestLogger { int port = receiver.getPort(); Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTER_CLASS, StatsDReporter.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_INTERVAL, "1 SECONDS"); - config.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "--host localhost --port " + port); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "" + port); registry = new MetricRegistry(config); @@ -308,6 +315,7 @@ public class StatsDReporterTest extends TestLogger { byte[] buffer = new byte[1024]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + socket.receive(packet); String line = new String(packet.getData(), 0, packet.getLength()); http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java index 9c4da6c..65b5b9b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.metrics; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.MetricGroup; @@ -30,6 +31,8 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.TimerTask; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -41,9 +44,9 @@ import java.util.concurrent.TimeUnit; */ public class MetricRegistry { static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class); - - private final MetricReporter reporter; - private final ScheduledExecutorService executor; + + private List<MetricReporter> reporters; + private ScheduledExecutorService executor; private final ScopeFormats scopeFormats; @@ -74,56 +77,68 @@ public class MetricRegistry { this.delimiter = delim; // second, instantiate any custom configured reporters + this.reporters = new ArrayList<>(); + + final String definedReporters = config.getString(ConfigConstants.METRICS_REPORTERS_LIST, null); - final String className = config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null); - if (className == null) { + if (definedReporters == null) { + // no reporters defined // by default, don't report anything LOG.info("No metrics reporter configured, no metrics will be exposed/reported."); - this.reporter = null; this.executor = null; - } - else { - MetricReporter reporter; - ScheduledExecutorService executor = null; - try { - String configuredPeriod = config.getString(ConfigConstants.METRICS_REPORTER_INTERVAL, null); - TimeUnit timeunit = TimeUnit.SECONDS; - long period = 10; - - if (configuredPeriod != null) { - try { - String[] interval = configuredPeriod.split(" "); - period = Long.parseLong(interval[0]); - timeunit = TimeUnit.valueOf(interval[1]); - } - catch (Exception e) { - LOG.error("Cannot parse report interval from config: " + configuredPeriod + - " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " + - "Using default reporting interval."); - } + } else { + // we have some reporters so + String[] namedReporters = definedReporters.split("\\s*,\\s*"); + for (String namedReporter : namedReporters) { + + DelegatingConfiguration reporterConfig = new DelegatingConfiguration(config, ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + "."); + final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null); + if (className == null) { + LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported."); + continue; } - MetricConfig reporterConfig = createReporterConfig(config); + try { + String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null); + TimeUnit timeunit = TimeUnit.SECONDS; + long period = 10; + + if (configuredPeriod != null) { + try { + String[] interval = configuredPeriod.split(" "); + period = Long.parseLong(interval[0]); + timeunit = TimeUnit.valueOf(interval[1]); + } + catch (Exception e) { + LOG.error("Cannot parse report interval from config: " + configuredPeriod + + " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " + + "Using default reporting interval."); + } + } - Class<?> reporterClass = Class.forName(className); - reporter = (MetricReporter) reporterClass.newInstance(); - reporter.open(reporterConfig); + Class<?> reporterClass = Class.forName(className); + MetricReporter reporterInstance = (MetricReporter) reporterClass.newInstance(); - if (reporter instanceof Scheduled) { - executor = Executors.newSingleThreadScheduledExecutor(); - LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name()); + MetricConfig metricConfig = new MetricConfig(); + reporterConfig.addAllToProperties(metricConfig); + reporterInstance.open(metricConfig); - executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit); + if (reporterInstance instanceof Scheduled) { + if (this.executor == null) { + executor = Executors.newSingleThreadScheduledExecutor(); + } + LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className); + + executor.scheduleWithFixedDelay( + new ReporterTask((Scheduled) reporterInstance), period, period, timeunit); + } + reporters.add(reporterInstance); + } + catch (Throwable t) { + shutdownExecutor(); + LOG.error("Could not instantiate metrics reporter" + namedReporter + ". Metrics might not be exposed/reported.", t); } } - catch (Throwable t) { - shutdownExecutor(); - LOG.info("Could not instantiate metrics reporter. No metrics will be exposed/reported.", t); - reporter = null; - } - - this.reporter = reporter; - this.executor = executor; } } @@ -131,24 +146,27 @@ public class MetricRegistry { return this.delimiter; } - public MetricReporter getReporter() { - return reporter; + public List<MetricReporter> getReporters() { + return reporters; } /** * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}. */ public void shutdown() { - if (reporter != null) { - try { - reporter.close(); - } catch (Throwable t) { - LOG.warn("Metrics reporter did not shut down cleanly", t); + if (reporters != null) { + for (MetricReporter reporter : reporters) { + try { + reporter.close(); + } catch (Throwable t) { + LOG.warn("Metrics reporter did not shut down cleanly", t); + } } + reporters = null; } shutdownExecutor(); } - + private void shutdownExecutor() { if (executor != null) { executor.shutdown(); @@ -180,8 +198,12 @@ public class MetricRegistry { */ public void register(Metric metric, String metricName, MetricGroup group) { try { - if (reporter != null) { - reporter.notifyOfAddedMetric(metric, metricName, group); + if (reporters != null) { + for (MetricReporter reporter : reporters) { + if (reporter != null) { + reporter.notifyOfAddedMetric(metric, metricName, group); + } + } } } catch (Exception e) { LOG.error("Error while registering metric.", e); @@ -197,8 +219,12 @@ public class MetricRegistry { */ public void unregister(Metric metric, String metricName, MetricGroup group) { try { - if (reporter != null) { - reporter.notifyOfRemovedMetric(metric, metricName, group); + if (reporters != null) { + for (MetricReporter reporter : reporters) { + if (reporter != null) { + reporter.notifyOfRemovedMetric(metric, metricName, group); + } + } } } catch (Exception e) { LOG.error("Error while registering metric.", e); @@ -208,17 +234,6 @@ public class MetricRegistry { // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ - static MetricConfig createReporterConfig(Configuration config) { - MetricConfig reporterConfig = new MetricConfig(); - - String[] arguments = config.getString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "").split(" "); - if (arguments.length > 1) { - for (int x = 0; x < arguments.length; x += 2) { - reporterConfig.setProperty(arguments[x].replace("--", ""), arguments[x + 1]); - } - } - return reporterConfig; - } static ScopeFormats createScopeConfig(Configuration config) { String jmFormat = config.getString( http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java index 03a8910..b598523 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java @@ -25,9 +25,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.aggregators.AggregatorWithName; @@ -40,9 +38,8 @@ import org.apache.flink.api.common.typeutils.TypeComparatorFactory; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.Driver; @@ -1162,184 +1159,5 @@ public class TaskConfig implements Serializable { public boolean isSolutionSetUnmanaged() { return config.getBoolean(SOLUTION_SET_OBJECTS, false); } - - // -------------------------------------------------------------------------------------------- - // Utility class for nested Configurations - // -------------------------------------------------------------------------------------------- - - /** - * A configuration that manages a subset of keys with a common prefix from a given configuration. - */ - public static final class DelegatingConfiguration extends Configuration { - - private static final long serialVersionUID = 1L; - - private final Configuration backingConfig; // the configuration actually storing the data - - private String prefix; // the prefix key by which keys for this config are marked - - // -------------------------------------------------------------------------------------------- - - /** - * Default constructor for serialization. Creates an empty delegating configuration. - */ - public DelegatingConfiguration() { - this.backingConfig = new Configuration(); - this.prefix = ""; - } - - /** - * Creates a new delegating configuration which stores its key/value pairs in the given - * configuration using the specifies key prefix. - * - * @param backingConfig The configuration holding the actual config data. - * @param prefix The prefix prepended to all config keys. - */ - public DelegatingConfiguration(Configuration backingConfig, String prefix) - { - this.backingConfig = backingConfig; - this.prefix = prefix; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public String getString(String key, String defaultValue) { - return this.backingConfig.getString(this.prefix + key, defaultValue); - } - - @Override - public void setString(String key, String value) { - this.backingConfig.setString(this.prefix + key, value); - } - - @Override - public <T> Class<T> getClass(String key, Class<? extends T> defaultValue, ClassLoader classLoader) throws ClassNotFoundException { - return this.backingConfig.getClass(this.prefix + key, defaultValue, classLoader); - } - - @Override - public void setClass(String key, Class<?> klazz) { - this.backingConfig.setClass(this.prefix + key, klazz); - } - - @Override - public int getInteger(String key, int defaultValue) { - return this.backingConfig.getInteger(this.prefix + key, defaultValue); - } - - @Override - public void setInteger(String key, int value) { - this.backingConfig.setInteger(this.prefix + key, value); - } - - @Override - public long getLong(String key, long defaultValue) { - return this.backingConfig.getLong(this.prefix + key, defaultValue); - } - - @Override - public void setLong(String key, long value) { - this.backingConfig.setLong(this.prefix + key, value); - } - - @Override - public boolean getBoolean(String key, boolean defaultValue) { - return this.backingConfig.getBoolean(this.prefix + key, defaultValue); - } - - @Override - public void setBoolean(String key, boolean value) { - this.backingConfig.setBoolean(this.prefix + key, value); - } - - @Override - public float getFloat(String key, float defaultValue) { - return this.backingConfig.getFloat(this.prefix + key, defaultValue); - } - - @Override - public void setFloat(String key, float value) { - this.backingConfig.setFloat(this.prefix + key, value); - } - - @Override - public double getDouble(String key, double defaultValue) { - return this.backingConfig.getDouble(this.prefix + key, defaultValue); - } - - @Override - public void setDouble(String key, double value) { - this.backingConfig.setDouble(this.prefix + key, value); - } - - @Override - public byte[] getBytes(final String key, final byte[] defaultValue) { - return this.backingConfig.getBytes(this.prefix + key, defaultValue); - } - - @Override - public void setBytes(final String key, final byte[] bytes) { - this.backingConfig.setBytes(this.prefix + key, bytes); - } - - @Override - public void addAll(Configuration other) { - this.addAll(other, ""); - } - - @Override - public void addAll(Configuration other, String prefix) { - this.backingConfig.addAll(other, this.prefix + prefix); - } - - @Override - public String toString() { - return backingConfig.toString(); - } - - @Override - public Set<String> keySet() { - final HashSet<String> set = new HashSet<String>(); - final int prefixLen = this.prefix == null ? 0 : this.prefix.length(); - - for (String key : this.backingConfig.keySet()) { - if (key.startsWith(this.prefix)) { - set.add(key.substring(prefixLen)); - } - } - return set; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public void read(DataInputView in) throws IOException { - this.prefix = in.readUTF(); - this.backingConfig.read(in); - } - @Override - public void write(DataOutputView out) throws IOException { - out.writeUTF(this.prefix); - this.backingConfig.write(out); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public int hashCode() { - return this.prefix.hashCode() ^ this.backingConfig.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof DelegatingConfiguration) { - DelegatingConfiguration other = (DelegatingConfiguration) obj; - return this.prefix.equals(other.prefix) && this.backingConfig.equals(other.backingConfig); - } else { - return false; - } - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index 630335b..2b8b867 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -85,7 +85,8 @@ public class ExecutionGraphMetricsTest extends TestLogger { JobGraph jobGraph = new JobGraph("Test Job", jobVertex); Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestingReporter.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName()); Configuration jobConfig = new Configuration(); @@ -93,7 +94,9 @@ public class ExecutionGraphMetricsTest extends TestLogger { MetricRegistry metricRegistry = new MetricRegistry(config); - MetricReporter reporter = metricRegistry.getReporter(); + assertTrue(metricRegistry.getReporters().size() == 1); + + MetricReporter reporter = metricRegistry.getReporters().get(0); assertTrue(reporter instanceof TestingReporter); http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java index 3252e3d..d75ef57 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java @@ -45,9 +45,12 @@ public class MetricRegistryTest extends TestLogger { public void testReporterInstantiation() { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter1.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); - new MetricRegistry(config); + MetricRegistry metricRegistry = new MetricRegistry(config); + + assertTrue(metricRegistry.getReporters().size() == 1); Assert.assertTrue(TestReporter1.wasOpened); } @@ -62,14 +65,64 @@ public class MetricRegistryTest extends TestLogger { } /** + * Verifies that multiple reporters are instantiated correctly. + */ + @Test + public void testMultipleReporterInstantiation() { + Configuration config = new Configuration(); + + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1, test2,test3"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName()); + + MetricRegistry metricRegistry = new MetricRegistry(config); + + assertTrue(metricRegistry.getReporters().size() == 3); + + Assert.assertTrue(TestReporter11.wasOpened); + Assert.assertTrue(TestReporter12.wasOpened); + Assert.assertTrue(TestReporter13.wasOpened); + } + + protected static class TestReporter11 extends TestReporter { + public static boolean wasOpened = false; + + @Override + public void open(MetricConfig config) { + wasOpened = true; + } + } + + protected static class TestReporter12 extends TestReporter { + public static boolean wasOpened = false; + + @Override + public void open(MetricConfig config) { + wasOpened = true; + } + } + + protected static class TestReporter13 extends TestReporter { + public static boolean wasOpened = false; + + @Override + public void open(MetricConfig config) { + wasOpened = true; + } + } + + /** * Verifies that configured arguments are properly forwarded to the reporter. */ @Test public void testReporterArgumentForwarding() { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter2.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "--arg1 hello --arg2 world"); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world"); new MetricRegistry(config); } @@ -91,8 +144,10 @@ public class MetricRegistryTest extends TestLogger { public void testReporterScheduling() throws InterruptedException { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter3.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_INTERVAL, "50 MILLISECONDS"); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS"); new MetricRegistry(config); @@ -125,12 +180,14 @@ public class MetricRegistryTest extends TestLogger { } /** - * Verifies that reporters implementing the Listener interface are notified when Metrics are added or removed. + * Verifies that reporters are notified of added/removed metrics. */ @Test - public void testListener() { + public void testReporterNotifications() { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter6.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName()); MetricRegistry registry = new MetricRegistry(config); @@ -140,6 +197,8 @@ public class MetricRegistryTest extends TestLogger { assertTrue(TestReporter6.addCalled); assertTrue(TestReporter6.removeCalled); + assertTrue(TestReporter7.addCalled); + assertTrue(TestReporter7.removeCalled); } protected static class TestReporter6 extends TestReporter { @@ -161,6 +220,25 @@ public class MetricRegistryTest extends TestLogger { } } + protected static class TestReporter7 extends TestReporter { + public static boolean addCalled = false; + public static boolean removeCalled = false; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + addCalled = true; + assertTrue(metric instanceof Counter); + assertEquals("rootCounter", metricName); + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + removeCalled = true; + Assert.assertTrue(metric instanceof Counter); + Assert.assertEquals("rootCounter", metricName); + } + } + /** * Verifies that the scope configuration is properly extracted. */ http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java index 39485dc..9936631 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java @@ -40,7 +40,8 @@ public class MetricGroupRegistrationTest { @Test public void testMetricInstantiation() { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter1.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); MetricRegistry registry = new MetricRegistry(config); http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java deleted file mode 100644 index 09812f2..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.util; - -import static org.junit.Assert.assertTrue; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.util.Arrays; -import java.util.Comparator; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.operators.util.TaskConfig.DelegatingConfiguration; -import org.junit.Test; - - -public class DelegatingConfigurationTest { - - /** - * http://stackoverflow.com/questions/22225663/checking-in-a-unit-test-whether-all-methods-are-delegated - */ - @Test - public void testIfDelegatesImplementAllMethods() throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { - - Comparator<Method> methodComparator = new Comparator<Method>() { - @Override - public int compare(Method o1, Method o2) { - String o1Str = o1.getName() + typeParamToString(o1.getParameterTypes()); - String o2Str = o2.getName() + typeParamToString(o2.getParameterTypes()); - return o1Str.compareTo( o2Str ); - } - - private String typeParamToString(Class<?>[] classes) { - String str = ""; - for(Object t : classes) { - str += t.toString(); - } - return str; - } - }; - - // For each method in the Configuration class... - Method[] confMethods = Configuration.class.getDeclaredMethods(); - Method[] delegateMethods = DelegatingConfiguration.class.getDeclaredMethods(); - Arrays.sort(confMethods, methodComparator); - Arrays.sort(delegateMethods, methodComparator); - match : for (Method configurationMethod : confMethods) { - boolean hasMethod = false; - if(!Modifier.isPublic(configurationMethod.getModifiers()) ) { - continue; - } - // Find matching method in wrapper class and call it - mismatch: for (Method wrapperMethod : delegateMethods) { - if (configurationMethod.getName().equals(wrapperMethod.getName())) { - - // Get parameters for method - Class<?>[] wrapperMethodParams = wrapperMethod.getParameterTypes(); - Class<?>[] configMethodParams = configurationMethod.getParameterTypes(); - if(wrapperMethodParams.length != configMethodParams.length) { - System.err.println("Length"); - break mismatch; - } - for(int i = 0; i < wrapperMethodParams.length; i++) { - if(wrapperMethodParams[i] != configMethodParams[i]) { - break mismatch; - } - } - hasMethod = true; - break match; - } - } - assertTrue("Foo method '" + configurationMethod.getName() + "' has not been wrapped correctly in DelegatingConfiguration wrapper", hasMethod); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9bcbcf4a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 4450e94..eddb57c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -102,7 +102,8 @@ public abstract class KafkaTestBase extends TestLogger { flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); - flinkConfig.setString(ConfigConstants.METRICS_REPORTER_CLASS, JMXReporter.class.getName()); + flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter"); + flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); flink = new ForkableFlinkMiniCluster(flinkConfig, false); flink.start();
