C0urante commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1263968763


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##########
@@ -20,79 +20,114 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class PluginScannerTest {
 
+    private enum ScannerType { Reflection, ServiceLoader };
+
     @Rule
     public TemporaryFolder pluginDir = new TemporaryFolder();
 
+    public PluginScanner scanner;
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> parameters() {
+        List<Object[]> values = new ArrayList<>();
+        for (ScannerType type : ScannerType.values()) {
+            values.add(new Object[]{type});
+        }
+        return values;
+    }
+
+    public PluginScannerTest(ScannerType scannerType) {
+        switch (scannerType) {
+            case Reflection:
+                this.scanner = new ReflectionScanner();
+                break;
+            case ServiceLoader:
+                this.scanner = new ServiceLoaderScanner();
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown type " + scannerType);
+        }
+    }
+
     @Test
-    public void testLoadingUnloadedPluginClass() {
-        DelegatingClassLoader classLoader = initClassLoader(
+    public void testScanningEmptyPluginPath() {
+        PluginScanResult result = scan(
                 Collections.emptyList()
         );
-        for (String pluginClassName : TestPlugins.pluginClasses()) {
-            assertThrows(ClassNotFoundException.class, () -> classLoader.loadClass(pluginClassName));
-        }
+        assertTrue(result.isEmpty());
     }
 
     @Test
-    public void testLoadingPluginClass() throws ClassNotFoundException {
-        DelegatingClassLoader classLoader = initClassLoader(
+    public void testScanningPluginClasses() {
+        PluginScanResult result = scan(
                 TestPlugins.pluginPath()
         );
+        Set<String> classes = new HashSet<>();
+        result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
         for (String pluginClassName : TestPlugins.pluginClasses()) {
-            assertNotNull(classLoader.loadClass(pluginClassName));
-            assertNotNull(classLoader.pluginClassLoader(pluginClassName));
+            assertTrue("Expected " + pluginClassName + " to be discovered but it was not",
+                    classes.contains(pluginClassName));

Review Comment:
   Of the approaches you've laid out, I'm -1 on 1 (the coverage provided by 
these cases is important, especially if it leads to discoveries like this one) 
and 2 (if a user runs into this it's going to be a massive headache to debug 
since they'll see plugin loading errors for more than just the broken plugin).
   
   I agree that it's a bit early to go about implementing our own service 
loading logic, so although 3 is reasonable in theory, we can hold off on it for 
now.
   
   I don't love 4, since it's still a step backwards and killing workers on 
startup is quite extreme. I think I could live with it if we believe there are 
no better alternatives.
   
   I did some digging and it seems like this is due to a known bug in the 
OpenJDK `ServiceLoader`: https://bugs.openjdk.org/browse/JDK-8196182 (unclear 
if this also affects other JDK distributions).
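   
   To make the failure mode concrete without building a broken plugin JAR, here's a rough simulation. `FlakyIterator`, `naiveScan`, and `NaiveScanDemo` are hypothetical names, and the iterator is a simplified model, not JDK code: it assumes (as the bug report suggests) that `hasNext()` can surface a raw `LinkageError` for a provider class that fails to link, after which the iterator has already moved past that entry. A loop with no error handling stops at the first broken entry, so the healthy `GoodB` listed after it is never discovered.
   
   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   
   public class NaiveScanDemo {
   
       // Hypothetical stand-in for a ServiceLoader iterator (not JDK code). A provider
       // name prefixed with "broken:" makes hasNext() throw NoClassDefFoundError after
       // the iterator has already moved past that entry.
       static class FlakyIterator implements Iterator<String> {
           private static final String BROKEN = "broken:";
           private final List<String> entries;
           private int pos = 0;
   
           FlakyIterator(List<String> entries) {
               this.entries = entries;
           }
   
           @Override
           public boolean hasNext() {
               if (pos >= entries.size()) {
                   return false;
               }
               String entry = entries.get(pos);
               if (entry.startsWith(BROKEN)) {
                   pos++; // consume the bad entry, then fail
                   throw new NoClassDefFoundError(entry.substring(BROKEN.length()));
               }
               return true;
           }
   
           @Override
           public String next() {
               if (!hasNext()) {
                   throw new NoSuchElementException();
               }
               return entries.get(pos++);
           }
       }
   
       // A loop with no error handling around hasNext(): the first LinkageError
       // ends the whole scan, including for healthy plugins listed after it.
       static List<String> naiveScan(Iterator<String> it) {
           List<String> found = new ArrayList<>();
           try {
               while (it.hasNext()) {
                   found.add(it.next());
               }
           } catch (LinkageError e) {
               System.err.println("Scan aborted: " + e);
           }
           return found;
       }
   
       public static void main(String[] args) {
           List<String> found = naiveScan(new FlakyIterator(
                   List.of("GoodA", "broken:BadX", "GoodB")));
           System.out.println(found); // prints [GoodA]
       }
   }
   ```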
   
   Given that we're dealing with buggy behavior in a popular JDK distribution, it seems reasonable to try to deal with this scenario gracefully. I prototyped a local workaround for this that's a little hacky but does make our tests pass. We can wrap calls to `Iterator::hasNext` in some error-handling logic:
   
   ```java
   // Assumes `log` is the enclosing class's logger and that java.util.Iterator
   // and java.util.Objects are imported.
   private boolean serviceLoaderHasNext(Iterator<?> serviceLoaderIterator) {
       try {
           return serviceLoaderIterator.hasNext();
       } catch (LinkageError e1) {
           log.error("Failed to scan for next service loaded plugin", e1);
           try {
               return serviceLoaderIterator.hasNext();
           } catch (LinkageError e2) {
               // It's difficult to know for sure if the iterator was able to advance past the first broken
               // plugin class, or if it will continue to fail on that broken class for any subsequent calls
               // to Iterator::hasNext or Iterator::next.
               // As a best-effort measure, we compare exception messages with the assumption that
               // they will include the name of the plugin class; if the messages are the same, then we
               // fail fast and throw the original linkage error; if the messages differ, we return true
               // so that the caller tries again.
               // For reference, see https://bugs.openjdk.org/browse/JDK-8196182, which describes
               // the behavior we are trying to mitigate with this logic as buggy, but indicates that a fix
               // in the JDK standard library ServiceLoader implementation is unlikely to land.
               if (Objects.equals(e1.getMessage(), e2.getMessage())) {
                   throw e1;
               } else {
                   log.error("Failed to scan for next service loaded plugin", e2);
                   return true;
               }
           }
       }
   }
   ```
   
   This would then change our for-loop in `PluginScanner::getServiceLoaderPluginDesc` to the following:
   
   ```java
   Iterator<T> iterator = serviceLoader.iterator();
   while (serviceLoaderHasNext(iterator)) {
   ```
   
   This essentially implements option 4, with some logic to try to intercept 
linkage errors and handle them gracefully before falling back on terminating 
the worker if it appears that our service loader is damaged beyond repair.
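   
   Both paths (recovery and fail-fast) can be exercised without a real broken plugin. Here's a rough, self-contained sketch; `ServiceLoaderWorkaroundDemo`, `FlakyIterator`, and `scan` are hypothetical names, and the iterator is a simplified model, not JDK code. With `advanceOnError = true` it moves past each broken entry; with `false` it stays stuck and fails with an identical message every time. `serviceLoaderHasNext` mirrors the proposed logic, with `System.err` standing in for the worker's logger.
   
   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   import java.util.Objects;
   
   public class ServiceLoaderWorkaroundDemo {
   
       // Hypothetical stand-in for a ServiceLoader iterator (not JDK code). Entries
       // prefixed with "broken:" make hasNext() throw NoClassDefFoundError. If
       // advanceOnError is true the bad entry is skipped afterwards; otherwise the
       // iterator stays stuck on it and fails with the same message every time.
       static class FlakyIterator implements Iterator<String> {
           private static final String BROKEN = "broken:";
           private final List<String> entries;
           private final boolean advanceOnError;
           private int pos = 0;
   
           FlakyIterator(List<String> entries, boolean advanceOnError) {
               this.entries = entries;
               this.advanceOnError = advanceOnError;
           }
   
           @Override
           public boolean hasNext() {
               if (pos >= entries.size()) {
                   return false;
               }
               String entry = entries.get(pos);
               if (entry.startsWith(BROKEN)) {
                   if (advanceOnError) {
                       pos++;
                   }
                   throw new NoClassDefFoundError(entry.substring(BROKEN.length()));
               }
               return true;
           }
   
           @Override
           public String next() {
               if (!hasNext()) {
                   throw new NoSuchElementException();
               }
               return entries.get(pos++);
           }
       }
   
       // Same decision logic as the proposed serviceLoaderHasNext.
       static boolean serviceLoaderHasNext(Iterator<?> it) {
           try {
               return it.hasNext();
           } catch (LinkageError e1) {
               System.err.println("Failed to scan for next service loaded plugin: " + e1);
               try {
                   return it.hasNext();
               } catch (LinkageError e2) {
                   if (Objects.equals(e1.getMessage(), e2.getMessage())) {
                       throw e1; // same failure twice: assume the iterator is stuck
                   }
                   System.err.println("Failed to scan for next service loaded plugin: " + e2);
                   return true; // different failure: assume progress, let the caller retry
               }
           }
       }
   
       // The rewritten while-loop, collecting discovered provider names.
       static List<String> scan(Iterator<String> it) {
           List<String> found = new ArrayList<>();
           while (serviceLoaderHasNext(it)) {
               found.add(it.next());
           }
           return found;
       }
   
       public static void main(String[] args) {
           // Recoverable: two broken entries in a row hit the differing-messages branch.
           System.out.println(scan(new FlakyIterator(
                   List.of("GoodA", "broken:BadX", "broken:BadY", "GoodB"), true))); // prints [GoodA, GoodB]
   
           // Unrecoverable: the iterator keeps failing on the same entry.
           try {
               scan(new FlakyIterator(List.of("GoodA", "broken:BadX", "GoodB"), false));
           } catch (LinkageError e) {
               System.out.println("Fail fast: " + e.getMessage()); // prints Fail fast: BadX
           }
       }
   }
   ```
   
   One caveat this sketch makes visible: after `serviceLoaderHasNext` returns `true` on the differing-messages branch, the caller's `next()` still has to cope with whatever the iterator does next, so three broken entries in a row would surface a `LinkageError` from `next()` here.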
   
   Thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
