C0urante commented on code in PR #13771: URL: https://github.com/apache/kafka/pull/13771#discussion_r1214691525
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java: ########## @@ -490,6 +504,168 @@ public void testPluginUrlsWithRelativeSymlinkForwards() throws Exception { assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath)); } + @Test + public void testNonCollidingAliases() { + SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>(); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); + SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new TreeSet<>(); + sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, MockSourceConnector.class.getClassLoader())); + SortedSet<PluginDesc<Converter>> converters = new TreeSet<>(); + converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + PluginScanResult result = new PluginScanResult( + sinkConnectors, + sourceConnectors, + converters, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map<String, String> aliases = PluginUtils.computeAliases(result); + assertFalse(aliases.containsKey("Mock")); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSinkConnector")); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSink")); + assertEquals(MockSourceConnector.class.getName(), aliases.get("MockSourceConnector")); + assertEquals(MockSourceConnector.class.getName(), aliases.get("MockSource")); + assertEquals(CollidingConverter.class.getName(), aliases.get("CollidingConverter")); + assertEquals(CollidingConverter.class.getName(), aliases.get("Colliding")); + } + + + @Test + public void testMultiVersionAlias() { + SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>(); + // distinct versions don't cause an alias collision (the class name is the 
same) + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", MockSinkConnector.class.getClassLoader())); + assertEquals(2, sinkConnectors.size()); + PluginScanResult result = new PluginScanResult( + sinkConnectors, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map<String, String> aliases = PluginUtils.computeAliases(result); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSinkConnector")); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSink")); + } + + @Test + public void testCollidingPrunedAlias() { + SortedSet<PluginDesc<Converter>> converters = new TreeSet<>(); + converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + SortedSet<PluginDesc<HeaderConverter>> headerConverters = new TreeSet<>(); + headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, null, CollidingHeaderConverter.class.getClassLoader())); + PluginScanResult result = new PluginScanResult( + Collections.emptySortedSet(), + Collections.emptySortedSet(), + converters, + headerConverters, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map<String, String> aliases = PluginUtils.computeAliases(result); + assertEquals(CollidingConverter.class.getName(), aliases.get("CollidingConverter")); + assertEquals(CollidingHeaderConverter.class.getName(), aliases.get("CollidingHeaderConverter")); + assertFalse(aliases.containsKey("Colliding")); Review Comment: Same thought RE comparing whole `Map` objects: ```suggestion Map<String, String> actualAliases = 
PluginUtils.computeAliases(result); Map<String, String> expectedAliases = new HashMap<>(); expectedAliases.put("CollidingConverter", CollidingConverter.class.getName()); expectedAliases.put("CollidingHeaderConverter", CollidingHeaderConverter.class.getName()); assertEquals(expectedAliases, actualAliases); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java: ########## @@ -490,6 +504,168 @@ public void testPluginUrlsWithRelativeSymlinkForwards() throws Exception { assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath)); } + @Test + public void testNonCollidingAliases() { + SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>(); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); + SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new TreeSet<>(); + sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, MockSourceConnector.class.getClassLoader())); + SortedSet<PluginDesc<Converter>> converters = new TreeSet<>(); + converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + PluginScanResult result = new PluginScanResult( + sinkConnectors, + sourceConnectors, + converters, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map<String, String> aliases = PluginUtils.computeAliases(result); + assertFalse(aliases.containsKey("Mock")); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSinkConnector")); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSink")); + assertEquals(MockSourceConnector.class.getName(), aliases.get("MockSourceConnector")); + assertEquals(MockSourceConnector.class.getName(), aliases.get("MockSource")); + assertEquals(CollidingConverter.class.getName(), 
aliases.get("CollidingConverter")); + assertEquals(CollidingConverter.class.getName(), aliases.get("Colliding")); Review Comment: We can compare entire `Map` objects here, which gives better coverage and improves assertion error messages: ```suggestion Map<String, String> actualAliases = PluginUtils.computeAliases(result); Map<String, String> expectedAliases = new HashMap<>(); expectedAliases.put("MockSinkConnector", MockSinkConnector.class.getName()); expectedAliases.put("MockSink", MockSinkConnector.class.getName()); expectedAliases.put("MockSourceConnector", MockSourceConnector.class.getName()); expectedAliases.put("MockSource", MockSourceConnector.class.getName()); expectedAliases.put("CollidingConverter", CollidingConverter.class.getName()); expectedAliases.put("Colliding", CollidingConverter.class.getName()); assertEquals(expectedAliases, actualAliases); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java: ########## @@ -490,6 +504,168 @@ public void testPluginUrlsWithRelativeSymlinkForwards() throws Exception { assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath)); } + @Test + public void testNonCollidingAliases() { + SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>(); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); + SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new TreeSet<>(); + sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, MockSourceConnector.class.getClassLoader())); + SortedSet<PluginDesc<Converter>> converters = new TreeSet<>(); + converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + PluginScanResult result = new PluginScanResult( + sinkConnectors, + sourceConnectors, + converters, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + 
Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map<String, String> aliases = PluginUtils.computeAliases(result); + assertFalse(aliases.containsKey("Mock")); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSinkConnector")); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSink")); + assertEquals(MockSourceConnector.class.getName(), aliases.get("MockSourceConnector")); + assertEquals(MockSourceConnector.class.getName(), aliases.get("MockSource")); + assertEquals(CollidingConverter.class.getName(), aliases.get("CollidingConverter")); + assertEquals(CollidingConverter.class.getName(), aliases.get("Colliding")); + } + + + @Test + public void testMultiVersionAlias() { + SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>(); + // distinct versions don't cause an alias collision (the class name is the same) + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", MockSinkConnector.class.getClassLoader())); + assertEquals(2, sinkConnectors.size()); + PluginScanResult result = new PluginScanResult( + sinkConnectors, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map<String, String> aliases = PluginUtils.computeAliases(result); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSinkConnector")); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSink")); + } + + @Test + public void testCollidingPrunedAlias() { + SortedSet<PluginDesc<Converter>> converters = new TreeSet<>(); + converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + SortedSet<PluginDesc<HeaderConverter>> 
headerConverters = new TreeSet<>(); + headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, null, CollidingHeaderConverter.class.getClassLoader())); + PluginScanResult result = new PluginScanResult( + Collections.emptySortedSet(), + Collections.emptySortedSet(), + converters, + headerConverters, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map<String, String> aliases = PluginUtils.computeAliases(result); + assertEquals(CollidingConverter.class.getName(), aliases.get("CollidingConverter")); + assertEquals(CollidingHeaderConverter.class.getName(), aliases.get("CollidingHeaderConverter")); + assertFalse(aliases.containsKey("Colliding")); + } + + @SuppressWarnings("unchecked") + @Test + public void testCollidingSimpleAlias() { + SortedSet<PluginDesc<Converter>> converters = new TreeSet<>(); + converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + SortedSet<PluginDesc<Transformation<?>>> transformations = new TreeSet<>(); + transformations.add(new PluginDesc<>((Class<? 
extends Transformation<?>>) (Class<?>) Colliding.class, null, Colliding.class.getClassLoader())); + PluginScanResult result = new PluginScanResult( + Collections.emptySortedSet(), + Collections.emptySortedSet(), + converters, + Collections.emptySortedSet(), + transformations, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map<String, String> aliases = PluginUtils.computeAliases(result); + assertEquals(CollidingConverter.class.getName(), aliases.get("CollidingConverter")); + assertFalse(aliases.containsKey("Colliding")); Review Comment: Same thought RE comparing whole `Map` objects: ```suggestion Map<String, String> actualAliases = PluginUtils.computeAliases(result); Map<String, String> expectedAliases = new HashMap<>(); expectedAliases.put("CollidingConverter", CollidingConverter.class.getName()); assertEquals(expectedAliases, actualAliases); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java: ########## @@ -490,6 +504,168 @@ public void testPluginUrlsWithRelativeSymlinkForwards() throws Exception { assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath)); } + @Test + public void testNonCollidingAliases() { + SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>(); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); + SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new TreeSet<>(); + sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, MockSourceConnector.class.getClassLoader())); + SortedSet<PluginDesc<Converter>> converters = new TreeSet<>(); + converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + PluginScanResult result = new PluginScanResult( + sinkConnectors, + sourceConnectors, + converters, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + 
Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map<String, String> aliases = PluginUtils.computeAliases(result); + assertFalse(aliases.containsKey("Mock")); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSinkConnector")); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSink")); + assertEquals(MockSourceConnector.class.getName(), aliases.get("MockSourceConnector")); + assertEquals(MockSourceConnector.class.getName(), aliases.get("MockSource")); + assertEquals(CollidingConverter.class.getName(), aliases.get("CollidingConverter")); + assertEquals(CollidingConverter.class.getName(), aliases.get("Colliding")); + } + + + @Test + public void testMultiVersionAlias() { + SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>(); + // distinct versions don't cause an alias collision (the class name is the same) + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", MockSinkConnector.class.getClassLoader())); + assertEquals(2, sinkConnectors.size()); + PluginScanResult result = new PluginScanResult( + sinkConnectors, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map<String, String> aliases = PluginUtils.computeAliases(result); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSinkConnector")); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSink")); Review Comment: Same thought RE comparing whole `Map` objects: ```suggestion Map<String, String> actualAliases = PluginUtils.computeAliases(result); Map<String, String> expectedAliases = new HashMap<>(); 
expectedAliases.put("MockSinkConnector", MockSinkConnector.class.getName()); expectedAliases.put("MockSink", MockSinkConnector.class.getName()); assertEquals(expectedAliases, actualAliases); ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java: ########## @@ -386,6 +362,25 @@ private static String prunePluginName(PluginDesc<?> plugin, String suffix) { return simple; } + public static Map<String, String> computeAliases(PluginScanResult scanResult) { + Map<String, Set<String>> aliasCollisions = new HashMap<>(); + scanResult.forEach(pluginDesc -> { + aliasCollisions.computeIfAbsent(simpleName(pluginDesc), ignored -> new HashSet<>()).add(pluginDesc.className()); + aliasCollisions.computeIfAbsent(prunedName(pluginDesc), ignored -> new HashSet<>()).add(pluginDesc.className()); + }); + Map<String, String> aliases = new HashMap<>(); + for (Map.Entry<String, Set<String>> entry : aliasCollisions.entrySet()) { + String alias = entry.getKey(); + Set<String> classNames = entry.getValue(); + if (classNames.size() == 1) { + aliases.put(alias, classNames.stream().findAny().get()); + } else { + log.warn("Ambiguous alias '{}' refers to multiple distinct plugins {}: ignoring", alias, classNames); Review Comment: After seeing an example of this log message, I'm wondering if the trailing `: ignoring` part here is really beneficial. Maybe something like this would be a bit clearer? 
```suggestion log.warn("Ignoring ambiguous alias '{}' since it refers to multiple distinct plugins {}", alias, classNames); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java: ########## @@ -490,6 +504,168 @@ public void testPluginUrlsWithRelativeSymlinkForwards() throws Exception { assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath)); } + @Test + public void testNonCollidingAliases() { + SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>(); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); + SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new TreeSet<>(); + sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, MockSourceConnector.class.getClassLoader())); + SortedSet<PluginDesc<Converter>> converters = new TreeSet<>(); + converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + PluginScanResult result = new PluginScanResult( + sinkConnectors, + sourceConnectors, + converters, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map<String, String> aliases = PluginUtils.computeAliases(result); + assertFalse(aliases.containsKey("Mock")); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSinkConnector")); + assertEquals(MockSinkConnector.class.getName(), aliases.get("MockSink")); + assertEquals(MockSourceConnector.class.getName(), aliases.get("MockSourceConnector")); + assertEquals(MockSourceConnector.class.getName(), aliases.get("MockSource")); + assertEquals(CollidingConverter.class.getName(), aliases.get("CollidingConverter")); + assertEquals(CollidingConverter.class.getName(), aliases.get("Colliding")); + } + + Review Comment: Nit: double empty line ```suggestion ``` -- This is an 
automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org