This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a7acefdfac5169e173e2b221fec7458049c3021
Author: Alexander Fedulov <[email protected]>
AuthorDate: Tue Mar 17 17:56:48 2020 +0100

    [FLINK-16222][metrics] Support loading reporters as plugins
---
 .../flink/runtime/metrics/ReporterSetup.java       | 31 +++++++++++++++++-----
 1 file changed, 25 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
index 4fdc68a..da15079 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
@@ -30,11 +30,14 @@ import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -139,7 +142,7 @@ public final class ReporterSetup {
 
                final List<Tuple2<String, Configuration>> reporterConfigurations = loadReporterConfigurations(configuration, namedReporters);
 
-               final Map<String, MetricReporterFactory> reporterFactories = loadReporterFactories();
+               final Map<String, MetricReporterFactory> reporterFactories = loadAvailableReporterFactories(pluginManager);
 
                return setupReporters(reporterFactories, reporterConfigurations);
        }
@@ -186,17 +189,24 @@ public final class ReporterSetup {
                return reporterConfigurations;
        }
 
-       private static Map<String, MetricReporterFactory> loadReporterFactories() {
-               final ServiceLoader<MetricReporterFactory> serviceLoader = ServiceLoader.load(MetricReporterFactory.class);
-
+       private static Map<String, MetricReporterFactory> loadAvailableReporterFactories(@Nullable PluginManager pluginManager) {
                final Map<String, MetricReporterFactory> reporterFactories = new HashMap<>(2);
-               final Iterator<MetricReporterFactory> factoryIterator = serviceLoader.iterator();
+               final Iterator<MetricReporterFactory> factoryIterator = getAllReporterFactories(pluginManager);
                // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors
                // such an error might be caused if the META-INF/services contains an entry to a non-existing factory class
                while (factoryIterator.hasNext()) {
                        try {
                                MetricReporterFactory factory = factoryIterator.next();
-                               reporterFactories.put(factory.getClass().getName(), factory);
+                               String factoryClassName = factory.getClass().getName();
+                               MetricReporterFactory existingFactory = reporterFactories.get(factoryClassName);
+                               if (existingFactory == null) {
+                                       reporterFactories.put(factoryClassName, factory);
+                                       LOG.debug("Found reporter factory {} at {} ",
+                                               factoryClassName,
+                                               new File(factory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath());
+                               } else {
+                                       LOG.warn("Multiple implementations of the same reporter were found in 'lib' and/or 'plugins' directories for {}. It is recommended to remove redundant reporter JARs to resolve used versions' ambiguity.", factoryClassName);
+                               }
                        } catch (Exception | ServiceConfigurationError e) {
                                LOG.warn("Error while loading reporter factory.", e);
                        }
@@ -205,6 +215,15 @@ public final class ReporterSetup {
                return Collections.unmodifiableMap(reporterFactories);
        }
 
+       private static Iterator<MetricReporterFactory> getAllReporterFactories(@Nullable PluginManager pluginManager) {
+               final Iterator<MetricReporterFactory> factoryIteratorSPI = ServiceLoader.load(MetricReporterFactory.class).iterator();
+               final Iterator<MetricReporterFactory> factoryIteratorPlugins = pluginManager != null
+                       ? pluginManager.load(MetricReporterFactory.class)
+                       : Collections.emptyIterator();
+
+               return Iterators.concat(factoryIteratorPlugins, factoryIteratorSPI);
+       }
+
        private static List<ReporterSetup> setupReporters(Map<String, MetricReporterFactory> reporterFactories, List<Tuple2<String, Configuration>> reporterConfigurations) {
                List<ReporterSetup> reporterSetups = new ArrayList<>(reporterConfigurations.size());
                for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {

Reply via email to