C0urante commented on code in PR #13771:
URL: https://github.com/apache/kafka/pull/13771#discussion_r1211965330


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java:
##########
@@ -68,45 +71,88 @@ public PluginScanResult(
                           connectorClientConfigPolicies);
     }
 
-    public Collection<PluginDesc<SinkConnector>> sinkConnectors() {
+    /**
+     * Merge one or more {@link PluginScanResult results} into one result 
object
+     */
+    public PluginScanResult(List<PluginScanResult> results) {
+        this(
+                merge(results, PluginScanResult::sinkConnectors),
+                merge(results, PluginScanResult::sourceConnectors),
+                merge(results, PluginScanResult::converters),
+                merge(results, PluginScanResult::headerConverters),
+                merge(results, PluginScanResult::transformations),
+                merge(results, PluginScanResult::predicates),
+                merge(results, PluginScanResult::configProviders),
+                merge(results, PluginScanResult::restExtensions),
+                merge(results, PluginScanResult::connectorClientConfigPolicies)
+        );
+    }
+
+    private static <T, R extends Comparable<R>> SortedSet<R> merge(List<T> 
results, Function<T, SortedSet<R>> accessor) {
+        SortedSet<R> merged = new TreeSet<>();
+        for (T element : results) {
+            merged.addAll(accessor.apply(element));
+        }
+        return merged;
+    }
+
+    public SortedSet<PluginDesc<SinkConnector>> sinkConnectors() {
         return sinkConnectors;
     }
 
-    public Collection<PluginDesc<SourceConnector>> sourceConnectors() {
+    public SortedSet<PluginDesc<SourceConnector>> sourceConnectors() {
         return sourceConnectors;
     }
 
-    public Collection<PluginDesc<Converter>> converters() {
+    public SortedSet<PluginDesc<Converter>> converters() {
         return converters;
     }
 
-    public Collection<PluginDesc<HeaderConverter>> headerConverters() {
+    public SortedSet<PluginDesc<HeaderConverter>> headerConverters() {
         return headerConverters;
     }
 
-    public Collection<PluginDesc<Transformation<?>>> transformations() {
+    public SortedSet<PluginDesc<Transformation<?>>> transformations() {
         return transformations;
     }
 
-    public Collection<PluginDesc<Predicate<?>>> predicates() {
+    public SortedSet<PluginDesc<Predicate<?>>> predicates() {
         return predicates;
     }
 
-    public Collection<PluginDesc<ConfigProvider>> configProviders() {
+    public SortedSet<PluginDesc<ConfigProvider>> configProviders() {
         return configProviders;
     }
 
-    public Collection<PluginDesc<ConnectRestExtension>> restExtensions() {
+    public SortedSet<PluginDesc<ConnectRestExtension>> restExtensions() {
         return restExtensions;
     }
 
-    public Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies() {
+    public SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies() {
         return connectorClientConfigPolicies;
     }
 
+    public void forEach(Consumer<PluginDesc<?>> consumer) {
+        forEach(sinkConnectors(), consumer);
+        forEach(sourceConnectors(), consumer);
+        forEach(converters(), consumer);
+        forEach(headerConverters(), consumer);
+        forEach(transformations(), consumer);
+        forEach(predicates(), consumer);
+        forEach(configProviders(), consumer);
+        forEach(restExtensions(), consumer);
+        forEach(connectorClientConfigPolicies(), consumer);
+    }
+
+    private static <T> void forEach(SortedSet<PluginDesc<T>> set, 
Consumer<PluginDesc<?>> consumer) {
+        for (PluginDesc<T> plugin : set) {
+            consumer.accept(plugin);
+        }
+    }

Review Comment:
   This can be simplified if we tweak the type signature of `allPlugins`:
   ```suggestion
       public void forEach(Consumer<PluginDesc<?>> consumer) {
           allPlugins.forEach(plugins -> plugins.forEach(consumer));
       }
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java:
##########
@@ -27,32 +27,35 @@
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 import java.util.Arrays;
-import java.util.Collection;
+import java.util.SortedSet;
 import java.util.List;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 public class PluginScanResult {
-    private final Collection<PluginDesc<SinkConnector>> sinkConnectors;
-    private final Collection<PluginDesc<SourceConnector>> sourceConnectors;
-    private final Collection<PluginDesc<Converter>> converters;
-    private final Collection<PluginDesc<HeaderConverter>> headerConverters;
-    private final Collection<PluginDesc<Transformation<?>>> transformations;
-    private final Collection<PluginDesc<Predicate<?>>> predicates;
-    private final Collection<PluginDesc<ConfigProvider>> configProviders;
-    private final Collection<PluginDesc<ConnectRestExtension>> restExtensions;
-    private final Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies;
-
-    private final List<Collection<?>> allPlugins;
+    private final SortedSet<PluginDesc<SinkConnector>> sinkConnectors;
+    private final SortedSet<PluginDesc<SourceConnector>> sourceConnectors;
+    private final SortedSet<PluginDesc<Converter>> converters;
+    private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
+    private final SortedSet<PluginDesc<Transformation<?>>> transformations;
+    private final SortedSet<PluginDesc<Predicate<?>>> predicates;
+    private final SortedSet<PluginDesc<ConfigProvider>> configProviders;
+    private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
+    private final SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies;
+
+    private final List<SortedSet<?>> allPlugins;

Review Comment:
   We can be more specific here, which simplifies the 
`PluginScanResult::forEach` implementation below:
   ```suggestion
       private final List<SortedSet<? extends PluginDesc<?>>> allPlugins;
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java:
##########
@@ -83,6 +85,14 @@ public String location() {
         return location;
     }
 
+    public ClassLoader loader() {
+        return loader;
+    }
+
+    public boolean isolated() {
+        return loader instanceof PluginClassLoader;
+    }

Review Comment:
   This getter isn't used anywhere; can we remove it?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java:
##########
@@ -68,45 +71,88 @@ public PluginScanResult(
                           connectorClientConfigPolicies);
     }
 
-    public Collection<PluginDesc<SinkConnector>> sinkConnectors() {
+    /**
+     * Merge one or more {@link PluginScanResult results} into one result 
object
+     */
+    public PluginScanResult(List<PluginScanResult> results) {
+        this(
+                merge(results, PluginScanResult::sinkConnectors),
+                merge(results, PluginScanResult::sourceConnectors),
+                merge(results, PluginScanResult::converters),
+                merge(results, PluginScanResult::headerConverters),
+                merge(results, PluginScanResult::transformations),
+                merge(results, PluginScanResult::predicates),
+                merge(results, PluginScanResult::configProviders),
+                merge(results, PluginScanResult::restExtensions),
+                merge(results, PluginScanResult::connectorClientConfigPolicies)
+        );
+    }
+
+    private static <T, R extends Comparable<R>> SortedSet<R> merge(List<T> 
results, Function<T, SortedSet<R>> accessor) {
+        SortedSet<R> merged = new TreeSet<>();
+        for (T element : results) {
+            merged.addAll(accessor.apply(element));
+        }
+        return merged;
+    }

Review Comment:
   Since this is private and unlikely to be used elsewhere, we don't have to 
make it generic (which also makes it trickier to read):
   ```suggestion
       private static <R extends Comparable<R>> SortedSet<R> 
merge(List<PluginScanResult> results, Function<PluginScanResult, SortedSet<R>> 
pluginsAccessor) {
           SortedSet<R> merged = new TreeSet<>();
           for (PluginScanResult element : results) {
               merged.addAll(pluginsAccessor.apply(element));
           }
           return merged;
       }
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java:
##########
@@ -356,25 +357,16 @@ public static String prunedName(PluginDesc<?> plugin) {
      * Verify whether a given plugin's alias matches another alias in a 
collection of plugins.
      *
      * @param alias the plugin descriptor to test for alias matching.
-     * @param plugins the collection of plugins to test against.
+     * @param aliases the collection of plugins to test against.
      * @param <U> the plugin type.
      * @return false if a match was found in the collection, otherwise true.
      */
     public static <U> boolean isAliasUnique(
             PluginDesc<U> alias,
-            Collection<PluginDesc<U>> plugins
+            Map<String, String> aliases
     ) {
-        boolean matched = false;
-        for (PluginDesc<U> plugin : plugins) {
-            if (simpleName(alias).equals(simpleName(plugin))
-                    || prunedName(alias).equals(prunedName(plugin))) {
-                if (matched) {
-                    return false;
-                }
-                matched = true;
-            }
-        }
-        return true;
+        // TODO: Mark alias collision and disable ambiguous aliases completely.

Review Comment:
   We shouldn't leave this in as a TODO; we should preserve the existing 
behavior where ambiguous aliases are completely disabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to