[
https://issues.apache.org/jira/browse/FLINK-4246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15389513#comment-15389513
]
ASF GitHub Bot commented on FLINK-4246:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2285#discussion_r71879969
--- Diff:
flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
@@ -75,77 +78,92 @@ public MetricRegistry(Configuration config) {
this.delimiter = delim;
// second, instantiate any custom configured reporters
-
- final String className =
config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null);
- if (className == null) {
+ this.reporters = new ArrayList<>();
+
+ final String definedReporters =
config.getString(ConfigConstants.METRICS_REPORTERS_LIST, null);
+
+ if (definedReporters == null) {
+ // no reporters defined
// by default, don't report anything
LOG.info("No metrics reporter configured, no metrics
will be exposed/reported.");
- this.reporter = null;
this.executor = null;
- }
- else {
- MetricReporter reporter;
- ScheduledExecutorService executor = null;
- try {
- String configuredPeriod =
config.getString(ConfigConstants.METRICS_REPORTER_INTERVAL, null);
- TimeUnit timeunit = TimeUnit.SECONDS;
- long period = 10;
-
- if (configuredPeriod != null) {
- try {
- String[] interval =
configuredPeriod.split(" ");
- period =
Long.parseLong(interval[0]);
- timeunit =
TimeUnit.valueOf(interval[1]);
+ } else {
+ // we have some reporters so
+ String[] namedReporters = definedReporters.split(",");
+ for (String namedReporter : namedReporters) {
+ DelegatingConfiguration reporterConfig =
+ new
DelegatingConfiguration(config, ConfigConstants.METRICS_REPORTER_PREFIX +
namedReporter + ".");
+
+ final String className =
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS, null);
+ if (className == null) {
+ throw new IllegalStateException("No
reporter class set for reporter " + namedReporter);
+ }
+
+ try {
+ String configuredPeriod =
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL, null);
+ TimeUnit timeunit = TimeUnit.SECONDS;
+ long period = 10;
+
+ if (configuredPeriod != null) {
+ try {
+ String[] interval =
configuredPeriod.split(" ");
+ period =
Long.parseLong(interval[0]);
+ timeunit =
TimeUnit.valueOf(interval[1]);
+ }
+ catch (Exception e) {
+ LOG.error("Cannot parse
report interval from config: " + configuredPeriod +
+ " -
please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
+ "Using
default reporting interval.");
+ }
}
- catch (Exception e) {
- LOG.error("Cannot parse report
interval from config: " + configuredPeriod +
- " - please use
values like '10 SECONDS' or '500 MILLISECONDS'. " +
- "Using default
reporting interval.");
+
+ Class<?> reporterClass =
Class.forName(className);
+ MetricReporter reporterInstance =
(MetricReporter) reporterClass.newInstance();
+ reporterInstance.open(reporterConfig);
+
+ if (reporterInstance instanceof
Scheduled) {
+ ensureExecutorRunning();
--- End diff --
do we really need this method? it is only used once.
> Allow Specifying Multiple Metrics Reporters
> -------------------------------------------
>
> Key: FLINK-4246
> URL: https://issues.apache.org/jira/browse/FLINK-4246
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Affects Versions: 1.1.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Fix For: 1.1.0
>
>
> We should allow specifying multiple reporters. A rough sketch of how the
> configuration should look like is this:
> {code}
> metrics.reporters = foo,bar
> metrics.reporter.foo.class = JMXReporter.class
> metrics.reporter.foo.port = 42-117
> metrics.reporter.bar.class = GangliaReporter.class
> metrics.reporter.bar.port = 512
> metrics.reporter.bar.whatever = 2
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)