afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386646692
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##########
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
                                metricReporterOptional.ifPresent(reporter -> {
                                        MetricConfig metricConfig = new 
MetricConfig();
                                        
reporterConfig.addAllToProperties(metricConfig);
-
-                                       
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+                                       
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
                                });
-                       }
-                       catch (Throwable t) {
+                       } catch (Throwable t) {
                                LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
                        }
                }
-               return reporterArguments;
+               return reporterSetups;
        }
 
-       private static Map<String, MetricReporterFactory> 
loadReporterFactories() {
-               final ServiceLoader<MetricReporterFactory> serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+       private static List<Tuple2<String, Configuration>> 
loadReporterConfigurations(Configuration configuration, Set<String> 
namedReporters) {
+               final List<Tuple2<String, Configuration>> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
+
+               for (String namedReporter: namedReporters) {
+                       DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+                               configuration,
+                               ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
 
+                       reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+               }
+               return reporterConfigurations;
+       }
+
+       private static Set<String> 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+               Set<String> includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+                       .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+                       .collect(Collectors.toSet());
+
+               // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+               Set<String> namedOrderedReporters = new 
TreeSet<>(String::compareTo);
+
+               // scan entire configuration for keys starting with 
METRICS_REPORTER_PREFIX and determine the set of enabled reporters
+               for (String key : configuration.keySet()) {
+                       if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+                               Matcher matcher = 
reporterClassPattern.matcher(key);
+                               if (matcher.matches()) {
+                                       String reporterName = matcher.group(1);
+                                       if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+                                               if 
(namedOrderedReporters.contains(reporterName)) {
+                                                       LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+                                               } else {
+                                                       
namedOrderedReporters.add(reporterName);
+                                               }
+                                       } else {
+                                               LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
+                                       }
+                               }
+                       }
+               }
+               return namedOrderedReporters;
+       }
+
+       private static Map<String, MetricReporterFactory> 
loadAvailableReporterFactories(PluginManager pluginManager) {
                final Map<String, MetricReporterFactory> reporterFactories = 
new HashMap<>(2);
-               final Iterator<MetricReporterFactory> factoryIterator = 
serviceLoader.iterator();
+               final Iterator<MetricReporterFactory> factoryIterator = 
getAllReporterFactories(pluginManager);
+               LOG.debug("All available factories (from both SPIs and 
Plugins):");
+               getAllReporterFactories(pluginManager).forEachRemaining(i -> 
LOG.debug(i.toString()));
                // do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
                // such an error might be caused if the META-INF/services 
contains an entry to a non-existing factory class
                while (factoryIterator.hasNext()) {
                        try {
                                MetricReporterFactory factory = 
factoryIterator.next();
-                               
reporterFactories.put(factory.getClass().getName(), factory);
+                               String factoryClassName = 
factory.getClass().getName();
+                               MetricReporterFactory existingFactory = 
reporterFactories.get(factoryClassName);
+                               if (existingFactory == null){
+                                       reporterFactories.put(factoryClassName, 
factory);
+                                       LOG.warn(new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation()
+                                               .toURI()).getCanonicalPath());
+                               } else {
+                                       //TODO: use path information below, 
when Plugin Classloader stops always prioritizing factories from /lib
+//                                     String jarPath1 = new 
File(existingFactory.getClass().getProtectionDomain().getCodeSource().getLocation()
+//                                             .toURI()).getCanonicalPath();
+//                                     String jarPath2 = new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation()
+//                                             .toURI()).getCanonicalPath();
+//                                     LOG.warn("Multiple implementations of 
the same reporter were found: \n {} and \n{}", jarPath1, jarPath2);
+                                       LOG.warn("Multiple implementations of 
the same reporter were found in 'lib' and 'plugins' directories for {}. It is 
recommended to remove redundant reporter JARs to resolve used versions' 
ambiguity.", factoryClassName);
 
 Review comment:
   But wouldn't this technically still be "multiple implementations of the same 
reporter"? The message does not explicitly say that one of them is in lib and 
the other in plugins - just that, while searching those two directories, 
multiple implementations were found.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to