This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a0ea5e9  [FLINK-16832][metrics] Refactor ReporterSetup
a0ea5e9 is described below

commit a0ea5e9980fee01d961a2f7c249e400325220a98
Author: Alexander Fedulov <[email protected]>
AuthorDate: Tue Mar 17 15:57:39 2020 +0100

    [FLINK-16832][metrics] Refactor ReporterSetup
    
    Introduce methods for better readability.
---
 .../flink/runtime/metrics/ReporterSetup.java       | 76 +++++++++++++---------
 1 file changed, 46 insertions(+), 30 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
index 40fb9dd..62cb543 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
@@ -127,23 +127,39 @@ public final class ReporterSetup {
 
        public static List<ReporterSetup> fromConfiguration(final Configuration 
configuration) {
                String includedReportersString = 
configuration.getString(MetricOptions.REPORTERS_LIST, "");
+
+               Set<String> namedReporters = 
findEnabledReportersInConfiguration(configuration, includedReportersString);
+
+               if (namedReporters.isEmpty()) {
+                       return Collections.emptyList();
+               }
+
+               final List<Tuple2<String, Configuration>> 
reporterConfigurations = loadReporterConfigurations(configuration, 
namedReporters);
+
+               final Map<String, MetricReporterFactory> reporterFactories = 
loadReporterFactories();
+
+               return setupReporters(reporterFactories, 
reporterConfigurations);
+       }
+
+       private static Set<String> 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
                Set<String> includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
                        .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
                        .collect(Collectors.toSet());
 
                // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
-               Set<String> namedReporters = new TreeSet<>(String::compareTo);
-               // scan entire configuration for "metric.reporter" keys and 
parse individual reporter configurations
+               Set<String> namedOrderedReporters = new 
TreeSet<>(String::compareTo);
+
+               // scan entire configuration for keys starting with 
METRICS_REPORTER_PREFIX and determine the set of enabled reporters
                for (String key : configuration.keySet()) {
                        if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
                                Matcher matcher = 
reporterClassPattern.matcher(key);
                                if (matcher.matches()) {
                                        String reporterName = matcher.group(1);
                                        if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
-                                               if 
(namedReporters.contains(reporterName)) {
+                                               if 
(namedOrderedReporters.contains(reporterName)) {
                                                        LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
                                                } else {
-                                                       
namedReporters.add(reporterName);
+                                                       
namedOrderedReporters.add(reporterName);
                                                }
                                        } else {
                                                LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
@@ -151,12 +167,11 @@ public final class ReporterSetup {
                                }
                        }
                }
+               return namedOrderedReporters;
+       }
 
-               if (namedReporters.isEmpty()) {
-                       return Collections.emptyList();
-               }
-
-               List<Tuple2<String, Configuration>> reporterConfigurations = 
new ArrayList<>(namedReporters.size());
+       private static List<Tuple2<String, Configuration>> 
loadReporterConfigurations(Configuration configuration, Set<String> 
namedReporters) {
+               final List<Tuple2<String, Configuration>> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
 
                for (String namedReporter: namedReporters) {
                        DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
@@ -165,27 +180,7 @@ public final class ReporterSetup {
 
                        reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
                }
-
-               final Map<String, MetricReporterFactory> reporterFactories = 
loadReporterFactories();
-               List<ReporterSetup> reporterArguments = new 
ArrayList<>(reporterConfigurations.size());
-               for (Tuple2<String, Configuration> reporterConfiguration: 
reporterConfigurations) {
-                       String reporterName = reporterConfiguration.f0;
-                       Configuration reporterConfig = reporterConfiguration.f1;
-
-                       try {
-                               Optional<MetricReporter> metricReporterOptional 
= loadReporter(reporterName, reporterConfig, reporterFactories);
-                               metricReporterOptional.ifPresent(reporter -> {
-                                       MetricConfig metricConfig = new 
MetricConfig();
-                                       
reporterConfig.addAllToProperties(metricConfig);
-
-                                       
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
-                               });
-                       }
-                       catch (Throwable t) {
-                               LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
-                       }
-               }
-               return reporterArguments;
+               return reporterConfigurations;
        }
 
        private static Map<String, MetricReporterFactory> 
loadReporterFactories() {
@@ -207,6 +202,27 @@ public final class ReporterSetup {
                return Collections.unmodifiableMap(reporterFactories);
        }
 
+       private static List<ReporterSetup> setupReporters(Map<String, 
MetricReporterFactory> reporterFactories, List<Tuple2<String, Configuration>> 
reporterConfigurations) {
+               List<ReporterSetup> reporterSetups = new 
ArrayList<>(reporterConfigurations.size());
+               for (Tuple2<String, Configuration> reporterConfiguration: 
reporterConfigurations) {
+                       String reporterName = reporterConfiguration.f0;
+                       Configuration reporterConfig = reporterConfiguration.f1;
+
+                       try {
+                               Optional<MetricReporter> metricReporterOptional 
= loadReporter(reporterName, reporterConfig, reporterFactories);
+                               metricReporterOptional.ifPresent(reporter -> {
+                                       MetricConfig metricConfig = new 
MetricConfig();
+                                       
reporterConfig.addAllToProperties(metricConfig);
+                                       
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
+                               });
+                       }
+                       catch (Throwable t) {
+                               LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
+                       }
+               }
+               return reporterSetups;
+       }
+
        private static Optional<MetricReporter> loadReporter(
                        final String reporterName,
                        final Configuration reporterConfig,

Reply via email to