C0urante commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1263968763
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##########
@@ -20,79 +20,114 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+@RunWith(Parameterized.class)
 public class PluginScannerTest {
+    private enum ScannerType { Reflection, ServiceLoader };
+
     @Rule
     public TemporaryFolder pluginDir = new TemporaryFolder();
+    public PluginScanner scanner;
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> parameters() {
+        List<Object[]> values = new ArrayList<>();
+        for (ScannerType type : ScannerType.values()) {
+            values.add(new Object[]{type});
+        }
+        return values;
+    }
+
+    public PluginScannerTest(ScannerType scannerType) {
+        switch (scannerType) {
+            case Reflection:
+                this.scanner = new ReflectionScanner();
+                break;
+            case ServiceLoader:
+                this.scanner = new ServiceLoaderScanner();
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown type " + scannerType);
+        }
+    }
+
     @Test
-    public void testLoadingUnloadedPluginClass() {
-        DelegatingClassLoader classLoader = initClassLoader(
+    public void testScanningEmptyPluginPath() {
+        PluginScanResult result = scan(
                 Collections.emptyList()
         );
-        for (String pluginClassName : TestPlugins.pluginClasses()) {
-            assertThrows(ClassNotFoundException.class, () -> classLoader.loadClass(pluginClassName));
-        }
+        assertTrue(result.isEmpty());
     }
     @Test
-    public void testLoadingPluginClass() throws ClassNotFoundException {
-        DelegatingClassLoader classLoader = initClassLoader(
+    public void testScanningPluginClasses() {
+        PluginScanResult result = scan(
                 TestPlugins.pluginPath()
         );
+        Set<String> classes = new HashSet<>();
+        result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
         for (String pluginClassName : TestPlugins.pluginClasses()) {
-            assertNotNull(classLoader.loadClass(pluginClassName));
-            assertNotNull(classLoader.pluginClassLoader(pluginClassName));
+            assertTrue("Expected " + pluginClassName + "to be discovered but it was not",
+                    classes.contains(pluginClassName));

Review Comment:

Of the approaches you've laid out, I'm -1 on 1 (the coverage provided by these cases is important, especially if it leads to discoveries like this one) and 2 (if a user runs into this it's going to be a massive headache to debug since they'll see plugin loading errors for more than just the broken plugin).

I agree that it's a bit early to go about implementing our own service loading logic, so although 3 is reasonable in theory, we can hold off on it for now.

I don't love 4, since it's still a step backwards and killing workers on startup is quite extreme. I think I could live with it if we believe there are no better alternatives.

I did some digging and it seems like this is due to a known bug in the OpenJDK `ServiceLoader`: https://bugs.openjdk.org/browse/JDK-8196182 (unclear if this also affects other JDK distributions). Given that we're dealing with buggy behavior in a popular JDK distribution, it seems reasonable to try to deal with this scenario gracefully.

I prototyped a local workaround for this that's a little hacky but does cause our tests to pass.
We can wrap calls to `Iterator::hasNext` in some error-handling logic:

```java
private boolean serviceLoaderHasNext(Iterator<?> serviceLoaderIterator) {
    try {
        return serviceLoaderIterator.hasNext();
    } catch (LinkageError e1) {
        log.error("Failed to scan for next service loaded plugin", e1);
        try {
            return serviceLoaderIterator.hasNext();
        } catch (LinkageError e2) {
            // It's difficult to know for sure if the iterator was able to advance past the first broken
            // plugin class, or if it will continue to fail on that broken class for any subsequent calls
            // to Iterator::hasNext or Iterator::next
            // As a best-effort measure, we compare exception messages with the assumption that
            // they will include the name of the plugin class; if the messages are the same, then we
            // fail fast and throw the original linkage error; if the messages differ, we try again
            // For reference, see https://bugs.openjdk.org/browse/JDK-8196182, which describes
            // the behavior we are trying to mitigate with this logic as buggy, but indicates that a fix
            // in the JDK standard library ServiceLoader implementation is unlikely to land
            if (Objects.equals(e1.getMessage(), e2.getMessage())) {
                throw e1;
            } else {
                log.error("Failed to scan for next service loaded plugin", e2);
                return true;
            }
        }
    }
}
```

Which would then change our for-loop in `PluginScanner::getServiceLoaderPluginDesc` to this:

```java
Iterator<T> iterator = serviceLoader.iterator();
while (serviceLoaderHasNext(iterator)) {
```

This essentially implements option 4, with some logic to try to intercept linkage errors and handle them gracefully before falling back on terminating the worker if it appears that our service loader is damaged beyond repair.

Thoughts?
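
The retry logic above can also be exercised in isolation, without a real plugin path. The following is only a minimal sketch under stated assumptions: `LinkageRetryDemo`, `FlakyIterator`, `hasNextTolerant`, and `scan` are hypothetical names invented for illustration and are not part of Connect, and the iterator merely simulates the two outcomes discussed (advancing past a broken provider versus getting stuck on it). It does not reproduce the actual JDK `ServiceLoader` internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;

class LinkageRetryDemo {

    /** Hypothetical stand-in for a ServiceLoader iterator; a null entry simulates a broken provider. */
    static final class FlakyIterator implements Iterator<String> {
        private final List<String> providers;
        private final boolean stuck; // if true, the iterator never advances past a broken provider
        private int index = 0;

        FlakyIterator(List<String> providers, boolean stuck) {
            this.providers = providers;
            this.stuck = stuck;
        }

        @Override
        public boolean hasNext() {
            if (index >= providers.size()) {
                return false;
            }
            if (providers.get(index) == null) {
                int broken = index;
                if (!stuck) {
                    index++; // assumption: the iterator moves on after reporting the failure
                }
                throw new NoClassDefFoundError("broken provider #" + broken);
            }
            return true;
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return providers.get(index++);
        }
    }

    /** Same shape as the prototype: retry once, then compare messages to detect a stuck iterator. */
    static boolean hasNextTolerant(Iterator<?> it) {
        try {
            return it.hasNext();
        } catch (LinkageError e1) {
            try {
                return it.hasNext();
            } catch (LinkageError e2) {
                if (Objects.equals(e1.getMessage(), e2.getMessage())) {
                    throw e1; // same failure twice: assume no progress and fail fast
                }
                return true; // a different failure: assume progress and let the caller continue
            }
        }
    }

    /** Collects every provider the iterator yields, tolerating linkage errors where possible. */
    static List<String> scan(Iterator<String> it) {
        List<String> found = new ArrayList<>();
        while (hasNextTolerant(it)) {
            found.add(it.next());
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan(new FlakyIterator(Arrays.asList("A", null, "B"), false)));
        try {
            scan(new FlakyIterator(Arrays.asList("A", null, "B"), true));
        } catch (LinkageError e) {
            System.out.println("failed fast: " + e.getMessage());
        }
    }
}
```

With the advancing iterator the broken entry is skipped and both healthy providers are returned; with the stuck iterator the two error messages match, so the first `LinkageError` propagates to the caller, which corresponds to the fail-fast fallback of option 4.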