This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1a7acefdfac5169e173e2b221fec7458049c3021 Author: Alexander Fedulov <[email protected]> AuthorDate: Tue Mar 17 17:56:48 2020 +0100 [FLINK-16222][metrics] Support loading reporters as plugins --- .../flink/runtime/metrics/ReporterSetup.java | 31 +++++++++++++++++----- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java index 4fdc68a..da15079 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java @@ -30,11 +30,14 @@ import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.MetricReporterFactory; import org.apache.flink.runtime.metrics.scope.ScopeFormat; +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -139,7 +142,7 @@ public final class ReporterSetup { final List<Tuple2<String, Configuration>> reporterConfigurations = loadReporterConfigurations(configuration, namedReporters); - final Map<String, MetricReporterFactory> reporterFactories = loadReporterFactories(); + final Map<String, MetricReporterFactory> reporterFactories = loadAvailableReporterFactories(pluginManager); return setupReporters(reporterFactories, reporterConfigurations); } @@ -186,17 +189,24 @@ public final class ReporterSetup { return reporterConfigurations; } - private static Map<String, MetricReporterFactory> loadReporterFactories() { - final ServiceLoader<MetricReporterFactory> serviceLoader = ServiceLoader.load(MetricReporterFactory.class); - + private static Map<String, MetricReporterFactory> loadAvailableReporterFactories(@Nullable PluginManager pluginManager) { final Map<String, MetricReporterFactory> reporterFactories = new HashMap<>(2); - final Iterator<MetricReporterFactory> factoryIterator = serviceLoader.iterator(); + final Iterator<MetricReporterFactory> factoryIterator = getAllReporterFactories(pluginManager); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services contains an entry to a non-existing factory class while (factoryIterator.hasNext()) { try { MetricReporterFactory factory = factoryIterator.next(); - reporterFactories.put(factory.getClass().getName(), factory); + String factoryClassName = factory.getClass().getName(); + MetricReporterFactory existingFactory = reporterFactories.get(factoryClassName); + if (existingFactory == null) { + reporterFactories.put(factoryClassName, factory); + LOG.debug("Found reporter factory {} at {} ", + factoryClassName, + new File(factory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath()); + } else { + LOG.warn("Multiple implementations of the same reporter were found in 'lib' and/or 'plugins' directories for {}. It is recommended to remove redundant reporter JARs to resolve used versions' ambiguity.", factoryClassName); + } } catch (Exception | ServiceConfigurationError e) { LOG.warn("Error while loading reporter factory.", e); } @@ -205,6 +215,15 @@ public final class ReporterSetup { return Collections.unmodifiableMap(reporterFactories); } + private static Iterator<MetricReporterFactory> getAllReporterFactories(@Nullable PluginManager pluginManager) { + final Iterator<MetricReporterFactory> factoryIteratorSPI = ServiceLoader.load(MetricReporterFactory.class).iterator(); + final Iterator<MetricReporterFactory> factoryIteratorPlugins = pluginManager != null + ? pluginManager.load(MetricReporterFactory.class) + : Collections.emptyIterator(); + + return Iterators.concat(factoryIteratorPlugins, factoryIteratorSPI); + } + private static List<ReporterSetup> setupReporters(Map<String, MetricReporterFactory> reporterFactories, List<Tuple2<String, Configuration>> reporterConfigurations) { List<ReporterSetup> reporterSetups = new ArrayList<>(reporterConfigurations.size()); for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
