This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new ad5f31c  KAFKA-9057: Backport KAFKA-8819 and KAFKA-8340 before 2.0 (#7549)
ad5f31c is described below

commit ad5f31c3e6868757eab9c8dd71efd5fd65f06ac6
Author: Greg Harris <gr...@confluent.io>
AuthorDate: Tue Oct 22 11:54:13 2019 -0700

    KAFKA-9057: Backport KAFKA-8819 and KAFKA-8340 before 2.0 (#7549)
    
    Includes fixes from PR-7315 (KAFKA-8819 and KAFKA-8340), but omits ConfigProvider and Configurable test cases and plugins, and replaces Java 8 language features with suitable Java 7 features.
    
    Signed-off-by: Greg Harris <gr...@confluent.io>
    Author: Greg Harris <gr...@confluent.io>
    Reviewer: Randall Hauch <rha...@gmail.com>
---
 checkstyle/import-control.xml                      |   1 +
 .../org/apache/kafka/connect/runtime/Worker.java   |   4 +-
 .../runtime/isolation/DelegatingClassLoader.java   |  39 +--
 .../kafka/connect/runtime/isolation/Plugins.java   |  82 +++---
 .../connect/runtime/isolation/PluginsTest.java     | 180 ++++++++++++-
 .../runtime/isolation/SamplingTestPlugin.java      | 122 +++++++++
 .../connect/runtime/isolation/TestPlugins.java     | 277 +++++++++++++++++++++
 .../test/plugins/AliasedStaticField.java           |  75 ++++++
 .../test/plugins/AlwaysThrowException.java         |  53 ++++
 .../test/plugins/SamplingConverter.java            |  76 ++++++
 .../test/plugins/SamplingHeaderConverter.java      |  89 +++++++
 .../services/test.plugins.ServiceLoadedClass       |  16 ++
 .../test/plugins/ServiceLoadedClass.java           |  48 ++++
 .../test/plugins/ServiceLoadedSubclass.java        |  46 ++++
 .../test/plugins/ServiceLoaderPlugin.java          |  85 +++++++
 15 files changed, 1130 insertions(+), 63 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 000acc3..5a9316d 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -313,6 +313,7 @@
       <subpackage name="isolation">
         <allow pkg="com.fasterxml.jackson" />
         <allow pkg="org.apache.maven.artifact.versioning" />
+        <allow pkg="javax.tools" />
       </subpackage>
     </subpackage>
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 1c64658..545797a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -367,10 +367,10 @@ public class Worker {
         final WorkerTask workerTask;
         ClassLoader savedLoader = plugins.currentThreadLoader();
         try {
-            final ConnectorConfig connConfig = new ConnectorConfig(plugins, 
connProps);
-            String connType = 
connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            String connType = 
connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
             ClassLoader connectorLoader = 
plugins.delegatingLoader().connectorLoader(connType);
             savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+            final ConnectorConfig connConfig = new ConnectorConfig(plugins, 
connProps);
             final TaskConfig taskConfig = new TaskConfig(taskProps);
             final Class<? extends Task> taskClass = 
taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
             final Task task = plugins.newTask(taskClass);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 83acbaa..7cfd4d5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -107,13 +107,32 @@ public class DelegatingClassLoader extends URLClassLoader {
         return connectorLoader(connector.getClass().getName());
     }
 
+    /**
+     * Retrieve the PluginClassLoader associated with a plugin class
+     * @param name The fully qualified class name of the plugin
+     * @return the PluginClassLoader that should be used to load this, or null 
if the plugin is not isolated.
+     */
+    public PluginClassLoader pluginClassLoader(String name) {
+        if (!PluginUtils.shouldLoadInIsolation(name)) {
+            return null;
+        }
+        SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
+        if (inner == null) {
+            return null;
+        }
+        ClassLoader pluginLoader = inner.get(inner.lastKey());
+        return pluginLoader instanceof PluginClassLoader
+               ? (PluginClassLoader) pluginLoader
+               : null;
+    }
+
     public ClassLoader connectorLoader(String connectorClassOrAlias) {
         log.debug("Getting plugin class loader for connector: '{}'", 
connectorClassOrAlias);
         String fullName = aliases.containsKey(connectorClassOrAlias)
                           ? aliases.get(connectorClassOrAlias)
                           : connectorClassOrAlias;
-        SortedMap<PluginDesc<?>, ClassLoader> inner = 
pluginLoaders.get(fullName);
-        if (inner == null) {
+        PluginClassLoader classLoader = pluginClassLoader(fullName);
+        if (classLoader == null) {
             log.error(
                     "Plugin class loader for connector: '{}' was not found. 
Returning: {}",
                     connectorClassOrAlias,
@@ -121,7 +140,7 @@ public class DelegatingClassLoader extends URLClassLoader {
             );
             return this;
         }
-        return inner.get(inner.lastKey());
+        return classLoader;
     }
 
     private static PluginClassLoader newPluginClassLoader(
@@ -317,19 +336,11 @@ public class DelegatingClassLoader extends URLClassLoader {
 
     @Override
     protected Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
-        if (!PluginUtils.shouldLoadInIsolation(name)) {
-            // There are no paths in this classloader, will attempt to load 
with the parent.
-            return super.loadClass(name, resolve);
-        }
-
         String fullName = aliases.containsKey(name) ? aliases.get(name) : name;
-        SortedMap<PluginDesc<?>, ClassLoader> inner = 
pluginLoaders.get(fullName);
-        if (inner != null) {
-            ClassLoader pluginLoader = inner.get(inner.lastKey());
+        PluginClassLoader pluginLoader = pluginClassLoader(fullName);
+        if (pluginLoader != null) {
             log.trace("Retrieving loaded class '{}' from '{}'", fullName, 
pluginLoader);
-            return pluginLoader instanceof PluginClassLoader
-                   ? ((PluginClassLoader) pluginLoader).loadClass(fullName, 
resolve)
-                   : super.loadClass(fullName, resolve);
+            return pluginLoader.loadClass(fullName, resolve);
         }
 
         return super.loadClass(fullName, resolve);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 017d009..71de650 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectRecord;
@@ -72,14 +71,38 @@ public class Plugins {
     }
 
     protected static <T> T newPlugin(Class<T> klass) {
+        // KAFKA-8340: The thread classloader is used during static 
initialization and must be
+        // set to the plugin's classloader during instantiation
+        ClassLoader savedLoader = 
compareAndSwapLoaders(klass.getClassLoader());
         try {
             return Utils.newInstance(klass);
         } catch (Throwable t) {
             throw new ConnectException("Instantiation error", t);
+        } finally {
+            compareAndSwapLoaders(savedLoader);
         }
     }
 
     @SuppressWarnings("unchecked")
+    protected <U> Class<? extends U> pluginClassFromConfig(
+            AbstractConfig config,
+            String propertyName,
+            Class<U> pluginClass,
+            Collection<PluginDesc<U>> plugins
+    ) {
+        Class<?> klass = config.getClass(propertyName);
+        if (pluginClass.isAssignableFrom(klass)) {
+            return (Class<? extends U>) klass;
+        }
+        throw new ConnectException(
+            "Failed to find any class that implements " + 
pluginClass.getSimpleName()
+                + " for the config "
+                + propertyName + ", available classes are: "
+                + pluginNames(plugins)
+        );
+    }
+
+    @SuppressWarnings("unchecked")
     protected static <U> Class<? extends U> pluginClass(
             DelegatingClassLoader loader,
             String classOrAlias,
@@ -199,18 +222,17 @@ public class Plugins {
             // This configuration does not define the converter via the 
specified property name
             return null;
         }
-        Converter plugin = null;
+        Class<? extends Converter> klass = null;
         switch (classLoaderUsage) {
             case CURRENT_CLASSLOADER:
                 // Attempt to load first with the current classloader, and 
plugins as a fallback.
                 // Note: we can't use config.getConfiguredInstance because 
Converter doesn't implement Configurable, and even if it did
                 // we have to remove the property prefixes before calling 
config(...) and we still always want to call Converter.config.
-                plugin = getInstance(config, classPropertyName, 
Converter.class);
+                klass = pluginClassFromConfig(config, classPropertyName, 
Converter.class, delegatingLoader.converters());
                 break;
             case PLUGINS:
                 // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback
                 String converterClassOrAlias = 
config.getClass(classPropertyName).getName();
-                Class<? extends Converter> klass;
                 try {
                     klass = pluginClass(delegatingLoader, 
converterClassOrAlias, Converter.class);
                 } catch (ClassNotFoundException e) {
@@ -220,11 +242,10 @@ public class Plugins {
                             + pluginNames(delegatingLoader.converters())
                     );
                 }
-                plugin = newPlugin(klass);
                 break;
         }
-        if (plugin == null) {
-            throw new ConnectException("Unable to instantiate the Converter 
specified in '" + classPropertyName + "'");
+        if (klass == null) {
+            throw new ConnectException("Unable to initialize the Converter 
specified in '" + classPropertyName + "'");
         }
 
         // Determine whether this is a key or value converter based upon the 
supplied property name ...
@@ -236,7 +257,14 @@ public class Plugins {
         Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
         log.debug("Configuring the {} converter with configuration keys:{}{}",
                   isKeyConverter ? "key" : "value", System.lineSeparator(), 
converterConfig.keySet());
-        plugin.configure(converterConfig, isKeyConverter);
+        Converter plugin;
+        ClassLoader savedLoader = 
compareAndSwapLoaders(klass.getClassLoader());
+        try {
+            plugin = newPlugin(klass);
+            plugin.configure(converterConfig, isKeyConverter);
+        } finally {
+            compareAndSwapLoaders(savedLoader);
+        }
         return plugin;
     }
 
@@ -251,7 +279,7 @@ public class Plugins {
      * @throws ConnectException if the {@link HeaderConverter} implementation 
class could not be found
      */
     public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
-        HeaderConverter plugin = null;
+        Class<? extends HeaderConverter> klass = null;
         switch (classLoaderUsage) {
             case CURRENT_CLASSLOADER:
                 if (!config.originals().containsKey(classPropertyName)) {
@@ -261,13 +289,12 @@ public class Plugins {
                 // Attempt to load first with the current classloader, and 
plugins as a fallback.
                 // Note: we can't use config.getConfiguredInstance because we 
have to remove the property prefixes
                 // before calling config(...)
-                plugin = getInstance(config, classPropertyName, 
HeaderConverter.class);
+                klass = pluginClassFromConfig(config, classPropertyName, 
HeaderConverter.class, delegatingLoader.headerConverters());
                 break;
             case PLUGINS:
                 // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback.
                 // Note that there will always be at least a default header 
converter for the worker
                 String converterClassOrAlias = 
config.getClass(classPropertyName).getName();
-                Class<? extends HeaderConverter> klass;
                 try {
                     klass = pluginClass(
                             delegatingLoader,
@@ -282,38 +309,25 @@ public class Plugins {
                                     + 
pluginNames(delegatingLoader.headerConverters())
                     );
                 }
-                plugin = newPlugin(klass);
         }
-        if (plugin == null) {
-            throw new ConnectException("Unable to instantiate the Converter 
specified in '" + classPropertyName + "'");
+        if (klass == null) {
+            throw new ConnectException("Unable to initialize the 
HeaderConverter specified in '" + classPropertyName + "'");
         }
 
         String configPrefix = classPropertyName + ".";
         Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
         converterConfig.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName());
         log.debug("Configuring the header converter with configuration 
keys:{}{}", System.lineSeparator(), converterConfig.keySet());
-        plugin.configure(converterConfig);
-        return plugin;
-    }
 
-    /**
-     * Get an instance of the give class specified by the given configuration 
key.
-     *
-     * @param key The configuration key for the class
-     * @param t The interface the class should implement
-     * @return A instance of the class
-     */
-    private <T> T getInstance(AbstractConfig config, String key, Class<T> t) {
-        Class<?> c = config.getClass(key);
-        if (c == null) {
-            return null;
-        }
-        // Instantiate the class, but we don't know if the class extends the 
supplied type
-        Object o = Utils.newInstance(c);
-        if (!t.isInstance(o)) {
-            throw new KafkaException(c.getName() + " is not an instance of " + 
t.getName());
+        HeaderConverter plugin;
+        ClassLoader savedLoader = 
compareAndSwapLoaders(klass.getClassLoader());
+        try {
+            plugin = newPlugin(klass);
+            plugin.configure(converterConfig);
+        } finally {
+            compareAndSwapLoaders(savedLoader);
         }
-        return t.cast(o);
+        return plugin;
     }
 
     public <R extends ConnectRecord<R>> Transformation<R> newTranformations(
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index a9a944f..e4df6c2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -17,9 +17,12 @@
 
 package org.apache.kafka.connect.runtime.isolation;
 
+import java.util.Map.Entry;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.json.JsonConverterConfig;
@@ -31,7 +34,6 @@ import org.apache.kafka.connect.storage.ConverterType;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.SimpleHeaderConverter;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -41,29 +43,24 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 public class PluginsTest {
 
-    private static Map<String, String> pluginProps;
-    private static Plugins plugins;
+    private Plugins plugins;
     private Map<String, String> props;
     private AbstractConfig config;
     private TestConverter converter;
     private TestHeaderConverter headerConverter;
 
-    @BeforeClass
-    public static void beforeAll() {
-        pluginProps = new HashMap<>();
-
-        // Set up the plugins to have no additional plugin directories.
-        // This won't allow us to test classpath isolation, but it will allow 
us to test some of the utility methods.
-        pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, "");
-        plugins = new Plugins(pluginProps);
-    }
-
     @Before
     public void setup() {
+        Map<String, String> pluginProps = new HashMap<>();
+
+        // Set up the plugins with some test plugins to test isolation
+        pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, 
Utils.join(TestPlugins.pluginPath(), ","));
+        plugins = new Plugins(pluginProps);
         props = new HashMap<>(pluginProps);
         props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
TestConverter.class.getName());
         props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
TestConverter.class.getName());
@@ -158,6 +155,163 @@ public class PluginsTest {
         assertTrue(headerConverter instanceof SimpleHeaderConverter);
     }
 
+    @Test(expected = ExceptionInInitializerError.class)
+    public void shouldThrowIfPluginThrows() {
+        TestPlugins.assertAvailable();
+        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
TestPlugins.ALWAYS_THROW_EXCEPTION);
+        ClassLoader classLoader = 
plugins.delegatingLoader().pluginClassLoader(TestPlugins.ALWAYS_THROW_EXCEPTION);
+        ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader);
+        try {
+            createConfig();
+        } finally {
+            Plugins.compareAndSwapLoaders(savedLoader);
+        }
+    }
+
+    @Test
+    public void shouldShareStaticValuesBetweenSamePlugin() {
+        // Plugins are not isolated from other instances of their own class.
+        TestPlugins.assertAvailable();
+        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
TestPlugins.ALIASED_STATIC_FIELD);
+        ClassLoader classLoader = 
plugins.delegatingLoader().pluginClassLoader(TestPlugins.ALIASED_STATIC_FIELD);
+        ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader);
+        createConfig();
+        Plugins.compareAndSwapLoaders(savedLoader);
+
+        Converter firstPlugin = plugins.newConverter(
+            config,
+            WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+            ClassLoaderUsage.PLUGINS
+        );
+
+        assertInstanceOf(SamplingTestPlugin.class, firstPlugin, "Cannot 
collect samples");
+
+        Converter secondPlugin = plugins.newConverter(
+            config,
+            WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+            ClassLoaderUsage.PLUGINS
+        );
+
+        assertInstanceOf(SamplingTestPlugin.class, secondPlugin, "Cannot 
collect samples");
+        assertSame(
+            ((SamplingTestPlugin) firstPlugin).otherSamples(),
+            ((SamplingTestPlugin) secondPlugin).otherSamples()
+        );
+    }
+
+    @Test
+    public void newPluginShouldServiceLoadWithPluginClassLoader() {
+        TestPlugins.assertAvailable();
+        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
TestPlugins.SERVICE_LOADER);
+        ClassLoader classLoader = 
plugins.delegatingLoader().pluginClassLoader(TestPlugins.SERVICE_LOADER);
+        ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader);
+        createConfig();
+        Plugins.compareAndSwapLoaders(savedLoader);
+        Converter plugin = plugins.newConverter(
+            config,
+            WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+            ClassLoaderUsage.PLUGINS
+        );
+
+        assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect 
samples");
+        Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) 
plugin).flatten();
+        // Assert that the service loaded subclass is found in both 
environments
+        assertTrue(samples.containsKey("ServiceLoadedSubclass.static"));
+        assertTrue(samples.containsKey("ServiceLoadedSubclass.dynamic"));
+        assertPluginClassLoaderAlwaysActive(samples);
+    }
+
+    @Test
+    public void newPluginShouldInstantiateWithPluginClassLoader() {
+        TestPlugins.assertAvailable();
+        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
TestPlugins.SERVICE_LOADER);
+        ClassLoader classLoader = 
plugins.delegatingLoader().pluginClassLoader(TestPlugins.SERVICE_LOADER);
+        ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader);
+        createConfig();
+        Plugins.compareAndSwapLoaders(savedLoader);
+        Converter plugin = plugins.newConverter(
+            config,
+            WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+            ClassLoaderUsage.PLUGINS
+        );
+
+        assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect 
samples");
+        Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) 
plugin).flatten();
+        assertPluginClassLoaderAlwaysActive(samples);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void shouldFailToFindConverterInCurrentClassloader() {
+        TestPlugins.assertAvailable();
+        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
TestPlugins.SAMPLING_CONVERTER);
+        createConfig();
+    }
+
+    @Test
+    public void newConverterShouldConfigureWithPluginClassLoader() {
+        TestPlugins.assertAvailable();
+        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
TestPlugins.SAMPLING_CONVERTER);
+        ClassLoader classLoader = 
plugins.delegatingLoader().pluginClassLoader(TestPlugins.SAMPLING_CONVERTER);
+        ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader);
+        createConfig();
+        Plugins.compareAndSwapLoaders(savedLoader);
+
+        Converter plugin = plugins.newConverter(
+            config,
+            WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+            ClassLoaderUsage.PLUGINS
+        );
+
+        assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect 
samples");
+        Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) 
plugin).flatten();
+        assertTrue(samples.containsKey("configure"));
+        assertPluginClassLoaderAlwaysActive(samples);
+    }
+
+    @Test
+    public void newHeaderConverterShouldConfigureWithPluginClassLoader() {
+        TestPlugins.assertAvailable();
+        props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, 
TestPlugins.SAMPLING_HEADER_CONVERTER);
+        ClassLoader classLoader = 
plugins.delegatingLoader().pluginClassLoader(TestPlugins.SAMPLING_HEADER_CONVERTER);
+        ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader);
+        createConfig();
+        Plugins.compareAndSwapLoaders(savedLoader);
+
+        HeaderConverter plugin = plugins.newHeaderConverter(
+            config,
+            WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+            ClassLoaderUsage.PLUGINS
+        );
+
+        assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect 
samples");
+        Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) 
plugin).flatten();
+        assertTrue(samples.containsKey("configure")); // 
HeaderConverter::configure was called
+        assertPluginClassLoaderAlwaysActive(samples);
+    }
+
+    public static void assertPluginClassLoaderAlwaysActive(Map<String, 
SamplingTestPlugin> samples) {
+        for (Entry<String, SamplingTestPlugin> e : samples.entrySet()) {
+            String sampleName = "\"" + e.getKey() + "\" (" + e.getValue() + 
")";
+            assertInstanceOf(
+                PluginClassLoader.class,
+                e.getValue().staticClassloader(),
+                sampleName + " has incorrect static classloader"
+            );
+            assertInstanceOf(
+                PluginClassLoader.class,
+                e.getValue().classloader(),
+                sampleName + " has incorrect dynamic classloader"
+            );
+        }
+    }
+
+    public static void assertInstanceOf(Class<?> expected, Object actual, 
String message) {
+        assertTrue(
+            "Expected an instance of " + expected.getSimpleName() + ", found " 
+ actual + " instead: " + message,
+            expected.isInstance(actual)
+        );
+    }
+
     protected void instantiateAndConfigureConverter(String configPropName, 
ClassLoaderUsage classLoaderUsage) {
         converter = (TestConverter) plugins.newConverter(config, 
configPropName, classLoaderUsage);
         assertNotNull(converter);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java
new file mode 100644
index 0000000..bcf8881
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Base class for plugins so we can sample information about their 
initialization
+ */
+public abstract class SamplingTestPlugin {
+
+    /**
+     * @return the ClassLoader used to statically initialize this plugin class
+     */
+    public abstract ClassLoader staticClassloader();
+
+    /**
+     * @return the ClassLoader used to initialize this plugin instance
+     */
+    public abstract ClassLoader classloader();
+
+    /**
+     * @return a group of other SamplingTestPlugin instances known by this 
plugin
+     * This should only return direct children, and not reference this 
instance directly
+     */
+    public Map<String, SamplingTestPlugin> otherSamples() {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * @return a flattened list of child samples including this entry keyed as 
"this"
+     */
+    public Map<String, SamplingTestPlugin> flatten() {
+        Map<String, SamplingTestPlugin> out = new HashMap<>();
+        Map<String, SamplingTestPlugin> otherSamples = otherSamples();
+        if (otherSamples != null) {
+            for (Entry<String, SamplingTestPlugin> child : 
otherSamples.entrySet()) {
+                for (Entry<String, SamplingTestPlugin> flattened : 
child.getValue().flatten().entrySet()) {
+                    String key = child.getKey();
+                    if (flattened.getKey().length() > 0) {
+                        key += "." + flattened.getKey();
+                    }
+                    out.put(key, flattened.getValue());
+                }
+            }
+        }
+        out.put("", this);
+        return out;
+    }
+
+    /**
+     * Log the parent method call as a child sample.
+     * Stores only the last invocation of each method if there are multiple 
invocations.
+     * @param samples The collection of samples to which this method call 
should be added
+     */
+    public void logMethodCall(Map<String, SamplingTestPlugin> samples) {
+        StackTraceElement[] stackTraces = 
Thread.currentThread().getStackTrace();
+        if (stackTraces.length < 2) {
+            return;
+        }
+        // 0 is inside getStackTrace
+        // 1 is this method
+        // 2 is our caller method
+        StackTraceElement caller = stackTraces[2];
+
+        samples.put(caller.getMethodName(), new MethodCallSample(
+            caller,
+            Thread.currentThread().getContextClassLoader(),
+            getClass().getClassLoader()
+        ));
+    }
+
+    public static class MethodCallSample extends SamplingTestPlugin {
+
+        private final StackTraceElement caller;
+        private final ClassLoader staticClassLoader;
+        private final ClassLoader dynamicClassLoader;
+
+        public MethodCallSample(
+            StackTraceElement caller,
+            ClassLoader staticClassLoader,
+            ClassLoader dynamicClassLoader
+        ) {
+            this.caller = caller;
+            this.staticClassLoader = staticClassLoader;
+            this.dynamicClassLoader = dynamicClassLoader;
+        }
+
+        @Override
+        public ClassLoader staticClassloader() {
+            return staticClassLoader;
+        }
+
+        @Override
+        public ClassLoader classloader() {
+            return dynamicClassLoader;
+        }
+
+        @Override
+        public String toString() {
+            return caller.toString();
+        }
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
new file mode 100644
index 0000000..8a08c07
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.net.URL;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.jar.Attributes;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+import javax.tools.JavaCompiler;
+import javax.tools.StandardJavaFileManager;
+import javax.tools.ToolProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for constructing test plugins for Connect.
+ *
+ * <p>Plugins are built from their source under resources/test-plugins/ and 
placed into temporary
+ * jar files that are deleted when the process exits.
+ *
+ * <p>To add a plugin, create the source files in the resource tree, and edit 
this class to build
+ * that plugin during initialization. For example, the plugin class {@literal 
package.Class} should
+ * be placed in {@literal resources/test-plugins/something/package/Class.java} 
and loaded using
+ * {@code createPluginJar("something")}. The class name, contents, and plugin 
directory can take
+ * any value you need for testing.
+ *
+ * <p>To use this class in your tests, make sure to first call
+ * {@link TestPlugins#assertAvailable()} to verify that the plugins 
initialized correctly.
+ * Otherwise, exceptions during the plugin build are not propagated, and may 
invalidate your test.
+ * You can access the list of plugin jars for assembling a {@literal 
plugin.path}, and reference
+ * the names of the different plugins directly via the exposed constants.
+ */
+public class TestPlugins {
+
+    /**
+     * Class name of a plugin which will always throw an exception during 
loading
+     */
+    public static final String ALWAYS_THROW_EXCEPTION = 
"test.plugins.AlwaysThrowException";
+    /**
+     * Class name of a plugin which samples information about its 
initialization.
+     */
+    public static final String ALIASED_STATIC_FIELD = 
"test.plugins.AliasedStaticField";
+    /**
+     * Class name of a {@link org.apache.kafka.connect.storage.Converter}
+     * which samples information about its method calls.
+     */
+    public static final String SAMPLING_CONVERTER = 
"test.plugins.SamplingConverter";
+    /**
+     * Class name of a {@link org.apache.kafka.connect.storage.HeaderConverter}
+     * which samples information about its method calls.
+     */
+    public static final String SAMPLING_HEADER_CONVERTER = 
"test.plugins.SamplingHeaderConverter";
+    /**
+     * Class name of a plugin which uses a {@link java.util.ServiceLoader}
+     * to load internal classes, and samples information about their 
initialization.
+     */
+    public static final String SERVICE_LOADER = 
"test.plugins.ServiceLoaderPlugin";
+
+    private static final Logger log = 
LoggerFactory.getLogger(TestPlugins.class);
+    private static final Map<String, File> PLUGIN_JARS;
+    private static final Throwable INITIALIZATION_EXCEPTION;
+
+    // Build every test plugin jar once, when this class is initialized. A failure is logged and
+    // remembered rather than thrown; jars built before the failure stay in the map.
+    static {
+        Map<String, File> built = new HashMap<>();
+        Throwable failure = null;
+        try {
+            built.put(ALWAYS_THROW_EXCEPTION, createPluginJar("always-throw-exception"));
+            built.put(ALIASED_STATIC_FIELD, createPluginJar("aliased-static-field"));
+            built.put(SAMPLING_CONVERTER, createPluginJar("sampling-converter"));
+            built.put(SAMPLING_HEADER_CONVERTER, createPluginJar("sampling-header-converter"));
+            built.put(SERVICE_LOADER, createPluginJar("service-loader"));
+        } catch (Throwable t) {
+            log.error("Could not set up plugin test jars", t);
+            failure = t;
+        }
+        INITIALIZATION_EXCEPTION = failure;
+        PLUGIN_JARS = Collections.unmodifiableMap(built);
+    }
+
+    /**
+     * Verify that the test plugin JARs were built during static initialization.
+     * @throws AssertionError if building the plugins raised an error (attached as the cause),
+     *         or if no plugin jar was built at all.
+     */
+    public static void assertAvailable() throws AssertionError {
+        if (INITIALIZATION_EXCEPTION == null) {
+            if (PLUGIN_JARS.isEmpty()) {
+                throw new AssertionError("No test plugins loaded");
+            }
+            return;
+        }
+        throw new AssertionError(
+            "TestPlugins did not initialize completely",
+            INITIALIZATION_EXCEPTION
+        );
+    }
+
+    /**
+     * A list of jar files containing test plugins
+     * @return A list of plugin jar filenames
+     */
+    public static List<String> pluginPath() {
+        List<String> out = new ArrayList<>();
+        for (File f : PLUGIN_JARS.values()) {
+            out.add(f.getPath());
+        }
+        return out;
+    }
+
+    /**
+     * The names of the plugin classes whose jars were successfully built by this class.
+     * @return A new list of plugin class names
+     */
+    public static List<String> pluginClasses() {
+        List<String> classNames = new ArrayList<>();
+        classNames.addAll(PLUGIN_JARS.keySet());
+        return classNames;
+    }
+
+    /**
+     * Build one test plugin: compile the sources under {@code test-plugins/<resourceDir>} and
+     * package the resulting classes, plus all non-source resources, into a temporary jar.
+     * @param resourceDir Name of the plugin directory beneath {@code test-plugins/}
+     * @return The temporary jar file, registered for deletion when the JVM exits
+     * @throws IOException if the resources cannot be found, read, or packaged
+     */
+    private static File createPluginJar(String resourceDir) throws IOException {
+        Path inputDir = resourceDirectoryPath("test-plugins/" + resourceDir);
+        File jarFile = Files.createTempFile(resourceDir + ".", ".jar").toFile();
+        // Register for deletion immediately so the jar does not outlive the JVM when the build fails.
+        jarFile.deleteOnExit();
+        Path binDir = Files.createTempDirectory(resourceDir + ".bin.");
+        try {
+            compileJavaSources(inputDir, binDir);
+            try (JarOutputStream jar = openJarFile(jarFile)) {
+                writeJar(jar, inputDir);
+                writeJar(jar, binDir);
+            }
+        } finally {
+            // The class files are only needed while assembling the jar; remove them on every path.
+            removeDirectory(binDir);
+        }
+        return jarFile;
+    }
+
+    /**
+     * Locate a directory of test resources through the context class loader.
+     * @param resourceDir Resource name of the directory, relative to the classpath root
+     * @return The directory as a filesystem path
+     * @throws IOException if the resource is missing, is not a directory, or is not readable
+     */
+    private static Path resourceDirectoryPath(String resourceDir) throws IOException {
+        URL resource = Thread.currentThread()
+            .getContextClassLoader()
+            .getResource(resourceDir);
+        if (resource == null) {
+            throw new IOException("Could not find test plugin resource: " + resourceDir);
+        }
+        File file;
+        try {
+            // URL#getFile() leaves percent-escapes (e.g. %20 for a space) in the name, which then
+            // fails the directory check below. Resolving through the URI decodes them.
+            file = new File(resource.toURI());
+        } catch (java.net.URISyntaxException | IllegalArgumentException e) {
+            // Not a well-formed file: URI; fall back to the raw path so the checks below decide.
+            file = new File(resource.getFile());
+        }
+        if (!file.isDirectory()) {
+            throw new IOException("Resource is not a directory: " + resourceDir);
+        }
+        if (!file.canRead()) {
+            throw new IOException("Resource directory is not readable: " + resourceDir);
+        }
+        return file.toPath();
+    }
+
+    /**
+     * Open a jar for writing, with a manifest that carries only the manifest version.
+     * @param jarFile The file to write the jar into
+     * @return An open jar stream; the caller is responsible for closing it
+     * @throws IOException if the file cannot be opened or the manifest cannot be written
+     */
+    private static JarOutputStream openJarFile(File jarFile) throws IOException {
+        Manifest manifest = new Manifest();
+        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
+        FileOutputStream fileOut = new FileOutputStream(jarFile);
+        try {
+            return new JarOutputStream(fileOut, manifest);
+        } catch (IOException | RuntimeException e) {
+            // The constructor writes the manifest and may fail; in that case no wrapper exists
+            // for the caller to close, so release the file handle here.
+            try {
+                fileOut.close();
+            } catch (IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Delete a directory tree, files first and then each directory once it has been visited.
+     * A file or directory that cannot be deleted is logged and skipped.
+     * @param binDir Root of the tree to delete
+     * @throws IOException if walking the tree fails
+     */
+    private static void removeDirectory(Path binDir) throws IOException {
+        SimpleFileVisitor<Path> deleter = new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
+                return deleteAndContinue(file);
+            }
+
+            @Override
+            public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
+                return deleteAndContinue(dir);
+            }
+
+            private FileVisitResult deleteAndContinue(Path target) {
+                boolean deleted = target.toFile().delete();
+                if (!deleted) {
+                    log.info("Could not delete " + target);
+                }
+                return FileVisitResult.CONTINUE;
+            }
+        };
+        Files.walkFileTree(binDir, deleter);
+    }
+
+    /**
+     * Compile a directory of .java source files into .class files.
+     * The .class files are written beneath {@code binDir}, not next to their sources.
+     *
+     * <p>Dependencies between source files in this directory are resolved against one another
+     * and the classes present in the test environment.
+     * See https://stackoverflow.com/questions/1563909/ for more information.
+     * Additional dependencies in your plugins should be added as test scope to :connect:runtime.
+     * @param sourceDir Directory containing java source files
+     * @param binDir Directory that receives the compiled class files
+     * @throws IOException if the source tree cannot be walked, or no system Java compiler exists
+     * @throws RuntimeException if the compiler reports that compilation failed
+     */
+    private static void compileJavaSources(Path sourceDir, Path binDir) throws IOException {
+        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+        if (compiler == null) {
+            // getSystemJavaCompiler() returns null when no compiler is provided (e.g. on a JRE);
+            // report that clearly instead of failing with a NullPointerException below.
+            throw new IOException("No system Java compiler available to build test plugins");
+        }
+        final List<File> sourceFiles = new ArrayList<>();
+        Files.walkFileTree(sourceDir, new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
+                if (file.toFile().getName().endsWith(".java")) {
+                    sourceFiles.add(file.toFile());
+                }
+                return FileVisitResult.CONTINUE;
+            }
+        });
+
+        // Compiler output is collected here so it can be attached to the failure message.
+        StringWriter writer = new StringWriter();
+        List<String> options = Arrays.asList(
+            "-d", binDir.toString() // Write class output to a different directory.
+        );
+
+        try (StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null)) {
+            boolean success = compiler.getTask(
+                writer,
+                fileManager,
+                null,
+                options,
+                null,
+                fileManager.getJavaFileObjectsFromFiles(sourceFiles)
+            ).call();
+            if (!success) {
+                throw new RuntimeException("Failed to compile test plugin:\n" + writer);
+            }
+        }
+    }
+
+    private static void writeJar(JarOutputStream jar, Path inputDir) throws 
IOException {
+        final List<Path> paths = new ArrayList<>();
+        Files.walkFileTree(inputDir, new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes 
attrs) {
+                if (!file.toFile().getName().endsWith(".java")) {
+                    paths.add(file);
+                }
+                return FileVisitResult.CONTINUE;
+            }
+        });
+        for (Path path : paths) {
+            try (InputStream in = new BufferedInputStream(new 
FileInputStream(path.toFile()))) {
+                jar.putNextEntry(new JarEntry(
+                    inputDir.relativize(path)
+                        .toFile()
+                        .getPath()
+                        .replace(File.separator, "/")
+                ));
+                byte[] buffer = new byte[1024];
+                for (int count; (count = in.read(buffer)) != -1; ) {
+                    jar.write(buffer, 0, count);
+                }
+                jar.closeEntry();
+            }
+        }
+    }
+
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/aliased-static-field/test/plugins/AliasedStaticField.java
 
b/connect/runtime/src/test/resources/test-plugins/aliased-static-field/test/plugins/AliasedStaticField.java
new file mode 100644
index 0000000..d865f4e
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/aliased-static-field/test/plugins/AliasedStaticField.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
+
+/**
+ * Converter plugin that captures the thread context class loader both when the class is
+ * initialized and when each instance is created.
+ * The sample map is a static field, so all instances of this class return the same map.
+ */
+public class AliasedStaticField extends SamplingTestPlugin implements Converter {
+
+  // Captured once, while this class is being initialized.
+  private static final ClassLoader STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
+  private static final Map<String, SamplingTestPlugin> SAMPLES = new HashMap<>();
+
+  // Captured once per instance, while it is being constructed.
+  private final ClassLoader classloader = Thread.currentThread().getContextClassLoader();
+
+  @Override
+  public ClassLoader classloader() {
+    return classloader;
+  }
+
+  @Override
+  public ClassLoader staticClassloader() {
+    return STATIC_CLASS_LOADER;
+  }
+
+  @Override
+  public Map<String, SamplingTestPlugin> otherSamples() {
+    return SAMPLES;
+  }
+
+  @Override
+  public void configure(final Map<String, ?> configs, final boolean isKey) {
+    // Nothing to configure.
+  }
+
+  @Override
+  public SchemaAndValue toConnectData(final String topic, final byte[] value) {
+    return null;
+  }
+
+  @Override
+  public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
+    return new byte[0];
+  }
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/always-throw-exception/test/plugins/AlwaysThrowException.java
 
b/connect/runtime/src/test/resources/test-plugins/always-throw-exception/test/plugins/AlwaysThrowException.java
new file mode 100644
index 0000000..858f3ed
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/always-throw-exception/test/plugins/AlwaysThrowException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
+import org.apache.kafka.connect.storage.Converter;
+
+/**
+ * Unconditionally throw an exception during static initialization.
+ *
+ * <p>The exception is raised from {@link #setup()} rather than by a {@code throw} written
+ * directly in the static initializer: Java rejects a static initializer that cannot
+ * complete normally, so the throw is placed behind a method call.
+ */
+public class AlwaysThrowException implements Converter {
+
+    static {
+        setup(); // always throws, so initializing this class always fails
+    }
+
+    public static void setup() {
+        throw new RuntimeException("I always throw an exception");
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+
+    }
+
+    @Override
+    public byte[] fromConnectData(final String topic, final Schema schema, 
+final Object value) {
+        return new byte[0];
+    }
+
+    @Override
+    public SchemaAndValue toConnectData(final String topic, final byte[] 
+value) {
+        return null;
+    }
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/SamplingConverter.java
 
b/connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/SamplingConverter.java
new file mode 100644
index 0000000..39109a1
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/SamplingConverter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
+
+/**
+ * Converter plugin that captures the thread context class loader at class initialization
+ * and at instance construction, and records a sample through {@code logMethodCall}
+ * each time one of its {@link Converter} methods is invoked.
+ */
+public class SamplingConverter extends SamplingTestPlugin implements Converter {
+
+  // Captured once, while this class is being initialized.
+  private static final ClassLoader STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
+
+  // Per-instance state, set while the instance is being constructed.
+  private final ClassLoader classloader = Thread.currentThread().getContextClassLoader();
+  private Map<String, SamplingTestPlugin> samples = new HashMap<>();
+
+  @Override
+  public ClassLoader classloader() {
+    return classloader;
+  }
+
+  @Override
+  public ClassLoader staticClassloader() {
+    return STATIC_CLASS_LOADER;
+  }
+
+  @Override
+  public Map<String, SamplingTestPlugin> otherSamples() {
+    return samples;
+  }
+
+  // Each method below calls logMethodCall directly: the sample is keyed by the calling frame.
+
+  @Override
+  public void configure(final Map<String, ?> configs, final boolean isKey) {
+    logMethodCall(samples);
+  }
+
+  @Override
+  public SchemaAndValue toConnectData(final String topic, final byte[] value) {
+    logMethodCall(samples);
+    return null;
+  }
+
+  @Override
+  public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
+    logMethodCall(samples);
+    return new byte[0];
+  }
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/test/plugins/SamplingHeaderConverter.java
 
b/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/test/plugins/SamplingHeaderConverter.java
new file mode 100644
index 0000000..11a1e28
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/test/plugins/SamplingHeaderConverter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
+import org.apache.kafka.connect.storage.HeaderConverter;
+
+/**
+ * HeaderConverter plugin that captures the thread context class loader at class
+ * initialization and at instance construction, and records a sample through
+ * {@code logMethodCall} each time one of its {@link HeaderConverter} methods is invoked.
+ */
+public class SamplingHeaderConverter extends SamplingTestPlugin implements HeaderConverter {
+
+  // Captured once, while this class is being initialized.
+  private static final ClassLoader STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
+
+  // Per-instance state, set while the instance is being constructed.
+  private final ClassLoader classloader = Thread.currentThread().getContextClassLoader();
+  private Map<String, SamplingTestPlugin> samples = new HashMap<>();
+
+  @Override
+  public ClassLoader classloader() {
+    return classloader;
+  }
+
+  @Override
+  public ClassLoader staticClassloader() {
+    return STATIC_CLASS_LOADER;
+  }
+
+  @Override
+  public Map<String, SamplingTestPlugin> otherSamples() {
+    return samples;
+  }
+
+  // Each method below calls logMethodCall directly: the sample is keyed by the calling frame.
+
+  @Override
+  public void configure(final Map<String, ?> configs) {
+    logMethodCall(samples);
+  }
+
+  @Override
+  public ConfigDef config() {
+    logMethodCall(samples);
+    return null;
+  }
+
+  @Override
+  public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
+    logMethodCall(samples);
+    return new byte[0];
+  }
+
+  @Override
+  public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
+    logMethodCall(samples);
+    return null;
+  }
+
+  @Override
+  public void close() {
+    logMethodCall(samples);
+  }
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/service-loader/META-INF/services/test.plugins.ServiceLoadedClass
 
b/connect/runtime/src/test/resources/test-plugins/service-loader/META-INF/services/test.plugins.ServiceLoadedClass
new file mode 100644
index 0000000..b8db865
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/service-loader/META-INF/services/test.plugins.ServiceLoadedClass
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.ServiceLoadedSubclass
\ No newline at end of file
diff --git 
a/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedClass.java
 
b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedClass.java
new file mode 100644
index 0000000..98677ed
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedClass.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
+
+/**
+ * Base type for the classes that are discovered through the service loader.
+ * Captures the thread context class loader at class initialization and at construction.
+ */
+public class ServiceLoadedClass extends SamplingTestPlugin {
+
+  // Captured once, while this class is being initialized.
+  private static final ClassLoader STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
+
+  // Captured once per instance, while it is being constructed.
+  private final ClassLoader classloader = Thread.currentThread().getContextClassLoader();
+
+  @Override
+  public ClassLoader classloader() {
+    return classloader;
+  }
+
+  @Override
+  public ClassLoader staticClassloader() {
+    return STATIC_CLASS_LOADER;
+  }
+
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedSubclass.java
 
b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedSubclass.java
new file mode 100644
index 0000000..cfc6b6f
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedSubclass.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+/**
+ * Concrete service-loaded class. It keeps its own captured class loaders, separate from
+ * the ones held by {@link ServiceLoadedClass}, and reports those from its overrides.
+ */
+public class ServiceLoadedSubclass extends ServiceLoadedClass {
+
+  // Captured once, while this class is being initialized.
+  private static final ClassLoader STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
+
+  // Captured once per instance, while it is being constructed.
+  private final ClassLoader classloader = Thread.currentThread().getContextClassLoader();
+
+  @Override
+  public ClassLoader classloader() {
+    return classloader;
+  }
+
+  @Override
+  public ClassLoader staticClassloader() {
+    return STATIC_CLASS_LOADER;
+  }
+
+}
diff --git 
a/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoaderPlugin.java
 
b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoaderPlugin.java
new file mode 100644
index 0000000..e6371ba
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoaderPlugin.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ServiceLoader;
+import java.util.Iterator;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
+
+/**
+ * Converter plugin that runs a {@link ServiceLoader} lookup for {@link ServiceLoadedClass}
+ * both during class initialization and during each instance construction, and stores the
+ * loaded services in a shared static map under "&lt;SimpleName&gt;.static" and
+ * "&lt;SimpleName&gt;.dynamic" keys respectively.
+ */
+public class ServiceLoaderPlugin extends SamplingTestPlugin implements Converter {
+
+  // Captured once, while this class is being initialized.
+  private static final ClassLoader STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
+  private static final Map<String, SamplingTestPlugin> SAMPLES = new HashMap<>();
+
+  static {
+    for (ServiceLoadedClass loaded : ServiceLoader.load(ServiceLoadedClass.class)) {
+      SAMPLES.put(loaded.getClass().getSimpleName() + ".static", loaded);
+    }
+  }
+
+  // Captured once per instance, while it is being constructed.
+  private final ClassLoader classloader = Thread.currentThread().getContextClassLoader();
+
+  {
+    for (ServiceLoadedClass loaded : ServiceLoader.load(ServiceLoadedClass.class)) {
+      SAMPLES.put(loaded.getClass().getSimpleName() + ".dynamic", loaded);
+    }
+  }
+
+  @Override
+  public ClassLoader classloader() {
+    return classloader;
+  }
+
+  @Override
+  public ClassLoader staticClassloader() {
+    return STATIC_CLASS_LOADER;
+  }
+
+  @Override
+  public Map<String, SamplingTestPlugin> otherSamples() {
+    return SAMPLES;
+  }
+
+  @Override
+  public void configure(final Map<String, ?> configs, final boolean isKey) {
+  }
+
+  @Override
+  public SchemaAndValue toConnectData(final String topic, final byte[] value) {
+    return null;
+  }
+
+  @Override
+  public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
+    return new byte[0];
+  }
+}

Reply via email to