C0urante commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1263968763
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##########
@@ -20,79 +20,114 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+@RunWith(Parameterized.class)
public class PluginScannerTest {
+ private enum ScannerType { Reflection, ServiceLoader };
+
@Rule
public TemporaryFolder pluginDir = new TemporaryFolder();
+ public PluginScanner scanner;
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> parameters() {
+ List<Object[]> values = new ArrayList<>();
+ for (ScannerType type : ScannerType.values()) {
+ values.add(new Object[]{type});
+ }
+ return values;
+ }
+
+ public PluginScannerTest(ScannerType scannerType) {
+ switch (scannerType) {
+ case Reflection:
+ this.scanner = new ReflectionScanner();
+ break;
+ case ServiceLoader:
+ this.scanner = new ServiceLoaderScanner();
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown type " +
scannerType);
+ }
+ }
+
@Test
- public void testLoadingUnloadedPluginClass() {
- DelegatingClassLoader classLoader = initClassLoader(
+ public void testScanningEmptyPluginPath() {
+ PluginScanResult result = scan(
Collections.emptyList()
);
- for (String pluginClassName : TestPlugins.pluginClasses()) {
- assertThrows(ClassNotFoundException.class, () ->
classLoader.loadClass(pluginClassName));
- }
+ assertTrue(result.isEmpty());
}
@Test
- public void testLoadingPluginClass() throws ClassNotFoundException {
- DelegatingClassLoader classLoader = initClassLoader(
+ public void testScanningPluginClasses() {
+ PluginScanResult result = scan(
TestPlugins.pluginPath()
);
+ Set<String> classes = new HashSet<>();
+ result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
for (String pluginClassName : TestPlugins.pluginClasses()) {
- assertNotNull(classLoader.loadClass(pluginClassName));
- assertNotNull(classLoader.pluginClassLoader(pluginClassName));
+ assertTrue("Expected " + pluginClassName + "to be discovered but
it was not",
+ classes.contains(pluginClassName));
Review Comment:
Of the approaches you've laid out, I'm -1 on 1 (the coverage provided by
these cases is important, especially if it leads to discoveries like this one)
and 2 (if a user runs into this it's going to be a massive headache to debug
since they'll see plugin loading errors for more than just the broken plugin).
I agree that it's a bit early to go about implementing our own service
loading logic, so although 3 is reasonable in theory, we can hold off on it for
now.
I don't love 4, since it's still a step backwards and killing workers on
startup is quite extreme. I think I could live with it if we believe there are
no better alternatives.
I did some digging and it seems like this is due to a known bug in the
OpenJDK `ServiceLoader`: https://bugs.openjdk.org/browse/JDK-8196182 (unclear
if this also affects other JDK distributions).
Given that we're dealing with buggy behavior in a popular JDK distribution,
it seems reasonable to try deal with this scenario gracefully. I prototyped a
local workaround for this that's a little hacky but does cause our tests to
pass. We can wrap calls to `Iterator::hasNext` in some error-handling logic:
```java
private boolean serviceLoaderHasNext(Iterator<?> serviceLoaderIterator) {
try {
return serviceLoaderIterator.hasNext();
} catch (LinkageError e1) {
log.error("Failed to scan for next service loaded plugin", e1);
try {
return serviceLoaderIterator.hasNext();
} catch (LinkageError e2) {
// It's difficult to know for sure if the iterator was able to
advance past the first broken
// plugin class, or if it will continue to fail on that broken
class for any subsequent calls
// to Iterator::hasNext or Iterator::next
// As a best-effort measure, we compare exception messages with
the assumption that
// they will include the name of the plugin class; if the
messages are the same, then we
// fail fast and throw the original linkage error; if the
messages differ, we try again
// For reference, see
https://bugs.openjdk.org/browse/JDK-8196182, which describes
// the behavior we are trying to mitigate with this logic as
buggy, but indicates that a fix
// in the JDK standard library ServiceLoader implementation is
unlikely to land
if (Objects.equals(e1.getMessage(), e2.getMessage())) {
throw e1;
} else {
log.error("Failed to scan for next service loaded plugin",
e2);
return true;
}
}
}
}
```
Which would then change our for-loop in
`PluginScanner::getServiceLoaderPluginDesc` to this:
```java
Iterator<T> iterator = serviceLoader.iterator();
while (serviceLoaderHasNext(iterator)) {
```
This essentially implements option 4, with some logic to try to intercept
linkage errors and handle them gracefully before falling back on terminating
the worker if it appears that our service loader is damaged beyond repair.
Thoughts?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]