This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 22ab975fe29 KAFKA-14649: Isolate failures during plugin path scanning 
to single plugin classes (#13182)
22ab975fe29 is described below

commit 22ab975fe2985c6ff9f0c77d931075b8ba7750d6
Author: Greg Harris <[email protected]>
AuthorDate: Thu Mar 2 12:10:01 2023 -0800

    KAFKA-14649: Isolate failures during plugin path scanning to single plugin 
classes (#13182)
    
    Reviewers: Christo Lolov <[email protected]>, Chris Egerton 
<[email protected]>
---
 .../runtime/isolation/DelegatingClassLoader.java   | 56 +++++++++++++----
 .../connect/runtime/isolation/PluginsTest.java     | 70 +++++++++++++++++++++
 .../connect/runtime/isolation/TestPlugins.java     | 63 +++++++++++++++++--
 ....apache.kafka.connect.rest.ConnectRestExtension | 16 +++++
 .../test/plugins/CoLocatedPlugin.java              | 46 ++++++++++++++
 .../DefaultConstructorPrivateConnector.java        | 67 ++++++++++++++++++++
 .../plugins/DefaultConstructorThrowsConnector.java | 68 +++++++++++++++++++++
 .../test/plugins/MissingSuperclassConverter.java   | 46 ++++++++++++++
 .../plugins/NoDefaultConstructorConnector.java     | 67 ++++++++++++++++++++
 .../test/plugins/NonExistentInterface.java         | 24 ++++++++
 .../bad-packaging/test/plugins/OuterClass.java     | 66 ++++++++++++++++++++
 .../plugins/StaticInitializerThrowsConnector.java  | 71 ++++++++++++++++++++++
 .../StaticInitializerThrowsRestExtension.java      | 64 +++++++++++++++++++
 .../test/plugins/VersionMethodThrowsConnector.java | 63 +++++++++++++++++++
 14 files changed, 770 insertions(+), 17 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 7dbd6112f7a..df6006f197f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -54,6 +55,7 @@ import java.util.Collections;
 import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.List;
+import java.util.ServiceConfigurationError;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.SortedMap;
@@ -265,13 +267,11 @@ public class DelegatingClassLoader extends URLClassLoader 
{
             log.error("Invalid path in plugin path: {}. Ignoring.", path, e);
         } catch (IOException e) {
             log.error("Could not get listing for plugin path: {}. Ignoring.", 
path, e);
-        } catch (ReflectiveOperationException e) {
-            log.error("Could not instantiate plugins in: {}. Ignoring.", path, 
e);
         }
     }
 
     private void registerPlugin(Path pluginLocation)
-        throws IOException, ReflectiveOperationException {
+        throws IOException {
         log.info("Loading plugin from: {}", pluginLocation);
         List<URL> pluginUrls = new ArrayList<>();
         for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
@@ -292,7 +292,7 @@ public class DelegatingClassLoader extends URLClassLoader {
     private void scanUrlsAndAddPlugins(
             ClassLoader loader,
             URL[] urls
-    ) throws ReflectiveOperationException {
+    ) {
         PluginScanResult plugins = scanPluginPath(loader, urls);
         log.info("Registered loader: {}", loader);
         if (!plugins.isEmpty()) {
@@ -352,7 +352,7 @@ public class DelegatingClassLoader extends URLClassLoader {
     private PluginScanResult scanPluginPath(
             ClassLoader loader,
             URL[] urls
-    ) throws ReflectiveOperationException {
+    ) {
         ConfigurationBuilder builder = new ConfigurationBuilder();
         builder.setClassLoaders(new ClassLoader[]{loader});
         builder.addUrls(urls);
@@ -374,12 +374,12 @@ public class DelegatingClassLoader extends URLClassLoader 
{
     }
 
     @SuppressWarnings({"unchecked"})
-    private Collection<PluginDesc<Predicate<?>>> 
getPredicatePluginDesc(ClassLoader loader, Reflections reflections) throws 
ReflectiveOperationException {
+    private Collection<PluginDesc<Predicate<?>>> 
getPredicatePluginDesc(ClassLoader loader, Reflections reflections) {
         return (Collection<PluginDesc<Predicate<?>>>) (Collection<?>) 
getPluginDesc(reflections, Predicate.class, loader);
     }
 
     @SuppressWarnings({"unchecked"})
-    private Collection<PluginDesc<Transformation<?>>> 
getTransformationPluginDesc(ClassLoader loader, Reflections reflections) throws 
ReflectiveOperationException {
+    private Collection<PluginDesc<Transformation<?>>> 
getTransformationPluginDesc(ClassLoader loader, Reflections reflections) {
         return (Collection<PluginDesc<Transformation<?>>>) (Collection<?>) 
getPluginDesc(reflections, Transformation.class, loader);
     }
 
@@ -387,7 +387,7 @@ public class DelegatingClassLoader extends URLClassLoader {
             Reflections reflections,
             Class<T> klass,
             ClassLoader loader
-    ) throws ReflectiveOperationException {
+    ) {
         Set<Class<? extends T>> plugins;
         try {
             plugins = reflections.getSubTypesOf(klass);
@@ -400,7 +400,11 @@ public class DelegatingClassLoader extends URLClassLoader {
         Collection<PluginDesc<T>> result = new ArrayList<>();
         for (Class<? extends T> plugin : plugins) {
             if (PluginUtils.isConcrete(plugin)) {
-                result.add(pluginDesc(plugin, versionFor(plugin), loader));
+                try {
+                    result.add(pluginDesc(plugin, versionFor(plugin), loader));
+                } catch (ReflectiveOperationException | LinkageError e) {
+                    log.error("Failed to discover {}: Unable to instantiate 
{}{}", klass.getSimpleName(), plugin.getSimpleName(), 
reflectiveErrorDescription(e), e);
+                }
             } else {
                 log.debug("Skipping {} as it is not concrete implementation", 
plugin);
             }
@@ -419,7 +423,14 @@ public class DelegatingClassLoader extends URLClassLoader {
         Collection<PluginDesc<T>> result = new ArrayList<>();
         try {
             ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-            for (T pluginImpl : serviceLoader) {
+            for (Iterator<T> iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
+                T pluginImpl;
+                try {
+                    pluginImpl = iterator.next();
+                } catch (ServiceConfigurationError t) {
+                    log.error("Failed to discover {}{}", 
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    continue;
+                }
                 result.add(pluginDesc((Class<? extends T>) 
pluginImpl.getClass(),
                     versionFor(pluginImpl), loader));
             }
@@ -430,7 +441,14 @@ public class DelegatingClassLoader extends URLClassLoader {
     }
 
     private static <T>  String versionFor(T pluginImpl) {
-        return pluginImpl instanceof Versioned ? ((Versioned) 
pluginImpl).version() : UNDEFINED_VERSION;
+        try {
+            if (pluginImpl instanceof Versioned) {
+                return ((Versioned) pluginImpl).version();
+            }
+        } catch (Throwable t) {
+            log.error("Failed to get plugin version for " + 
pluginImpl.getClass(), t);
+        }
+        return UNDEFINED_VERSION;
     }
 
     public static <T> String versionFor(Class<? extends T> pluginKlass) throws 
ReflectiveOperationException {
@@ -439,6 +457,22 @@ public class DelegatingClassLoader extends URLClassLoader {
             versionFor(pluginKlass.getDeclaredConstructor().newInstance()) : 
UNDEFINED_VERSION;
     }
 
+    private static String reflectiveErrorDescription(Throwable t) {
+        if (t instanceof NoSuchMethodException) {
+            return ": Plugin class must have a no-args constructor, and cannot 
be a non-static inner class";
+        } else if (t instanceof SecurityException) {
+            return ": Security settings must allow reflective instantiation of 
plugin classes";
+        } else if (t instanceof IllegalAccessException) {
+            return ": Plugin class default constructor must be public";
+        } else if (t instanceof ExceptionInInitializerError) {
+            return ": Failed to statically initialize plugin class";
+        } else if (t instanceof InvocationTargetException) {
+            return ": Failed to invoke plugin constructor";
+        } else {
+            return "";
+        }
+    }
+
     @Override
     protected Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
         String fullName = aliases.getOrDefault(name, name);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 4b040de92c4..1cfbee97bff 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -186,6 +186,76 @@ public class PluginsTest {
         ));
     }
 
+    @Test
+    public void shouldFindCoLocatedPluginIfBadPackaging() {
+        Converter converter = plugins.newPlugin(
+                TestPlugin.BAD_PACKAGING_CO_LOCATED.className(),
+                new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
+                Converter.class
+        );
+        assertNotNull(converter);
+    }
+
+    @Test
+    public void shouldThrowIfPluginMissingSuperclass() {
+        assertThrows(ConnectException.class, () -> plugins.newPlugin(
+                TestPlugin.BAD_PACKAGING_MISSING_SUPERCLASS.className(),
+                new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
+                Converter.class
+        ));
+    }
+
+    @Test
+    public void shouldThrowIfStaticInitializerThrows() {
+        assertThrows(ConnectException.class, () -> plugins.newConnector(
+                
TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_CONNECTOR.className()
+        ));
+    }
+
+    @Test
+    public void shouldThrowIfStaticInitializerThrowsServiceLoader() {
+        assertThrows(ConnectException.class, () -> plugins.newPlugin(
+                
TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION.className(),
+                new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
+                ConnectRestExtension.class
+        ));
+    }
+
+    @Test
+    public void shouldThrowIfDefaultConstructorThrows() {
+        assertThrows(ConnectException.class, () -> plugins.newConnector(
+                
TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONNECTOR.className()
+        ));
+    }
+
+    @Test
+    public void shouldThrowIfDefaultConstructorPrivate() {
+        assertThrows(ConnectException.class, () -> plugins.newConnector(
+                
TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_PRIVATE_CONNECTOR.className()
+        ));
+    }
+
+    @Test
+    public void shouldThrowIfNoDefaultConstructor() {
+        assertThrows(ConnectException.class, () -> plugins.newConnector(
+                
TestPlugin.BAD_PACKAGING_NO_DEFAULT_CONSTRUCTOR_CONNECTOR.className()
+        ));
+    }
+
+    @Test
+    public void shouldNotThrowIfVersionMethodThrows() {
+        assertNotNull(plugins.newConnector(
+                
TestPlugin.BAD_PACKAGING_VERSION_METHOD_THROWS_CONNECTOR.className()
+        ));
+    }
+
+    @Test
+    public void shouldThrowIfPluginInnerClass() {
+        assertThrows(ConnectException.class, () -> plugins.newConnector(
+                TestPlugin.BAD_PACKAGING_INNER_CLASS_CONNECTOR.className()
+        ));
+    }
+
     @Test
     public void shouldShareStaticValuesBetweenSamePlugin() {
         // Plugins are not isolated from other instances of their own class.
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
index f828b316749..1b018f1194a 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Predicate;
 import java.util.jar.Attributes;
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
@@ -42,6 +43,8 @@ import java.util.stream.Collectors;
 import javax.tools.JavaCompiler;
 import javax.tools.StandardJavaFileManager;
 import javax.tools.ToolProvider;
+
+import org.apache.kafka.connect.components.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +64,7 @@ import org.slf4j.LoggerFactory;
  * and reference the names of the different plugins directly via the {@link 
TestPlugin} enum.
  */
 public class TestPlugins {
+    private static final Predicate<String> REMOVE_CLASS_FILTER = s -> 
s.contains("NonExistentInterface");
     public enum TestPlugin {
         /**
          * A plugin which will always throw an exception during loading
@@ -111,20 +115,62 @@ public class TestPlugins {
         /**
          * A plugin which shares a jar file with {@link 
TestPlugin#MULTIPLE_PLUGINS_IN_JAR_THING_ONE}
          */
-        MULTIPLE_PLUGINS_IN_JAR_THING_TWO("multiple-plugins-in-jar", 
"test.plugins.ThingTwo");
+        MULTIPLE_PLUGINS_IN_JAR_THING_TWO("multiple-plugins-in-jar", 
"test.plugins.ThingTwo"),
+        /**
+         * A plugin which is incorrectly packaged, and is missing a superclass 
definition.
+         */
+        BAD_PACKAGING_MISSING_SUPERCLASS("bad-packaging", 
"test.plugins.MissingSuperclass", false, REMOVE_CLASS_FILTER),
+        /**
+         * A plugin which is packaged with other incorrectly packaged plugins, 
but itself has no issues loading.
+         */
+        BAD_PACKAGING_CO_LOCATED("bad-packaging", 
"test.plugins.CoLocatedPlugin", true, REMOVE_CLASS_FILTER),
+        /**
+         * A connector which is incorrectly packaged, and throws during static 
initialization.
+         */
+        BAD_PACKAGING_STATIC_INITIALIZER_THROWS_CONNECTOR("bad-packaging", 
"test.plugins.StaticInitializerThrowsConnector", false, REMOVE_CLASS_FILTER),
+        /**
+         * A plugin which is incorrectly packaged, which throws an exception 
from the {@link Versioned#version()} method.
+         */
+        BAD_PACKAGING_VERSION_METHOD_THROWS_CONNECTOR("bad-packaging", 
"test.plugins.VersionMethodThrowsConnector", false, REMOVE_CLASS_FILTER),
+        /**
+         * A plugin which is incorrectly packaged, which throws an exception 
from its default constructor.
+         */
+        BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONNECTOR("bad-packaging", 
"test.plugins.DefaultConstructorThrowsConnector", false, REMOVE_CLASS_FILTER),
+        /**
+         * A plugin which is incorrectly packaged, which has a private default 
constructor.
+         */
+        BAD_PACKAGING_DEFAULT_CONSTRUCTOR_PRIVATE_CONNECTOR("bad-packaging", 
"test.plugins.DefaultConstructorPrivateConnector", false, REMOVE_CLASS_FILTER),
+        /**
+         * A plugin which is incorrectly packaged, which has no default 
constructor.
+         */
+        BAD_PACKAGING_NO_DEFAULT_CONSTRUCTOR_CONNECTOR("bad-packaging", 
"test.plugins.NoDefaultConstructorConnector", false, REMOVE_CLASS_FILTER),
+        /**
+         * A connector which is incorrectly packaged, and is declared as an 
inner class of another class.
+         */
+        BAD_PACKAGING_INNER_CLASS_CONNECTOR("bad-packaging", 
"test.plugins.OuterClass$InnerClass", false, REMOVE_CLASS_FILTER),
+        /**
+         * A REST extension which is incorrectly packaged, and throws during 
static initialization.
+         */
+        
BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION("bad-packaging", 
"test.plugins.StaticInitializerThrowsRestExtension", false, 
REMOVE_CLASS_FILTER);
 
         private final String resourceDir;
         private final String className;
         private final boolean includeByDefault;
+        private final Predicate<String> removeRuntimeClasses;
 
         TestPlugin(String resourceDir, String className) {
             this(resourceDir, className, true);
         }
 
         TestPlugin(String resourceDir, String className, boolean 
includeByDefault) {
+            this(resourceDir, className, includeByDefault, ignored -> false);
+        }
+
+        TestPlugin(String resourceDir, String className, boolean 
includeByDefault, Predicate<String> removeRuntimeClasses) {
             this.resourceDir = resourceDir;
             this.className = className;
             this.includeByDefault = includeByDefault;
+            this.removeRuntimeClasses = removeRuntimeClasses;
         }
 
         public String resourceDir() {
@@ -138,6 +184,10 @@ public class TestPlugins {
         public boolean includeByDefault() {
             return includeByDefault;
         }
+
+        public Predicate<String> removeRuntimeClasses() {
+            return removeRuntimeClasses;
+        }
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(TestPlugins.class);
@@ -152,7 +202,7 @@ public class TestPlugins {
                 if (pluginJars.containsKey(testPlugin.resourceDir())) {
                     log.debug("Skipping recompilation of " + 
testPlugin.resourceDir());
                 }
-                pluginJars.put(testPlugin.resourceDir(), 
createPluginJar(testPlugin.resourceDir()));
+                pluginJars.put(testPlugin.resourceDir(), 
createPluginJar(testPlugin.resourceDir(), testPlugin.removeRuntimeClasses()));
             }
         } catch (Throwable e) {
             log.error("Could not set up plugin test jars", e);
@@ -228,14 +278,14 @@ public class TestPlugins {
                 .toArray(TestPlugin[]::new);
     }
 
-    private static File createPluginJar(String resourceDir) throws IOException 
{
+    private static File createPluginJar(String resourceDir, Predicate<String> 
removeRuntimeClasses) throws IOException {
         Path inputDir = resourceDirectoryPath("test-plugins/" + resourceDir);
         Path binDir = Files.createTempDirectory(resourceDir + ".bin.");
         compileJavaSources(inputDir, binDir);
         File jarFile = Files.createTempFile(resourceDir + ".", 
".jar").toFile();
         try (JarOutputStream jar = openJarFile(jarFile)) {
-            writeJar(jar, inputDir);
-            writeJar(jar, binDir);
+            writeJar(jar, inputDir, removeRuntimeClasses);
+            writeJar(jar, binDir, removeRuntimeClasses);
         }
         removeDirectory(binDir);
         jarFile.deleteOnExit();
@@ -315,10 +365,11 @@ public class TestPlugins {
         }
     }
 
-    private static void writeJar(JarOutputStream jar, Path inputDir) throws 
IOException {
+    private static void writeJar(JarOutputStream jar, Path inputDir, 
Predicate<String> removeRuntimeClasses) throws IOException {
         List<Path> paths = Files.walk(inputDir)
             .filter(Files::isRegularFile)
             .filter(path -> !path.toFile().getName().endsWith(".java"))
+            .filter(path -> 
!removeRuntimeClasses.test(path.toFile().getName()))
             .collect(Collectors.toList());
         for (Path path : paths) {
             try (InputStream in = new BufferedInputStream(new 
FileInputStream(path.toFile()))) {
diff --git 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
new file mode 100644
index 00000000000..8b0f4881486
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.StaticInitializerThrowsRestExtension
\ No newline at end of file
diff --git 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/CoLocatedPlugin.java
 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/CoLocatedPlugin.java
new file mode 100644
index 00000000000..c166e630e0a
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/CoLocatedPlugin.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>This is a plugin co-located with other poorly packaged plugins, but 
should be visible despite other errors.
+ */
+public class CoLocatedPlugin implements Converter {
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+
+    }
+
+    @Override
+    public byte[] fromConnectData(final String topic, final Schema schema, 
final Object value) {
+        return new byte[0];
+    }
+
+    @Override
+    public SchemaAndValue toConnectData(final String topic, final byte[] 
value) {
+        return null;
+    }
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorPrivateConnector.java
 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorPrivateConnector.java
new file mode 100644
index 00000000000..107c4af8214
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorPrivateConnector.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>Mark the default constructor as private.
+ */
+public class DefaultConstructorPrivateConnector extends SinkConnector {
+
+
+    private DefaultConstructorPrivateConnector() {
+    }
+
+    @Override
+    public String version() {
+        return null;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return null;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        return null;
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override
+    public ConfigDef config() {
+        return null;
+    }
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConnector.java
 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConnector.java
new file mode 100644
index 00000000000..77e194f9805
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConnector.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>Unconditionally throw an exception during the default constructor.
+ */
+public class DefaultConstructorThrowsConnector extends SinkConnector {
+
+
+    public DefaultConstructorThrowsConnector() {
+        throw new RuntimeException("I always throw an exception");
+    }
+
+    @Override
+    public String version() {
+        return null;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return null;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        return null;
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override
+    public ConfigDef config() {
+        return null;
+    }
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/MissingSuperclassConverter.java
 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/MissingSuperclassConverter.java
new file mode 100644
index 00000000000..a7fb10f3897
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/MissingSuperclassConverter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>Implement a non-existent interface that is not packaged at runtime.
+ */
+public class MissingSuperclassConverter implements Converter, 
NonExistentInterface {
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+
+    }
+
+    @Override
+    public byte[] fromConnectData(final String topic, final Schema schema, 
final Object value) {
+        return new byte[0];
+    }
+
+    @Override
+    public SchemaAndValue toConnectData(final String topic, final byte[] 
value) {
+        return null;
+    }
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConnector.java
 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConnector.java
new file mode 100644
index 00000000000..fe6b2da2a0c
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConnector.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>This class has no default constructor: the only constructor it declares takes an
+ * {@code int} argument (which it ignores), so no no-argument constructor exists.
+ * <p>Every overridden connector method is a stub that either does nothing or returns
+ * {@code null}.
+ */
+public class NoDefaultConstructorConnector extends SinkConnector {
+
+
+    public NoDefaultConstructorConnector(int ignored) {
+    }
+
+    @Override
+    public String version() {
+        return null;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return null;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        return null;
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override
+    public ConfigDef config() {
+        return null;
+    }
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NonExistentInterface.java
 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NonExistentInterface.java
new file mode 100644
index 00000000000..6ec9de969d6
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NonExistentInterface.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+/**
+ * An empty interface, implemented by {@link MissingSuperclassConverter}, which is present
+ * for compilation but removed during packaging.
+ */
+public interface NonExistentInterface {
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/OuterClass.java
 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/OuterClass.java
new file mode 100644
index 00000000000..e76d3a7de79
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/OuterClass.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>Defines a connector as a non-static inner class, which does not have a default
+ * constructor: an {@code InnerClass} instance can only be created through an enclosing
+ * {@code OuterClass} instance.
+ * <p>{@code InnerClass#version()} always throws a {@link RuntimeException}; the other
+ * overridden connector methods are stubs that either do nothing or return {@code null}.
+ */
+public class OuterClass {
+
+    public class InnerClass extends SinkConnector {
+        @Override
+        public String version() {
+            throw new RuntimeException("I always throw an exception");
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+
+        }
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return null;
+        }
+
+        @Override
+        public List<Map<String, String>> taskConfigs(int maxTasks) {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+
+        }
+
+        @Override
+        public ConfigDef config() {
+            return null;
+        }
+    }
+
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsConnector.java
 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsConnector.java
new file mode 100644
index 00000000000..945b78fbb95
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsConnector.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>Unconditionally throw an exception during static initialization.
+ * <p>The static initializer calls {@link #setup()}, which always throws a
+ * {@link RuntimeException}. The throw lives in a separate method rather than directly in
+ * the initializer block, presumably because Java rejects a static initializer that cannot
+ * complete normally.
+ * <p>Every overridden connector method is a stub that either does nothing or returns
+ * {@code null}.
+ */
+public class StaticInitializerThrowsConnector extends SinkConnector {
+
+    static {
+        setup();
+    }
+
+    public static void setup() {
+        throw new RuntimeException("I always throw an exception");
+    }
+
+    @Override
+    public String version() {
+        return null;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return null;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        return null;
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override
+    public ConfigDef config() {
+        return null;
+    }
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsRestExtension.java
 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsRestExtension.java
new file mode 100644
index 00000000000..8ef0bdc10e9
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsRestExtension.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>A {@link ConnectRestExtension} that unconditionally throws an exception during
+ * static initialization.
+ * <p>The static initializer calls {@link #setup()}, which always throws a
+ * {@link RuntimeException}. The throw lives in a separate method rather than directly in
+ * the initializer block, presumably because Java rejects a static initializer that cannot
+ * complete normally.
+ * <p>{@code close}, {@code configure} and {@code register} do nothing, and
+ * {@code version} returns {@code null}.
+ */
+public class StaticInitializerThrowsRestExtension implements 
ConnectRestExtension {
+
+    static {
+        setup();
+    }
+
+    public static void setup() {
+        throw new RuntimeException("I always throw an exception");
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+
+    }
+
+    @Override
+    public String version() {
+        return null;
+    }
+
+    @Override
+    public void register(ConnectRestExtensionContext restPluginContext) {
+
+    }
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/VersionMethodThrowsConnector.java
 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/VersionMethodThrowsConnector.java
new file mode 100644
index 00000000000..b178697c901
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/VersionMethodThrowsConnector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>Unconditionally throw an exception during the version method.
+ * <p>{@link #version()} always throws a {@link RuntimeException}. The class declares no
+ * constructor of its own, and its other overridden connector methods are stubs that either
+ * do nothing or return {@code null}.
+ */
+public class VersionMethodThrowsConnector extends SinkConnector {
+
+    @Override
+    public String version() {
+        throw new RuntimeException("I always throw an exception");
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return null;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        return null;
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override
+    public ConfigDef config() {
+        return null;
+    }
+}


Reply via email to