This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new 22ab975fe29 KAFKA-14649: Isolate failures during plugin path scanning
to single plugin classes (#13182)
22ab975fe29 is described below
commit 22ab975fe2985c6ff9f0c77d931075b8ba7750d6
Author: Greg Harris <[email protected]>
AuthorDate: Thu Mar 2 12:10:01 2023 -0800
KAFKA-14649: Isolate failures during plugin path scanning to single plugin
classes (#13182)
Reviewers: Christo Lolov <[email protected]>, Chris Egerton
<[email protected]>
---
.../runtime/isolation/DelegatingClassLoader.java | 56 +++++++++++++----
.../connect/runtime/isolation/PluginsTest.java | 70 +++++++++++++++++++++
.../connect/runtime/isolation/TestPlugins.java | 63 +++++++++++++++++--
....apache.kafka.connect.rest.ConnectRestExtension | 16 +++++
.../test/plugins/CoLocatedPlugin.java | 46 ++++++++++++++
.../DefaultConstructorPrivateConnector.java | 67 ++++++++++++++++++++
.../plugins/DefaultConstructorThrowsConnector.java | 68 +++++++++++++++++++++
.../test/plugins/MissingSuperclassConverter.java | 46 ++++++++++++++
.../plugins/NoDefaultConstructorConnector.java | 67 ++++++++++++++++++++
.../test/plugins/NonExistentInterface.java | 24 ++++++++
.../bad-packaging/test/plugins/OuterClass.java | 66 ++++++++++++++++++++
.../plugins/StaticInitializerThrowsConnector.java | 71 ++++++++++++++++++++++
.../StaticInitializerThrowsRestExtension.java | 64 +++++++++++++++++++
.../test/plugins/VersionMethodThrowsConnector.java | 63 +++++++++++++++++++
14 files changed, 770 insertions(+), 17 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 7dbd6112f7a..df6006f197f 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
@@ -54,6 +55,7 @@ import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
+import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.SortedMap;
@@ -265,13 +267,11 @@ public class DelegatingClassLoader extends URLClassLoader
{
log.error("Invalid path in plugin path: {}. Ignoring.", path, e);
} catch (IOException e) {
log.error("Could not get listing for plugin path: {}. Ignoring.",
path, e);
- } catch (ReflectiveOperationException e) {
- log.error("Could not instantiate plugins in: {}. Ignoring.", path,
e);
}
}
private void registerPlugin(Path pluginLocation)
- throws IOException, ReflectiveOperationException {
+ throws IOException {
log.info("Loading plugin from: {}", pluginLocation);
List<URL> pluginUrls = new ArrayList<>();
for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
@@ -292,7 +292,7 @@ public class DelegatingClassLoader extends URLClassLoader {
private void scanUrlsAndAddPlugins(
ClassLoader loader,
URL[] urls
- ) throws ReflectiveOperationException {
+ ) {
PluginScanResult plugins = scanPluginPath(loader, urls);
log.info("Registered loader: {}", loader);
if (!plugins.isEmpty()) {
@@ -352,7 +352,7 @@ public class DelegatingClassLoader extends URLClassLoader {
private PluginScanResult scanPluginPath(
ClassLoader loader,
URL[] urls
- ) throws ReflectiveOperationException {
+ ) {
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.setClassLoaders(new ClassLoader[]{loader});
builder.addUrls(urls);
@@ -374,12 +374,12 @@ public class DelegatingClassLoader extends URLClassLoader
{
}
@SuppressWarnings({"unchecked"})
- private Collection<PluginDesc<Predicate<?>>>
getPredicatePluginDesc(ClassLoader loader, Reflections reflections) throws
ReflectiveOperationException {
+ private Collection<PluginDesc<Predicate<?>>>
getPredicatePluginDesc(ClassLoader loader, Reflections reflections) {
return (Collection<PluginDesc<Predicate<?>>>) (Collection<?>)
getPluginDesc(reflections, Predicate.class, loader);
}
@SuppressWarnings({"unchecked"})
- private Collection<PluginDesc<Transformation<?>>>
getTransformationPluginDesc(ClassLoader loader, Reflections reflections) throws
ReflectiveOperationException {
+ private Collection<PluginDesc<Transformation<?>>>
getTransformationPluginDesc(ClassLoader loader, Reflections reflections) {
return (Collection<PluginDesc<Transformation<?>>>) (Collection<?>)
getPluginDesc(reflections, Transformation.class, loader);
}
@@ -387,7 +387,7 @@ public class DelegatingClassLoader extends URLClassLoader {
Reflections reflections,
Class<T> klass,
ClassLoader loader
- ) throws ReflectiveOperationException {
+ ) {
Set<Class<? extends T>> plugins;
try {
plugins = reflections.getSubTypesOf(klass);
@@ -400,7 +400,11 @@ public class DelegatingClassLoader extends URLClassLoader {
Collection<PluginDesc<T>> result = new ArrayList<>();
for (Class<? extends T> plugin : plugins) {
if (PluginUtils.isConcrete(plugin)) {
- result.add(pluginDesc(plugin, versionFor(plugin), loader));
+ try {
+ result.add(pluginDesc(plugin, versionFor(plugin), loader));
+ } catch (ReflectiveOperationException | LinkageError e) {
+ log.error("Failed to discover {}: Unable to instantiate
{}{}", klass.getSimpleName(), plugin.getSimpleName(),
reflectiveErrorDescription(e), e);
+ }
} else {
log.debug("Skipping {} as it is not concrete implementation",
plugin);
}
@@ -419,7 +423,14 @@ public class DelegatingClassLoader extends URLClassLoader {
Collection<PluginDesc<T>> result = new ArrayList<>();
try {
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
- for (T pluginImpl : serviceLoader) {
+ for (Iterator<T> iterator = serviceLoader.iterator();
iterator.hasNext(); ) {
+ T pluginImpl;
+ try {
+ pluginImpl = iterator.next();
+ } catch (ServiceConfigurationError t) {
+ log.error("Failed to discover {}{}",
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+ continue;
+ }
result.add(pluginDesc((Class<? extends T>)
pluginImpl.getClass(),
versionFor(pluginImpl), loader));
}
@@ -430,7 +441,14 @@ public class DelegatingClassLoader extends URLClassLoader {
}
private static <T> String versionFor(T pluginImpl) {
- return pluginImpl instanceof Versioned ? ((Versioned)
pluginImpl).version() : UNDEFINED_VERSION;
+ try {
+ if (pluginImpl instanceof Versioned) {
+ return ((Versioned) pluginImpl).version();
+ }
+ } catch (Throwable t) {
+ log.error("Failed to get plugin version for " +
pluginImpl.getClass(), t);
+ }
+ return UNDEFINED_VERSION;
}
public static <T> String versionFor(Class<? extends T> pluginKlass) throws
ReflectiveOperationException {
@@ -439,6 +457,22 @@ public class DelegatingClassLoader extends URLClassLoader {
versionFor(pluginKlass.getDeclaredConstructor().newInstance()) :
UNDEFINED_VERSION;
}
+ private static String reflectiveErrorDescription(Throwable t) {
+ if (t instanceof NoSuchMethodException) {
+ return ": Plugin class must have a no-args constructor, and cannot
be a non-static inner class";
+ } else if (t instanceof SecurityException) {
+ return ": Security settings must allow reflective instantiation of
plugin classes";
+ } else if (t instanceof IllegalAccessException) {
+ return ": Plugin class default constructor must be public";
+ } else if (t instanceof ExceptionInInitializerError) {
+ return ": Failed to statically initialize plugin class";
+ } else if (t instanceof InvocationTargetException) {
+ return ": Failed to invoke plugin constructor";
+ } else {
+ return "";
+ }
+ }
+
@Override
protected Class<?> loadClass(String name, boolean resolve) throws
ClassNotFoundException {
String fullName = aliases.getOrDefault(name, name);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 4b040de92c4..1cfbee97bff 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -186,6 +186,76 @@ public class PluginsTest {
));
}
+ @Test
+ public void shouldFindCoLocatedPluginIfBadPackaging() {
+ Converter converter = plugins.newPlugin(
+ TestPlugin.BAD_PACKAGING_CO_LOCATED.className(),
+ new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
+ Converter.class
+ );
+ assertNotNull(converter);
+ }
+
+ @Test
+ public void shouldThrowIfPluginMissingSuperclass() {
+ assertThrows(ConnectException.class, () -> plugins.newPlugin(
+ TestPlugin.BAD_PACKAGING_MISSING_SUPERCLASS.className(),
+ new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
+ Converter.class
+ ));
+ }
+
+ @Test
+ public void shouldThrowIfStaticInitializerThrows() {
+ assertThrows(ConnectException.class, () -> plugins.newConnector(
+
TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_CONNECTOR.className()
+ ));
+ }
+
+ @Test
+ public void shouldThrowIfStaticInitializerThrowsServiceLoader() {
+ assertThrows(ConnectException.class, () -> plugins.newPlugin(
+
TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION.className(),
+ new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
+ ConnectRestExtension.class
+ ));
+ }
+
+ @Test
+ public void shouldThrowIfDefaultConstructorThrows() {
+ assertThrows(ConnectException.class, () -> plugins.newConnector(
+
TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONNECTOR.className()
+ ));
+ }
+
+ @Test
+ public void shouldThrowIfDefaultConstructorPrivate() {
+ assertThrows(ConnectException.class, () -> plugins.newConnector(
+
TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_PRIVATE_CONNECTOR.className()
+ ));
+ }
+
+ @Test
+ public void shouldThrowIfNoDefaultConstructor() {
+ assertThrows(ConnectException.class, () -> plugins.newConnector(
+
TestPlugin.BAD_PACKAGING_NO_DEFAULT_CONSTRUCTOR_CONNECTOR.className()
+ ));
+ }
+
+ @Test
+ public void shouldNotThrowIfVersionMethodThrows() {
+ assertNotNull(plugins.newConnector(
+
TestPlugin.BAD_PACKAGING_VERSION_METHOD_THROWS_CONNECTOR.className()
+ ));
+ }
+
+ @Test
+ public void shouldThrowIfPluginInnerClass() {
+ assertThrows(ConnectException.class, () -> plugins.newConnector(
+ TestPlugin.BAD_PACKAGING_INNER_CLASS_CONNECTOR.className()
+ ));
+ }
+
@Test
public void shouldShareStaticValuesBetweenSamePlugin() {
// Plugins are not isolated from other instances of their own class.
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
index f828b316749..1b018f1194a 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.function.Predicate;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
@@ -42,6 +43,8 @@ import java.util.stream.Collectors;
import javax.tools.JavaCompiler;
import javax.tools.StandardJavaFileManager;
import javax.tools.ToolProvider;
+
+import org.apache.kafka.connect.components.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +64,7 @@ import org.slf4j.LoggerFactory;
* and reference the names of the different plugins directly via the {@link
TestPlugin} enum.
*/
public class TestPlugins {
+ private static final Predicate<String> REMOVE_CLASS_FILTER = s ->
s.contains("NonExistentInterface");
public enum TestPlugin {
/**
* A plugin which will always throw an exception during loading
@@ -111,20 +115,62 @@ public class TestPlugins {
/**
* A plugin which shares a jar file with {@link
TestPlugin#MULTIPLE_PLUGINS_IN_JAR_THING_ONE}
*/
- MULTIPLE_PLUGINS_IN_JAR_THING_TWO("multiple-plugins-in-jar",
"test.plugins.ThingTwo");
+ MULTIPLE_PLUGINS_IN_JAR_THING_TWO("multiple-plugins-in-jar",
"test.plugins.ThingTwo"),
+ /**
+ * A plugin which is incorrectly packaged, and is missing a superclass
definition.
+ */
+ BAD_PACKAGING_MISSING_SUPERCLASS("bad-packaging",
"test.plugins.MissingSuperclass", false, REMOVE_CLASS_FILTER),
+ /**
+ * A plugin which is packaged with other incorrectly packaged plugins,
but itself has no issues loading.
+ */
+ BAD_PACKAGING_CO_LOCATED("bad-packaging",
"test.plugins.CoLocatedPlugin", true, REMOVE_CLASS_FILTER),
+ /**
+ * A connector which is incorrectly packaged, and throws during static
initialization.
+ */
+ BAD_PACKAGING_STATIC_INITIALIZER_THROWS_CONNECTOR("bad-packaging",
"test.plugins.StaticInitializerThrowsConnector", false, REMOVE_CLASS_FILTER),
+ /**
+ * A plugin which is incorrectly packaged, which throws an exception
from the {@link Versioned#version()} method.
+ */
+ BAD_PACKAGING_VERSION_METHOD_THROWS_CONNECTOR("bad-packaging",
"test.plugins.VersionMethodThrowsConnector", false, REMOVE_CLASS_FILTER),
+ /**
+ * A plugin which is incorrectly packaged, which throws an exception
from the default constructor.
+ */
+ BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONNECTOR("bad-packaging",
"test.plugins.DefaultConstructorThrowsConnector", false, REMOVE_CLASS_FILTER),
+ /**
+ * A plugin which is incorrectly packaged, which has a private default
constructor.
+ */
+ BAD_PACKAGING_DEFAULT_CONSTRUCTOR_PRIVATE_CONNECTOR("bad-packaging",
"test.plugins.DefaultConstructorPrivateConnector", false, REMOVE_CLASS_FILTER),
+ /**
+ * A plugin which is incorrectly packaged, which has no default
constructor.
+ */
+ BAD_PACKAGING_NO_DEFAULT_CONSTRUCTOR_CONNECTOR("bad-packaging",
"test.plugins.NoDefaultConstructorConnector", false, REMOVE_CLASS_FILTER),
+ /**
+ * A connector which is incorrectly packaged as a non-static inner class,
and therefore does not have a default constructor.
+ */
+ BAD_PACKAGING_INNER_CLASS_CONNECTOR("bad-packaging",
"test.plugins.OuterClass$InnerClass", false, REMOVE_CLASS_FILTER),
+ /**
+ * A REST extension which is incorrectly packaged, and throws during
static initialization.
+ */
+
BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION("bad-packaging",
"test.plugins.StaticInitializerThrowsRestExtension", false,
REMOVE_CLASS_FILTER);
private final String resourceDir;
private final String className;
private final boolean includeByDefault;
+ private final Predicate<String> removeRuntimeClasses;
TestPlugin(String resourceDir, String className) {
this(resourceDir, className, true);
}
TestPlugin(String resourceDir, String className, boolean
includeByDefault) {
+ this(resourceDir, className, includeByDefault, ignored -> false);
+ }
+
+ TestPlugin(String resourceDir, String className, boolean
includeByDefault, Predicate<String> removeRuntimeClasses) {
this.resourceDir = resourceDir;
this.className = className;
this.includeByDefault = includeByDefault;
+ this.removeRuntimeClasses = removeRuntimeClasses;
}
public String resourceDir() {
@@ -138,6 +184,10 @@ public class TestPlugins {
public boolean includeByDefault() {
return includeByDefault;
}
+
+ public Predicate<String> removeRuntimeClasses() {
+ return removeRuntimeClasses;
+ }
}
private static final Logger log =
LoggerFactory.getLogger(TestPlugins.class);
@@ -152,7 +202,7 @@ public class TestPlugins {
if (pluginJars.containsKey(testPlugin.resourceDir())) {
log.debug("Skipping recompilation of " +
testPlugin.resourceDir());
}
- pluginJars.put(testPlugin.resourceDir(),
createPluginJar(testPlugin.resourceDir()));
+ pluginJars.put(testPlugin.resourceDir(),
createPluginJar(testPlugin.resourceDir(), testPlugin.removeRuntimeClasses()));
}
} catch (Throwable e) {
log.error("Could not set up plugin test jars", e);
@@ -228,14 +278,14 @@ public class TestPlugins {
.toArray(TestPlugin[]::new);
}
- private static File createPluginJar(String resourceDir) throws IOException
{
+ private static File createPluginJar(String resourceDir, Predicate<String>
removeRuntimeClasses) throws IOException {
Path inputDir = resourceDirectoryPath("test-plugins/" + resourceDir);
Path binDir = Files.createTempDirectory(resourceDir + ".bin.");
compileJavaSources(inputDir, binDir);
File jarFile = Files.createTempFile(resourceDir + ".",
".jar").toFile();
try (JarOutputStream jar = openJarFile(jarFile)) {
- writeJar(jar, inputDir);
- writeJar(jar, binDir);
+ writeJar(jar, inputDir, removeRuntimeClasses);
+ writeJar(jar, binDir, removeRuntimeClasses);
}
removeDirectory(binDir);
jarFile.deleteOnExit();
@@ -315,10 +365,11 @@ public class TestPlugins {
}
}
- private static void writeJar(JarOutputStream jar, Path inputDir) throws
IOException {
+ private static void writeJar(JarOutputStream jar, Path inputDir,
Predicate<String> removeRuntimeClasses) throws IOException {
List<Path> paths = Files.walk(inputDir)
.filter(Files::isRegularFile)
.filter(path -> !path.toFile().getName().endsWith(".java"))
+ .filter(path ->
!removeRuntimeClasses.test(path.toFile().getName()))
.collect(Collectors.toList());
for (Path path : paths) {
try (InputStream in = new BufferedInputStream(new
FileInputStream(path.toFile()))) {
diff --git
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
new file mode 100644
index 00000000000..8b0f4881486
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.StaticInitializerThrowsRestExtension
\ No newline at end of file
diff --git
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/CoLocatedPlugin.java
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/CoLocatedPlugin.java
new file mode 100644
index 00000000000..c166e630e0a
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/CoLocatedPlugin.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>This is a plugin co-located with other poorly packaged plugins, but
should be visible despite other errors.
+ */
+public class CoLocatedPlugin implements Converter {
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
+
+ }
+
+ @Override
+ public byte[] fromConnectData(final String topic, final Schema schema,
final Object value) {
+ return new byte[0];
+ }
+
+ @Override
+ public SchemaAndValue toConnectData(final String topic, final byte[]
value) {
+ return null;
+ }
+}
diff --git
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorPrivateConnector.java
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorPrivateConnector.java
new file mode 100644
index 00000000000..107c4af8214
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorPrivateConnector.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>Mark the default constructor as private.
+ */
+public class DefaultConstructorPrivateConnector extends SinkConnector {
+
+
+ private DefaultConstructorPrivateConnector() {
+ }
+
+ @Override
+ public String version() {
+ return null;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return null;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public ConfigDef config() {
+ return null;
+ }
+}
diff --git
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConnector.java
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConnector.java
new file mode 100644
index 00000000000..77e194f9805
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConnector.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>Unconditionally throw an exception during the default constructor.
+ */
+public class DefaultConstructorThrowsConnector extends SinkConnector {
+
+
+ public DefaultConstructorThrowsConnector() {
+ throw new RuntimeException("I always throw an exception");
+ }
+
+ @Override
+ public String version() {
+ return null;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return null;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public ConfigDef config() {
+ return null;
+ }
+}
diff --git
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/MissingSuperclassConverter.java
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/MissingSuperclassConverter.java
new file mode 100644
index 00000000000..a7fb10f3897
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/MissingSuperclassConverter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>Implement a non-existent interface that is not packaged at runtime.
+ */
+public class MissingSuperclassConverter implements Converter,
NonExistentInterface {
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
+
+ }
+
+ @Override
+ public byte[] fromConnectData(final String topic, final Schema schema,
final Object value) {
+ return new byte[0];
+ }
+
+ @Override
+ public SchemaAndValue toConnectData(final String topic, final byte[]
value) {
+ return null;
+ }
+}
diff --git
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConnector.java
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConnector.java
new file mode 100644
index 00000000000..fe6b2da2a0c
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConnector.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>This class has no default constructor
+ */
+public class NoDefaultConstructorConnector extends SinkConnector {
+
+
+ public NoDefaultConstructorConnector(int ignored) {
+ }
+
+ @Override
+ public String version() {
+ return null;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return null;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public ConfigDef config() {
+ return null;
+ }
+}
diff --git
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NonExistentInterface.java
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NonExistentInterface.java
new file mode 100644
index 00000000000..6ec9de969d6
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NonExistentInterface.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+/**
+ * An interface which is present for compilation but removed during packaging.
+ */
+public interface NonExistentInterface {
+}
diff --git
a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/OuterClass.java
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/OuterClass.java
new file mode 100644
index 00000000000..e76d3a7de79
--- /dev/null
+++
b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/OuterClass.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>Defines a connector as a non-static inner class, which does not have a default constructor.
+ */
+public class OuterClass {
+
+ public class InnerClass extends SinkConnector {
+ @Override
+ public String version() {
+ throw new RuntimeException("I always throw an exception");
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return null;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public ConfigDef config() {
+ return null;
+ }
+ }
+
+}
diff --git a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsConnector.java b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsConnector.java
new file mode 100644
index 00000000000..945b78fbb95
--- /dev/null
+++ b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsConnector.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>Unconditionally throw an exception during static initialization.
+ */
+public class StaticInitializerThrowsConnector extends SinkConnector {
+
+ static {
+ setup();
+ }
+
+ public static void setup() {
+ throw new RuntimeException("I always throw an exception");
+ }
+
+ @Override
+ public String version() {
+ return null;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return null;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public ConfigDef config() {
+ return null;
+ }
+}
diff --git a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsRestExtension.java b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsRestExtension.java
new file mode 100644
index 00000000000..8ef0bdc10e9
--- /dev/null
+++ b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsRestExtension.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>Unconditionally throw an exception during static initialization.
+ */
+public class StaticInitializerThrowsRestExtension implements ConnectRestExtension {
+
+ static {
+ setup();
+ }
+
+ public static void setup() {
+ throw new RuntimeException("I always throw an exception");
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+
+ }
+
+ @Override
+ public String version() {
+ return null;
+ }
+
+ @Override
+ public void register(ConnectRestExtensionContext restPluginContext) {
+
+ }
+}
diff --git a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/VersionMethodThrowsConnector.java b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/VersionMethodThrowsConnector.java
new file mode 100644
index 00000000000..b178697c901
--- /dev/null
+++ b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/VersionMethodThrowsConnector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * <p>Unconditionally throw an exception during the version method.
+ */
+public class VersionMethodThrowsConnector extends SinkConnector {
+
+ @Override
+ public String version() {
+ throw new RuntimeException("I always throw an exception");
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return null;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public ConfigDef config() {
+ return null;
+ }
+}