[
https://issues.apache.org/jira/browse/FLINK-4246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15392154#comment-15392154
]
ASF GitHub Bot commented on FLINK-4246:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2285#discussion_r72091326
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
---
@@ -74,81 +76,97 @@ public MetricRegistry(Configuration config) {
this.delimiter = delim;
// second, instantiate any custom configured reporters
+ this.reporters = new ArrayList<>();
+
+ final String definedReporters =
config.getString(ConfigConstants.METRICS_REPORTERS_LIST, null);
- final String className =
config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null);
- if (className == null) {
+ if (definedReporters == null) {
+ // no reporters defined
// by default, don't report anything
LOG.info("No metrics reporter configured, no metrics
will be exposed/reported.");
- this.reporter = null;
this.executor = null;
- }
- else {
- MetricReporter reporter;
- ScheduledExecutorService executor = null;
- try {
- String configuredPeriod =
config.getString(ConfigConstants.METRICS_REPORTER_INTERVAL, null);
- TimeUnit timeunit = TimeUnit.SECONDS;
- long period = 10;
-
- if (configuredPeriod != null) {
- try {
- String[] interval =
configuredPeriod.split(" ");
- period =
Long.parseLong(interval[0]);
- timeunit =
TimeUnit.valueOf(interval[1]);
- }
- catch (Exception e) {
- LOG.error("Cannot parse report
interval from config: " + configuredPeriod +
- " - please use values
like '10 SECONDS' or '500 MILLISECONDS'. " +
- "Using default
reporting interval.");
- }
+ } else {
+ // we have some reporters so
+ String[] namedReporters = definedReporters.split(",");
+ for (String namedReporter : namedReporters) {
+
+ final String className =
+
config.getString(ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + "."
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
+ if (className == null) {
+ LOG.error("No reporter class set for
reporter " + namedReporter + ". Metrics might not be exposed/reported.");
+ continue;
}
- MetricConfig reporterConfig =
createReporterConfig(config);
+ try {
+ String configuredPeriod =
+
config.getString(ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + "."
+ ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
+ TimeUnit timeunit = TimeUnit.SECONDS;
+ long period = 10;
+
+ if (configuredPeriod != null) {
+ try {
+ String[] interval =
configuredPeriod.split(" ");
+ period =
Long.parseLong(interval[0]);
+ timeunit =
TimeUnit.valueOf(interval[1]);
+ }
+ catch (Exception e) {
+ LOG.error("Cannot parse
report interval from config: " + configuredPeriod +
+ " -
please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
+ "Using
default reporting interval.");
+ }
+ }
- Class<?> reporterClass =
Class.forName(className);
- reporter = (MetricReporter)
reporterClass.newInstance();
- reporter.open(reporterConfig);
+ Class<?> reporterClass =
Class.forName(className);
+ MetricReporter reporterInstance =
(MetricReporter) reporterClass.newInstance();
- if (reporter instanceof Scheduled) {
- executor =
Executors.newSingleThreadScheduledExecutor();
- LOG.info("Periodically reporting
metrics in intervals of {} {}", period, timeunit.name());
+ MetricConfig reporterConfig = new
MetricConfig();
+
config.addAll(ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + ".",
reporterConfig);
+ reporterInstance.open(reporterConfig);
- executor.scheduleWithFixedDelay(new
ReporterTask((Scheduled) reporter), period, period, timeunit);
+ if (reporterInstance instanceof
Scheduled) {
+ if (this.executor == null) {
+ executor =
Executors.newSingleThreadScheduledExecutor();
+ }
+ LOG.info("Periodically
reporting metrics in intervals of {} {}", period, timeunit.name());
--- End diff --
Fixing, I'll include the configured reporter name as well as the class.
> Allow Specifying Multiple Metrics Reporters
> -------------------------------------------
>
> Key: FLINK-4246
> URL: https://issues.apache.org/jira/browse/FLINK-4246
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Affects Versions: 1.1.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Fix For: 1.1.0
>
>
> We should allow specifying multiple reporters. A rough sketch of how the
> configuration should look like is this:
> {code}
> metrics.reporters = foo,bar
> metrics.reporter.foo.class = JMXReporter.class
> metrics.reporter.foo.port = 42-117
> metrics.reporter.bar.class = GangliaReporter.class
> metrics.reporter.bar.port = 512
> metrics.reporter.bar.whatever = 2
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)