C0urante commented on code in PR #13771:
URL: https://github.com/apache/kafka/pull/13771#discussion_r1214691525


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java:
##########
@@ -490,6 +504,168 @@ public void testPluginUrlsWithRelativeSymlinkForwards() 
throws Exception {
         assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath));
     }
 
+    @Test
+    public void testNonCollidingAliases() {
+        SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
MockSinkConnector.class.getClassLoader()));
+        SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new 
TreeSet<>();
+        sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, 
MockSourceConnector.class.getClassLoader()));
+        SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
+        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+        PluginScanResult result = new PluginScanResult(
+                sinkConnectors,
+                sourceConnectors,
+                converters,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        Map<String, String> aliases = PluginUtils.computeAliases(result);
+        assertFalse(aliases.containsKey("Mock"));
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSinkConnector"));
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSink"));
+        assertEquals(MockSourceConnector.class.getName(), 
aliases.get("MockSourceConnector"));
+        assertEquals(MockSourceConnector.class.getName(), 
aliases.get("MockSource"));
+        assertEquals(CollidingConverter.class.getName(), 
aliases.get("CollidingConverter"));
+        assertEquals(CollidingConverter.class.getName(), 
aliases.get("Colliding"));
+    }
+
+
+    @Test
+    public void testMultiVersionAlias() {
+        SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
+        // distinct versions don't cause an alias collision (the class name is 
the same)
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
MockSinkConnector.class.getClassLoader()));
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", 
MockSinkConnector.class.getClassLoader()));
+        assertEquals(2, sinkConnectors.size());
+        PluginScanResult result = new PluginScanResult(
+                sinkConnectors,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        Map<String, String> aliases = PluginUtils.computeAliases(result);
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSinkConnector"));
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSink"));
+    }
+
+    @Test
+    public void testCollidingPrunedAlias() {
+        SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
+        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+        SortedSet<PluginDesc<HeaderConverter>> headerConverters = new 
TreeSet<>();
+        headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, 
null, CollidingHeaderConverter.class.getClassLoader()));
+        PluginScanResult result = new PluginScanResult(
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                converters,
+                headerConverters,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        Map<String, String> aliases = PluginUtils.computeAliases(result);
+        assertEquals(CollidingConverter.class.getName(), 
aliases.get("CollidingConverter"));
+        assertEquals(CollidingHeaderConverter.class.getName(), 
aliases.get("CollidingHeaderConverter"));
+        assertFalse(aliases.containsKey("Colliding"));

Review Comment:
   Same thought RE comparing whole `Map` objects:
   
   ```suggestion
           Map<String, String> actualAliases = 
PluginUtils.computeAliases(result);
   
           Map<String, String> expectedAliases = new HashMap<>();
           expectedAliases.put("CollidingConverter", 
CollidingConverter.class.getName());
           expectedAliases.put("CollidingHeaderConverter", 
CollidingHeaderConverter.class.getName());
           assertEquals(expectedAliases, actualAliases);
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java:
##########
@@ -490,6 +504,168 @@ public void testPluginUrlsWithRelativeSymlinkForwards() 
throws Exception {
         assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath));
     }
 
+    @Test
+    public void testNonCollidingAliases() {
+        SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
MockSinkConnector.class.getClassLoader()));
+        SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new 
TreeSet<>();
+        sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, 
MockSourceConnector.class.getClassLoader()));
+        SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
+        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+        PluginScanResult result = new PluginScanResult(
+                sinkConnectors,
+                sourceConnectors,
+                converters,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        Map<String, String> aliases = PluginUtils.computeAliases(result);
+        assertFalse(aliases.containsKey("Mock"));
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSinkConnector"));
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSink"));
+        assertEquals(MockSourceConnector.class.getName(), 
aliases.get("MockSourceConnector"));
+        assertEquals(MockSourceConnector.class.getName(), 
aliases.get("MockSource"));
+        assertEquals(CollidingConverter.class.getName(), 
aliases.get("CollidingConverter"));
+        assertEquals(CollidingConverter.class.getName(), 
aliases.get("Colliding"));

Review Comment:
   We can compare entire `Map` objects here, which gives better coverage and 
improves assertion error messages:
   ```suggestion
           Map<String, String> actualAliases = 
PluginUtils.computeAliases(result);
   
           Map<String, String> expectedAliases = new HashMap<>();
           expectedAliases.put("MockSinkConnector", 
MockSinkConnector.class.getName());
           expectedAliases.put("MockSink", MockSinkConnector.class.getName());
           expectedAliases.put("MockSourceConnector", 
MockSourceConnector.class.getName());
           expectedAliases.put("MockSource", 
MockSourceConnector.class.getName());
           expectedAliases.put("CollidingConverter", 
CollidingConverter.class.getName());
           expectedAliases.put("Colliding", CollidingConverter.class.getName());
           assertEquals(expectedAliases, actualAliases);
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java:
##########
@@ -490,6 +504,168 @@ public void testPluginUrlsWithRelativeSymlinkForwards() 
throws Exception {
         assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath));
     }
 
+    @Test
+    public void testNonCollidingAliases() {
+        SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
MockSinkConnector.class.getClassLoader()));
+        SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new 
TreeSet<>();
+        sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, 
MockSourceConnector.class.getClassLoader()));
+        SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
+        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+        PluginScanResult result = new PluginScanResult(
+                sinkConnectors,
+                sourceConnectors,
+                converters,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        Map<String, String> aliases = PluginUtils.computeAliases(result);
+        assertFalse(aliases.containsKey("Mock"));
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSinkConnector"));
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSink"));
+        assertEquals(MockSourceConnector.class.getName(), 
aliases.get("MockSourceConnector"));
+        assertEquals(MockSourceConnector.class.getName(), 
aliases.get("MockSource"));
+        assertEquals(CollidingConverter.class.getName(), 
aliases.get("CollidingConverter"));
+        assertEquals(CollidingConverter.class.getName(), 
aliases.get("Colliding"));
+    }
+
+
+    @Test
+    public void testMultiVersionAlias() {
+        SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
+        // distinct versions don't cause an alias collision (the class name is 
the same)
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
MockSinkConnector.class.getClassLoader()));
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", 
MockSinkConnector.class.getClassLoader()));
+        assertEquals(2, sinkConnectors.size());
+        PluginScanResult result = new PluginScanResult(
+                sinkConnectors,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        Map<String, String> aliases = PluginUtils.computeAliases(result);
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSinkConnector"));
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSink"));
+    }
+
+    @Test
+    public void testCollidingPrunedAlias() {
+        SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
+        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+        SortedSet<PluginDesc<HeaderConverter>> headerConverters = new 
TreeSet<>();
+        headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, 
null, CollidingHeaderConverter.class.getClassLoader()));
+        PluginScanResult result = new PluginScanResult(
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                converters,
+                headerConverters,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        Map<String, String> aliases = PluginUtils.computeAliases(result);
+        assertEquals(CollidingConverter.class.getName(), 
aliases.get("CollidingConverter"));
+        assertEquals(CollidingHeaderConverter.class.getName(), 
aliases.get("CollidingHeaderConverter"));
+        assertFalse(aliases.containsKey("Colliding"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCollidingSimpleAlias() {
+        SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
+        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+        SortedSet<PluginDesc<Transformation<?>>> transformations = new 
TreeSet<>();
+        transformations.add(new PluginDesc<>((Class<? extends 
Transformation<?>>) (Class<?>) Colliding.class, null, 
Colliding.class.getClassLoader()));
+        PluginScanResult result = new PluginScanResult(
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                converters,
+                Collections.emptySortedSet(),
+                transformations,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        Map<String, String> aliases = PluginUtils.computeAliases(result);
+        assertEquals(CollidingConverter.class.getName(), 
aliases.get("CollidingConverter"));
+        assertFalse(aliases.containsKey("Colliding"));

Review Comment:
   Same thought RE comparing whole `Map` objects:
   
   ```suggestion
           Map<String, String> actualAliases = 
PluginUtils.computeAliases(result);
   
           Map<String, String> expectedAliases = new HashMap<>();
           expectedAliases.put("CollidingConverter", 
CollidingConverter.class.getName());
           assertEquals(expectedAliases, actualAliases);
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java:
##########
@@ -490,6 +504,168 @@ public void testPluginUrlsWithRelativeSymlinkForwards() 
throws Exception {
         assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath));
     }
 
+    @Test
+    public void testNonCollidingAliases() {
+        SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
MockSinkConnector.class.getClassLoader()));
+        SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new 
TreeSet<>();
+        sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, 
MockSourceConnector.class.getClassLoader()));
+        SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
+        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+        PluginScanResult result = new PluginScanResult(
+                sinkConnectors,
+                sourceConnectors,
+                converters,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        Map<String, String> aliases = PluginUtils.computeAliases(result);
+        assertFalse(aliases.containsKey("Mock"));
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSinkConnector"));
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSink"));
+        assertEquals(MockSourceConnector.class.getName(), 
aliases.get("MockSourceConnector"));
+        assertEquals(MockSourceConnector.class.getName(), 
aliases.get("MockSource"));
+        assertEquals(CollidingConverter.class.getName(), 
aliases.get("CollidingConverter"));
+        assertEquals(CollidingConverter.class.getName(), 
aliases.get("Colliding"));
+    }
+
+
+    @Test
+    public void testMultiVersionAlias() {
+        SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
+        // distinct versions don't cause an alias collision (the class name is 
the same)
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
MockSinkConnector.class.getClassLoader()));
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", 
MockSinkConnector.class.getClassLoader()));
+        assertEquals(2, sinkConnectors.size());
+        PluginScanResult result = new PluginScanResult(
+                sinkConnectors,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        Map<String, String> aliases = PluginUtils.computeAliases(result);
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSinkConnector"));
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSink"));

Review Comment:
   Same thought RE comparing whole `Map` objects:
   
   ```suggestion
           Map<String, String> actualAliases = 
PluginUtils.computeAliases(result);
   
           Map<String, String> expectedAliases = new HashMap<>();
           expectedAliases.put("MockSinkConnector", 
MockSinkConnector.class.getName());
           expectedAliases.put("MockSink", MockSinkConnector.class.getName());
           assertEquals(expectedAliases, actualAliases);
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java:
##########
@@ -386,6 +362,25 @@ private static String prunePluginName(PluginDesc<?> 
plugin, String suffix) {
         return simple;
     }
 
+    public static Map<String, String> computeAliases(PluginScanResult 
scanResult) {
+        Map<String, Set<String>> aliasCollisions = new HashMap<>();
+        scanResult.forEach(pluginDesc -> {
+            aliasCollisions.computeIfAbsent(simpleName(pluginDesc), ignored -> 
new HashSet<>()).add(pluginDesc.className());
+            aliasCollisions.computeIfAbsent(prunedName(pluginDesc), ignored -> 
new HashSet<>()).add(pluginDesc.className());
+        });
+        Map<String, String> aliases = new HashMap<>();
+        for (Map.Entry<String, Set<String>> entry : 
aliasCollisions.entrySet()) {
+            String alias = entry.getKey();
+            Set<String> classNames = entry.getValue();
+            if (classNames.size() == 1) {
+                aliases.put(alias, classNames.stream().findAny().get());
+            } else {
+                log.warn("Ambiguous alias '{}' refers to multiple distinct 
plugins {}: ignoring", alias, classNames);

Review Comment:
   After seeing an example of this log message, I'm wondering if the trailing 
`: ignoring` part here is really beneficial. Maybe something like this would be 
a bit clearer?
   
   ```suggestion
                   log.warn("Ignoring ambiguous alias '{}' since it refers to 
multiple distinct plugins {}", alias, classNames);
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java:
##########
@@ -490,6 +504,168 @@ public void testPluginUrlsWithRelativeSymlinkForwards() 
throws Exception {
         assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath));
     }
 
+    @Test
+    public void testNonCollidingAliases() {
+        SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
MockSinkConnector.class.getClassLoader()));
+        SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new 
TreeSet<>();
+        sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, 
MockSourceConnector.class.getClassLoader()));
+        SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
+        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+        PluginScanResult result = new PluginScanResult(
+                sinkConnectors,
+                sourceConnectors,
+                converters,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        Map<String, String> aliases = PluginUtils.computeAliases(result);
+        assertFalse(aliases.containsKey("Mock"));
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSinkConnector"));
+        assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSink"));
+        assertEquals(MockSourceConnector.class.getName(), 
aliases.get("MockSourceConnector"));
+        assertEquals(MockSourceConnector.class.getName(), 
aliases.get("MockSource"));
+        assertEquals(CollidingConverter.class.getName(), 
aliases.get("CollidingConverter"));
+        assertEquals(CollidingConverter.class.getName(), 
aliases.get("Colliding"));
+    }
+
+

Review Comment:
   Nit: double empty line
   ```suggestion
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to