Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2285#discussion_r72089377
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
---
@@ -74,81 +76,97 @@ public MetricRegistry(Configuration config) {
this.delimiter = delim;
// second, instantiate any custom configured reporters
+ this.reporters = new ArrayList<>();
+
+ final String definedReporters =
config.getString(ConfigConstants.METRICS_REPORTERS_LIST, null);
- final String className =
config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null);
- if (className == null) {
+ if (definedReporters == null) {
+ // no reporters defined
// by default, don't report anything
LOG.info("No metrics reporter configured, no metrics
will be exposed/reported.");
- this.reporter = null;
this.executor = null;
- }
- else {
- MetricReporter reporter;
- ScheduledExecutorService executor = null;
- try {
- String configuredPeriod =
config.getString(ConfigConstants.METRICS_REPORTER_INTERVAL, null);
- TimeUnit timeunit = TimeUnit.SECONDS;
- long period = 10;
-
- if (configuredPeriod != null) {
- try {
- String[] interval =
configuredPeriod.split(" ");
- period =
Long.parseLong(interval[0]);
- timeunit =
TimeUnit.valueOf(interval[1]);
- }
- catch (Exception e) {
- LOG.error("Cannot parse report
interval from config: " + configuredPeriod +
- " - please use values
like '10 SECONDS' or '500 MILLISECONDS'. " +
- "Using default
reporting interval.");
- }
+ } else {
+ // we have some reporters so
+ String[] namedReporters = definedReporters.split(",");
+ for (String namedReporter : namedReporters) {
+
+ final String className =
+
config.getString(ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + "."
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
+ if (className == null) {
+ LOG.error("No reporter class set for
reporter " + namedReporter + ". Metrics might not be exposed/reported.");
+ continue;
}
- MetricConfig reporterConfig =
createReporterConfig(config);
+ try {
+ String configuredPeriod =
+
config.getString(ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + "."
+ ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
+ TimeUnit timeunit = TimeUnit.SECONDS;
+ long period = 10;
+
+ if (configuredPeriod != null) {
+ try {
+ String[] interval =
configuredPeriod.split(" ");
+ period =
Long.parseLong(interval[0]);
+ timeunit =
TimeUnit.valueOf(interval[1]);
+ }
+ catch (Exception e) {
+ LOG.error("Cannot parse
report interval from config: " + configuredPeriod +
+ " -
please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
+ "Using
default reporting interval.");
+ }
+ }
- Class<?> reporterClass =
Class.forName(className);
- reporter = (MetricReporter)
reporterClass.newInstance();
- reporter.open(reporterConfig);
+ Class<?> reporterClass =
Class.forName(className);
+ MetricReporter reporterInstance =
(MetricReporter) reporterClass.newInstance();
- if (reporter instanceof Scheduled) {
- executor =
Executors.newSingleThreadScheduledExecutor();
- LOG.info("Periodically reporting
metrics in intervals of {} {}", period, timeunit.name());
+ MetricConfig reporterConfig = new
MetricConfig();
+
config.addAll(ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + ".",
reporterConfig);
+ reporterInstance.open(reporterConfig);
- executor.scheduleWithFixedDelay(new
ReporterTask((Scheduled) reporter), period, period, timeunit);
+ if (reporterInstance instanceof
Scheduled) {
+ if (this.executor == null) {
+ executor =
Executors.newSingleThreadScheduledExecutor();
+ }
+ LOG.info("Periodically
reporting metrics in intervals of {} {}", period, timeunit.name());
--- End diff --
this message should include the reporter name.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---