[ 
https://issues.apache.org/jira/browse/FLINK-4246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15392137#comment-15392137
 ] 

ASF GitHub Bot commented on FLINK-4246:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2285#discussion_r72089377
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
    @@ -74,81 +76,97 @@ public MetricRegistry(Configuration config) {
                this.delimiter = delim;
     
                // second, instantiate any custom configured reporters
    +           this.reporters = new ArrayList<>();
    +
    +           final String definedReporters = 
config.getString(ConfigConstants.METRICS_REPORTERS_LIST, null);
     
    -           final String className = 
config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null);
    -           if (className == null) {
    +           if (definedReporters == null) {
    +                   // no reporters defined
                        // by default, don't report anything
                        LOG.info("No metrics reporter configured, no metrics 
will be exposed/reported.");
    -                   this.reporter = null;
                        this.executor = null;
    -           }
    -           else {
    -                   MetricReporter reporter;
    -                   ScheduledExecutorService executor = null;
    -                   try {
    -                           String configuredPeriod = 
config.getString(ConfigConstants.METRICS_REPORTER_INTERVAL, null);
    -                           TimeUnit timeunit = TimeUnit.SECONDS;
    -                           long period = 10;
    -
    -                           if (configuredPeriod != null) {
    -                                   try {
    -                                           String[] interval = 
configuredPeriod.split(" ");
    -                                           period = 
Long.parseLong(interval[0]);
    -                                           timeunit = 
TimeUnit.valueOf(interval[1]);
    -                                   }
    -                                   catch (Exception e) {
    -                                           LOG.error("Cannot parse report 
interval from config: " + configuredPeriod +
    -                                                   " - please use values 
like '10 SECONDS' or '500 MILLISECONDS'. " +
    -                                                   "Using default 
reporting interval.");
    -                                   }
    +           } else {
    +                   // we have some reporters so
    +                   String[] namedReporters = definedReporters.split(",");
    +                   for (String namedReporter : namedReporters) {
    +
    +                           final String className =
    +                                           
config.getString(ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + "." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
    +                           if (className == null) {
    +                                   LOG.error("No reporter class set for 
reporter " + namedReporter + ". Metrics might not be exposed/reported.");
    +                                   continue;
                                }
     
    -                           MetricConfig reporterConfig = 
createReporterConfig(config);
    +                           try {
    +                                   String configuredPeriod =
    +                                                   
config.getString(ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + "." 
+ ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
    +                                   TimeUnit timeunit = TimeUnit.SECONDS;
    +                                   long period = 10;
    +
    +                                   if (configuredPeriod != null) {
    +                                           try {
    +                                                   String[] interval = 
configuredPeriod.split(" ");
    +                                                   period = 
Long.parseLong(interval[0]);
    +                                                   timeunit = 
TimeUnit.valueOf(interval[1]);
    +                                           }
    +                                           catch (Exception e) {
    +                                                   LOG.error("Cannot parse 
report interval from config: " + configuredPeriod +
    +                                                                   " - 
please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
    +                                                                   "Using 
default reporting interval.");
    +                                           }
    +                                   }
     
    -                           Class<?> reporterClass = 
Class.forName(className);
    -                           reporter = (MetricReporter) 
reporterClass.newInstance();
    -                           reporter.open(reporterConfig);
    +                                   Class<?> reporterClass = 
Class.forName(className);
    +                                   MetricReporter reporterInstance = 
(MetricReporter) reporterClass.newInstance();
     
    -                           if (reporter instanceof Scheduled) {
    -                                   executor = 
Executors.newSingleThreadScheduledExecutor();
    -                                   LOG.info("Periodically reporting 
metrics in intervals of {} {}", period, timeunit.name());
    +                                   MetricConfig reporterConfig = new 
MetricConfig();
    +                                   
config.addAll(ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + ".", 
reporterConfig);
    +                                   reporterInstance.open(reporterConfig);
     
    -                                   executor.scheduleWithFixedDelay(new 
ReporterTask((Scheduled) reporter), period, period, timeunit);
    +                                   if (reporterInstance instanceof 
Scheduled) {
    +                                           if (this.executor == null) {
    +                                                   executor = 
Executors.newSingleThreadScheduledExecutor();
    +                                           }
    +                                           LOG.info("Periodically 
reporting metrics in intervals of {} {}", period, timeunit.name());
    --- End diff --
    
    This log message should include the reporter name, so that it is clear which reporter the logged interval applies to.


> Allow Specifying Multiple Metrics Reporters
> -------------------------------------------
>
>                 Key: FLINK-4246
>                 URL: https://issues.apache.org/jira/browse/FLINK-4246
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics
>    Affects Versions: 1.1.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>             Fix For: 1.1.0
>
>
> We should allow specifying multiple reporters. A rough sketch of what the 
> configuration should look like is this:
> {code}
> metrics.reporters = foo,bar
> metrics.reporter.foo.class = JMXReporter.class
> metrics.reporter.foo.port = 42-117
> metrics.reporter.bar.class = GangliaReporter.class
> metrics.reporter.bar.port = 512
> metrics.reporter.bar.whatever = 2
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to