Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2285#discussion_r72091326
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
    @@ -74,81 +76,97 @@ public MetricRegistry(Configuration config) {
                this.delimiter = delim;
     
                // second, instantiate any custom configured reporters
    +           this.reporters = new ArrayList<>();
    +
    +           final String definedReporters = 
config.getString(ConfigConstants.METRICS_REPORTERS_LIST, null);
     
    -           final String className = 
config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null);
    -           if (className == null) {
    +           if (definedReporters == null) {
    +                   // no reporters defined
                        // by default, don't report anything
                        LOG.info("No metrics reporter configured, no metrics 
will be exposed/reported.");
    -                   this.reporter = null;
                        this.executor = null;
    -           }
    -           else {
    -                   MetricReporter reporter;
    -                   ScheduledExecutorService executor = null;
    -                   try {
    -                           String configuredPeriod = 
config.getString(ConfigConstants.METRICS_REPORTER_INTERVAL, null);
    -                           TimeUnit timeunit = TimeUnit.SECONDS;
    -                           long period = 10;
    -
    -                           if (configuredPeriod != null) {
    -                                   try {
    -                                           String[] interval = 
configuredPeriod.split(" ");
    -                                           period = 
Long.parseLong(interval[0]);
    -                                           timeunit = 
TimeUnit.valueOf(interval[1]);
    -                                   }
    -                                   catch (Exception e) {
    -                                           LOG.error("Cannot parse report 
interval from config: " + configuredPeriod +
    -                                                   " - please use values 
like '10 SECONDS' or '500 MILLISECONDS'. " +
    -                                                   "Using default 
reporting interval.");
    -                                   }
    +           } else {
    +                   // we have some reporters so
    +                   String[] namedReporters = definedReporters.split(",");
    +                   for (String namedReporter : namedReporters) {
    +
    +                           final String className =
    +                                           
config.getString(ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + "." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
    +                           if (className == null) {
    +                                   LOG.error("No reporter class set for 
reporter " + namedReporter + ". Metrics might not be exposed/reported.");
    +                                   continue;
                                }
     
    -                           MetricConfig reporterConfig = 
createReporterConfig(config);
    +                           try {
    +                                   String configuredPeriod =
    +                                                   
config.getString(ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + "." 
+ ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
    +                                   TimeUnit timeunit = TimeUnit.SECONDS;
    +                                   long period = 10;
    +
    +                                   if (configuredPeriod != null) {
    +                                           try {
    +                                                   String[] interval = 
configuredPeriod.split(" ");
    +                                                   period = 
Long.parseLong(interval[0]);
    +                                                   timeunit = 
TimeUnit.valueOf(interval[1]);
    +                                           }
    +                                           catch (Exception e) {
    +                                                   LOG.error("Cannot parse 
report interval from config: " + configuredPeriod +
    +                                                                   " - 
please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
    +                                                                   "Using 
default reporting interval.");
    +                                           }
    +                                   }
     
    -                           Class<?> reporterClass = 
Class.forName(className);
    -                           reporter = (MetricReporter) 
reporterClass.newInstance();
    -                           reporter.open(reporterConfig);
    +                                   Class<?> reporterClass = 
Class.forName(className);
    +                                   MetricReporter reporterInstance = 
(MetricReporter) reporterClass.newInstance();
     
    -                           if (reporter instanceof Scheduled) {
    -                                   executor = 
Executors.newSingleThreadScheduledExecutor();
    -                                   LOG.info("Periodically reporting 
metrics in intervals of {} {}", period, timeunit.name());
    +                                   MetricConfig reporterConfig = new 
MetricConfig();
    +                                   
config.addAll(ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + ".", 
reporterConfig);
    +                                   reporterInstance.open(reporterConfig);
     
    -                                   executor.scheduleWithFixedDelay(new 
ReporterTask((Scheduled) reporter), period, period, timeunit);
    +                                   if (reporterInstance instanceof 
Scheduled) {
    +                                           if (this.executor == null) {
    +                                                   executor = 
Executors.newSingleThreadScheduledExecutor();
    +                                           }
    +                                           LOG.info("Periodically 
reporting metrics in intervals of {} {}", period, timeunit.name());
    --- End diff --
    
    Fixing. I'll include the configured reporter name as well as the reporter class in this log message.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to