This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push: new ad5f31c KAFKA-9057: Backport KAFKA-8819 and KAFKA-8340 before 2.0 (#7549) ad5f31c is described below commit ad5f31c3e6868757eab9c8dd71efd5fd65f06ac6 Author: Greg Harris <gr...@confluent.io> AuthorDate: Tue Oct 22 11:54:13 2019 -0700 KAFKA-9057: Backport KAFKA-8819 and KAFKA-8340 before 2.0 (#7549) Includes fixes from PR-7315 (KAFKA-8819 and KAFKA-8340), but omits ConfigProvider and Configurable test cases and plugins, and replaces Java 8 language features with suitable Java 7 features. Signed-off-by: Greg Harris <gr...@confluent.io> Author: Greg Harris <gr...@confluent.io> Reviewer: Randall Hauch <rha...@gmail.com> --- checkstyle/import-control.xml | 1 + .../org/apache/kafka/connect/runtime/Worker.java | 4 +- .../runtime/isolation/DelegatingClassLoader.java | 39 +-- .../kafka/connect/runtime/isolation/Plugins.java | 82 +++--- .../connect/runtime/isolation/PluginsTest.java | 180 ++++++++++++- .../runtime/isolation/SamplingTestPlugin.java | 122 +++++++++ .../connect/runtime/isolation/TestPlugins.java | 277 +++++++++++++++++++++ .../test/plugins/AliasedStaticField.java | 75 ++++++ .../test/plugins/AlwaysThrowException.java | 53 ++++ .../test/plugins/SamplingConverter.java | 76 ++++++ .../test/plugins/SamplingHeaderConverter.java | 89 +++++++ .../services/test.plugins.ServiceLoadedClass | 16 ++ .../test/plugins/ServiceLoadedClass.java | 48 ++++ .../test/plugins/ServiceLoadedSubclass.java | 46 ++++ .../test/plugins/ServiceLoaderPlugin.java | 85 +++++++ 15 files changed, 1130 insertions(+), 63 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 000acc3..5a9316d 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -313,6 +313,7 @@ <subpackage name="isolation"> <allow pkg="com.fasterxml.jackson" /> <allow pkg="org.apache.maven.artifact.versioning" /> + <allow pkg="javax.tools" /> </subpackage> </subpackage> diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 1c64658..545797a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -367,10 +367,10 @@ public class Worker { final WorkerTask workerTask; ClassLoader savedLoader = plugins.currentThreadLoader(); try { - final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps); - String connType = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType); savedLoader = Plugins.compareAndSwapLoaders(connectorLoader); + final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps); final TaskConfig taskConfig = new TaskConfig(taskProps); final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class); final Task task = plugins.newTask(taskClass); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index 83acbaa..7cfd4d5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -107,13 +107,32 @@ public class DelegatingClassLoader extends URLClassLoader { return connectorLoader(connector.getClass().getName()); } + /** + * Retrieve the PluginClassLoader associated with a plugin class + * @param name The fully qualified class name of the plugin + * @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated. + */ + public PluginClassLoader pluginClassLoader(String name) { + if (!PluginUtils.shouldLoadInIsolation(name)) { + return null; + } + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name); + if (inner == null) { + return null; + } + ClassLoader pluginLoader = inner.get(inner.lastKey()); + return pluginLoader instanceof PluginClassLoader + ? (PluginClassLoader) pluginLoader + : null; + } + public ClassLoader connectorLoader(String connectorClassOrAlias) { log.debug("Getting plugin class loader for connector: '{}'", connectorClassOrAlias); String fullName = aliases.containsKey(connectorClassOrAlias) ? aliases.get(connectorClassOrAlias) : connectorClassOrAlias; - SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName); - if (inner == null) { + PluginClassLoader classLoader = pluginClassLoader(fullName); + if (classLoader == null) { log.error( "Plugin class loader for connector: '{}' was not found. Returning: {}", connectorClassOrAlias, @@ -121,7 +140,7 @@ public class DelegatingClassLoader extends URLClassLoader { ); return this; } - return inner.get(inner.lastKey()); + return classLoader; } private static PluginClassLoader newPluginClassLoader( @@ -317,19 +336,11 @@ public class DelegatingClassLoader extends URLClassLoader { @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { - if (!PluginUtils.shouldLoadInIsolation(name)) { - // There are no paths in this classloader, will attempt to load with the parent. - return super.loadClass(name, resolve); - } - String fullName = aliases.containsKey(name) ? aliases.get(name) : name; - SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName); - if (inner != null) { - ClassLoader pluginLoader = inner.get(inner.lastKey()); + PluginClassLoader pluginLoader = pluginClassLoader(fullName); + if (pluginLoader != null) { log.trace("Retrieving loaded class '{}' from '{}'", fullName, pluginLoader); - return pluginLoader instanceof PluginClassLoader - ? ((PluginClassLoader) pluginLoader).loadClass(fullName, resolve) - : super.loadClass(fullName, resolve); + return pluginLoader.loadClass(fullName, resolve); } return super.loadClass(fullName, resolve); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 017d009..71de650 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.runtime.isolation; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.ConnectRecord; @@ -72,14 +71,38 @@ public class Plugins { } protected static <T> T newPlugin(Class<T> klass) { + // KAFKA-8340: The thread classloader is used during static initialization and must be + // set to the plugin's classloader during instantiation + ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader()); try { return Utils.newInstance(klass); } catch (Throwable t) { throw new ConnectException("Instantiation error", t); + } finally { + compareAndSwapLoaders(savedLoader); } } @SuppressWarnings("unchecked") + protected <U> Class<? extends U> pluginClassFromConfig( + AbstractConfig config, + String propertyName, + Class<U> pluginClass, + Collection<PluginDesc<U>> plugins + ) { + Class<?> klass = config.getClass(propertyName); + if (pluginClass.isAssignableFrom(klass)) { + return (Class<? extends U>) klass; + } + throw new ConnectException( + "Failed to find any class that implements " + pluginClass.getSimpleName() + + " for the config " + + propertyName + ", available classes are: " + + pluginNames(plugins) + ); + } + + @SuppressWarnings("unchecked") protected static <U> Class<? extends U> pluginClass( DelegatingClassLoader loader, String classOrAlias, @@ -199,18 +222,17 @@ public class Plugins { // This configuration does not define the converter via the specified property name return null; } - Converter plugin = null; + Class<? extends Converter> klass = null; switch (classLoaderUsage) { case CURRENT_CLASSLOADER: // Attempt to load first with the current classloader, and plugins as a fallback. // Note: we can't use config.getConfiguredInstance because Converter doesn't implement Configurable, and even if it did // we have to remove the property prefixes before calling config(...) and we still always want to call Converter.config. - plugin = getInstance(config, classPropertyName, Converter.class); + klass = pluginClassFromConfig(config, classPropertyName, Converter.class, delegatingLoader.converters()); break; case PLUGINS: // Attempt to load with the plugin class loader, which uses the current classloader as a fallback String converterClassOrAlias = config.getClass(classPropertyName).getName(); - Class<? extends Converter> klass; try { klass = pluginClass(delegatingLoader, converterClassOrAlias, Converter.class); } catch (ClassNotFoundException e) { @@ -220,11 +242,10 @@ public class Plugins { + pluginNames(delegatingLoader.converters()) ); } - plugin = newPlugin(klass); break; } - if (plugin == null) { - throw new ConnectException("Unable to instantiate the Converter specified in '" + classPropertyName + "'"); + if (klass == null) { + throw new ConnectException("Unable to initialize the Converter specified in '" + classPropertyName + "'"); } // Determine whether this is a key or value converter based upon the supplied property name ... @@ -236,7 +257,14 @@ public class Plugins { Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); log.debug("Configuring the {} converter with configuration keys:{}{}", isKeyConverter ? "key" : "value", System.lineSeparator(), converterConfig.keySet()); - plugin.configure(converterConfig, isKeyConverter); + Converter plugin; + ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader()); + try { + plugin = newPlugin(klass); + plugin.configure(converterConfig, isKeyConverter); + } finally { + compareAndSwapLoaders(savedLoader); + } return plugin; } @@ -251,7 +279,7 @@ public class Plugins { * @throws ConnectException if the {@link HeaderConverter} implementation class could not be found */ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) { - HeaderConverter plugin = null; + Class<? extends HeaderConverter> klass = null; switch (classLoaderUsage) { case CURRENT_CLASSLOADER: if (!config.originals().containsKey(classPropertyName)) { @@ -261,13 +289,12 @@ public class Plugins { // Attempt to load first with the current classloader, and plugins as a fallback. // Note: we can't use config.getConfiguredInstance because we have to remove the property prefixes // before calling config(...) - plugin = getInstance(config, classPropertyName, HeaderConverter.class); + klass = pluginClassFromConfig(config, classPropertyName, HeaderConverter.class, delegatingLoader.headerConverters()); break; case PLUGINS: // Attempt to load with the plugin class loader, which uses the current classloader as a fallback. // Note that there will always be at least a default header converter for the worker String converterClassOrAlias = config.getClass(classPropertyName).getName(); - Class<? extends HeaderConverter> klass; try { klass = pluginClass( delegatingLoader, @@ -282,38 +309,25 @@ public class Plugins { + pluginNames(delegatingLoader.headerConverters()) ); } - plugin = newPlugin(klass); } - if (plugin == null) { - throw new ConnectException("Unable to instantiate the Converter specified in '" + classPropertyName + "'"); + if (klass == null) { + throw new ConnectException("Unable to initialize the HeaderConverter specified in '" + classPropertyName + "'"); } String configPrefix = classPropertyName + "."; Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()); log.debug("Configuring the header converter with configuration keys:{}{}", System.lineSeparator(), converterConfig.keySet()); - plugin.configure(converterConfig); - return plugin; - } - /** - * Get an instance of the give class specified by the given configuration key. - * - * @param key The configuration key for the class - * @param t The interface the class should implement - * @return A instance of the class - */ - private <T> T getInstance(AbstractConfig config, String key, Class<T> t) { - Class<?> c = config.getClass(key); - if (c == null) { - return null; - } - // Instantiate the class, but we don't know if the class extends the supplied type - Object o = Utils.newInstance(c); - if (!t.isInstance(o)) { - throw new KafkaException(c.getName() + " is not an instance of " + t.getName()); + HeaderConverter plugin; + ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader()); + try { + plugin = newPlugin(klass); + plugin.configure(converterConfig); + } finally { + compareAndSwapLoaders(savedLoader); } - return t.cast(o); + return plugin; } public <R extends ConnectRecord<R>> Transformation<R> newTranformations( diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index a9a944f..e4df6c2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -17,9 +17,12 @@ package org.apache.kafka.connect.runtime.isolation; +import java.util.Map.Entry; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.json.JsonConverterConfig; @@ -31,7 +34,6 @@ import org.apache.kafka.connect.storage.ConverterType; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.SimpleHeaderConverter; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; @@ -41,29 +43,24 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; public class PluginsTest { - private static Map<String, String> pluginProps; - private static Plugins plugins; + private Plugins plugins; private Map<String, String> props; private AbstractConfig config; private TestConverter converter; private TestHeaderConverter headerConverter; - @BeforeClass - public static void beforeAll() { - pluginProps = new HashMap<>(); - - // Set up the plugins to have no additional plugin directories. - // This won't allow us to test classpath isolation, but it will allow us to test some of the utility methods. - pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, ""); - plugins = new Plugins(pluginProps); - } - @Before public void setup() { + Map<String, String> pluginProps = new HashMap<>(); + + // Set up the plugins with some test plugins to test isolation + pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, Utils.join(TestPlugins.pluginPath(), ",")); + plugins = new Plugins(pluginProps); props = new HashMap<>(pluginProps); props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); @@ -158,6 +155,163 @@ public class PluginsTest { assertTrue(headerConverter instanceof SimpleHeaderConverter); } + @Test(expected = ExceptionInInitializerError.class) + public void shouldThrowIfPluginThrows() { + TestPlugins.assertAvailable(); + props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestPlugins.ALWAYS_THROW_EXCEPTION); + ClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugins.ALWAYS_THROW_EXCEPTION); + ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader); + try { + createConfig(); + } finally { + Plugins.compareAndSwapLoaders(savedLoader); + } + } + + @Test + public void shouldShareStaticValuesBetweenSamePlugin() { + // Plugins are not isolated from other instances of their own class. + TestPlugins.assertAvailable(); + props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestPlugins.ALIASED_STATIC_FIELD); + ClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugins.ALIASED_STATIC_FIELD); + ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader); + createConfig(); + Plugins.compareAndSwapLoaders(savedLoader); + + Converter firstPlugin = plugins.newConverter( + config, + WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, + ClassLoaderUsage.PLUGINS + ); + + assertInstanceOf(SamplingTestPlugin.class, firstPlugin, "Cannot collect samples"); + + Converter secondPlugin = plugins.newConverter( + config, + WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, + ClassLoaderUsage.PLUGINS + ); + + assertInstanceOf(SamplingTestPlugin.class, secondPlugin, "Cannot collect samples"); + assertSame( + ((SamplingTestPlugin) firstPlugin).otherSamples(), + ((SamplingTestPlugin) secondPlugin).otherSamples() + ); + } + + @Test + public void newPluginShouldServiceLoadWithPluginClassLoader() { + TestPlugins.assertAvailable(); + props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestPlugins.SERVICE_LOADER); + ClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugins.SERVICE_LOADER); + ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader); + createConfig(); + Plugins.compareAndSwapLoaders(savedLoader); + Converter plugin = plugins.newConverter( + config, + WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, + ClassLoaderUsage.PLUGINS + ); + + assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples"); + Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten(); + // Assert that the service loaded subclass is found in both environments + assertTrue(samples.containsKey("ServiceLoadedSubclass.static")); + assertTrue(samples.containsKey("ServiceLoadedSubclass.dynamic")); + assertPluginClassLoaderAlwaysActive(samples); + } + + @Test + public void newPluginShouldInstantiateWithPluginClassLoader() { + TestPlugins.assertAvailable(); + props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestPlugins.SERVICE_LOADER); + ClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugins.SERVICE_LOADER); + ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader); + createConfig(); + Plugins.compareAndSwapLoaders(savedLoader); + Converter plugin = plugins.newConverter( + config, + WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, + ClassLoaderUsage.PLUGINS + ); + + assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples"); + Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten(); + assertPluginClassLoaderAlwaysActive(samples); + } + + @Test(expected = ConfigException.class) + public void shouldFailToFindConverterInCurrentClassloader() { + TestPlugins.assertAvailable(); + props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestPlugins.SAMPLING_CONVERTER); + createConfig(); + } + + @Test + public void newConverterShouldConfigureWithPluginClassLoader() { + TestPlugins.assertAvailable(); + props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestPlugins.SAMPLING_CONVERTER); + ClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugins.SAMPLING_CONVERTER); + ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader); + createConfig(); + Plugins.compareAndSwapLoaders(savedLoader); + + Converter plugin = plugins.newConverter( + config, + WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, + ClassLoaderUsage.PLUGINS + ); + + assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples"); + Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten(); + assertTrue(samples.containsKey("configure")); + assertPluginClassLoaderAlwaysActive(samples); + } + + @Test + public void newHeaderConverterShouldConfigureWithPluginClassLoader() { + TestPlugins.assertAvailable(); + props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestPlugins.SAMPLING_HEADER_CONVERTER); + ClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugins.SAMPLING_HEADER_CONVERTER); + ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader); + createConfig(); + Plugins.compareAndSwapLoaders(savedLoader); + + HeaderConverter plugin = plugins.newHeaderConverter( + config, + WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, + ClassLoaderUsage.PLUGINS + ); + + assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples"); + Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten(); + assertTrue(samples.containsKey("configure")); // HeaderConverter::configure was called + assertPluginClassLoaderAlwaysActive(samples); + } + + public static void assertPluginClassLoaderAlwaysActive(Map<String, SamplingTestPlugin> samples) { + for (Entry<String, SamplingTestPlugin> e : samples.entrySet()) { + String sampleName = "\"" + e.getKey() + "\" (" + e.getValue() + ")"; + assertInstanceOf( + PluginClassLoader.class, + e.getValue().staticClassloader(), + sampleName + " has incorrect static classloader" + ); + assertInstanceOf( + PluginClassLoader.class, + e.getValue().classloader(), + sampleName + " has incorrect dynamic classloader" + ); + } + } + + public static void assertInstanceOf(Class<?> expected, Object actual, String message) { + assertTrue( + "Expected an instance of " + expected.getSimpleName() + ", found " + actual + " instead: " + message, + expected.isInstance(actual) + ); + } + protected void instantiateAndConfigureConverter(String configPropName, ClassLoaderUsage classLoaderUsage) { converter = (TestConverter) plugins.newConverter(config, configPropName, classLoaderUsage); assertNotNull(converter); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java new file mode 100644 index 0000000..bcf8881 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +/** + * Base class for plugins so we can sample information about their initialization + */ +public abstract class SamplingTestPlugin { + + /** + * @return the ClassLoader used to statically initialize this plugin class + */ + public abstract ClassLoader staticClassloader(); + + /** + * @return the ClassLoader used to initialize this plugin instance + */ + public abstract ClassLoader classloader(); + + /** + * @return a group of other SamplingTestPlugin instances known by this plugin + * This should only return direct children, and not reference this instance directly + */ + public Map<String, SamplingTestPlugin> otherSamples() { + return Collections.emptyMap(); + } + + /** + * @return a flattened list of child samples including this entry keyed as "this" + */ + public Map<String, SamplingTestPlugin> flatten() { + Map<String, SamplingTestPlugin> out = new HashMap<>(); + Map<String, SamplingTestPlugin> otherSamples = otherSamples(); + if (otherSamples != null) { + for (Entry<String, SamplingTestPlugin> child : otherSamples.entrySet()) { + for (Entry<String, SamplingTestPlugin> flattened : child.getValue().flatten().entrySet()) { + String key = child.getKey(); + if (flattened.getKey().length() > 0) { + key += "." + flattened.getKey(); + } + out.put(key, flattened.getValue()); + } + } + } + out.put("", this); + return out; + } + + /** + * Log the parent method call as a child sample. + * Stores only the last invocation of each method if there are multiple invocations. + * @param samples The collection of samples to which this method call should be added + */ + public void logMethodCall(Map<String, SamplingTestPlugin> samples) { + StackTraceElement[] stackTraces = Thread.currentThread().getStackTrace(); + if (stackTraces.length < 2) { + return; + } + // 0 is inside getStackTrace + // 1 is this method + // 2 is our caller method + StackTraceElement caller = stackTraces[2]; + + samples.put(caller.getMethodName(), new MethodCallSample( + caller, + Thread.currentThread().getContextClassLoader(), + getClass().getClassLoader() + )); + } + + public static class MethodCallSample extends SamplingTestPlugin { + + private final StackTraceElement caller; + private final ClassLoader staticClassLoader; + private final ClassLoader dynamicClassLoader; + + public MethodCallSample( + StackTraceElement caller, + ClassLoader staticClassLoader, + ClassLoader dynamicClassLoader + ) { + this.caller = caller; + this.staticClassLoader = staticClassLoader; + this.dynamicClassLoader = dynamicClassLoader; + } + + @Override + public ClassLoader staticClassloader() { + return staticClassLoader; + } + + @Override + public ClassLoader classloader() { + return dynamicClassLoader; + } + + @Override + public String toString() { + return caller.toString(); + } + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java new file mode 100644 index 0000000..8a08c07 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.isolation; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.net.URL; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.jar.Attributes; +import java.util.jar.JarEntry; +import java.util.jar.JarOutputStream; +import java.util.jar.Manifest; +import javax.tools.JavaCompiler; +import javax.tools.StandardJavaFileManager; +import javax.tools.ToolProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for constructing test plugins for Connect. + * + * <p>Plugins are built from their source under resources/test-plugins/ and placed into temporary + * jar files that are deleted when the process exits. + * + * <p>To add a plugin, create the source files in the resource tree, and edit this class to build + * that plugin during initialization. For example, the plugin class {@literal package.Class} should + * be placed in {@literal resources/test-plugins/something/package/Class.java} and loaded using + * {@code createPluginJar("something")}. The class name, contents, and plugin directory can take + * any value you need for testing. + * + * <p>To use this class in your tests, make sure to first call + * {@link TestPlugins#assertAvailable()} to verify that the plugins initialized correctly. + * Otherwise, exceptions during the plugin build are not propagated, and may invalidate your test. + * You can access the list of plugin jars for assembling a {@literal plugin.path}, and reference + * the names of the different plugins directly via the exposed constants. + */ +public class TestPlugins { + + /** + * Class name of a plugin which will always throw an exception during loading + */ + public static final String ALWAYS_THROW_EXCEPTION = "test.plugins.AlwaysThrowException"; + /** + * Class name of a plugin which samples information about its initialization. + */ + public static final String ALIASED_STATIC_FIELD = "test.plugins.AliasedStaticField"; + /** + * Class name of a {@link org.apache.kafka.connect.storage.Converter} + * which samples information about its method calls. + */ + public static final String SAMPLING_CONVERTER = "test.plugins.SamplingConverter"; + /** + * Class name of a {@link org.apache.kafka.connect.storage.HeaderConverter} + * which samples information about its method calls. + */ + public static final String SAMPLING_HEADER_CONVERTER = "test.plugins.SamplingHeaderConverter"; + /** + * Class name of a plugin which uses a {@link java.util.ServiceLoader} + * to load internal classes, and samples information about their initialization. + */ + public static final String SERVICE_LOADER = "test.plugins.ServiceLoaderPlugin"; + + private static final Logger log = LoggerFactory.getLogger(TestPlugins.class); + private static final Map<String, File> PLUGIN_JARS; + private static final Throwable INITIALIZATION_EXCEPTION; + + static { + Throwable err = null; + HashMap<String, File> pluginJars = new HashMap<>(); + try { + pluginJars.put(ALWAYS_THROW_EXCEPTION, createPluginJar("always-throw-exception")); + pluginJars.put(ALIASED_STATIC_FIELD, createPluginJar("aliased-static-field")); + pluginJars.put(SAMPLING_CONVERTER, createPluginJar("sampling-converter")); + pluginJars.put(SAMPLING_HEADER_CONVERTER, createPluginJar("sampling-header-converter")); + pluginJars.put(SERVICE_LOADER, createPluginJar("service-loader")); + } catch (Throwable e) { + log.error("Could not set up plugin test jars", e); + err = e; + } + PLUGIN_JARS = Collections.unmodifiableMap(pluginJars); + INITIALIZATION_EXCEPTION = err; + } + + /** + * Ensure that the test plugin JARs were assembled without error before continuing. + * @throws AssertionError if any plugin failed to load, or no plugins were loaded. + */ + public static void assertAvailable() throws AssertionError { + if (INITIALIZATION_EXCEPTION != null) { + throw new AssertionError("TestPlugins did not initialize completely", + INITIALIZATION_EXCEPTION); + } + if (PLUGIN_JARS.isEmpty()) { + throw new AssertionError("No test plugins loaded"); + } + } + + /** + * A list of jar files containing test plugins + * @return A list of plugin jar filenames + */ + public static List<String> pluginPath() { + List<String> out = new ArrayList<>(); + for (File f : PLUGIN_JARS.values()) { + out.add(f.getPath()); + } + return out; + } + + /** + * Get all of the classes that were successfully built by this class + * @return A list of plugin class names + */ + public static List<String> pluginClasses() { + return new ArrayList<>(PLUGIN_JARS.keySet()); + } + + private static File createPluginJar(String resourceDir) throws IOException { + Path inputDir = resourceDirectoryPath("test-plugins/" + resourceDir); + Path binDir = Files.createTempDirectory(resourceDir + ".bin."); + compileJavaSources(inputDir, binDir); + File jarFile = Files.createTempFile(resourceDir + ".", ".jar").toFile(); + try (JarOutputStream jar = openJarFile(jarFile)) { + writeJar(jar, inputDir); + writeJar(jar, binDir); + } + removeDirectory(binDir); + jarFile.deleteOnExit(); + return jarFile; + } + + private static Path resourceDirectoryPath(String resourceDir) throws IOException { + URL resource = Thread.currentThread() + .getContextClassLoader() + .getResource(resourceDir); + if (resource == null) { + throw new IOException("Could not find test plugin resource: " + resourceDir); + } + File file = new File(resource.getFile()); + if (!file.isDirectory()) { + throw new IOException("Resource is not a directory: " + resourceDir); + } + if (!file.canRead()) { + throw new IOException("Resource directory is not readable: " + resourceDir); + } + return file.toPath(); + } + + private static JarOutputStream openJarFile(File jarFile) throws IOException { + Manifest manifest = new Manifest(); + manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0"); + return new JarOutputStream(new FileOutputStream(jarFile), manifest); + } + + private static void removeDirectory(Path binDir) throws IOException { + Files.walkFileTree(binDir, new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + if (!file.toFile().delete()) { + log.info("Could not delete " + file); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) { + if (!dir.toFile().delete()) { + log.info("Could not delete " + dir); + } + return FileVisitResult.CONTINUE; + } + }); + } + + /** + * Compile a directory of .java source files into .class files + * .class files are placed into the same directory as their sources. + * + * <p>Dependencies between source files in this directory are resolved against one another + * and the classes present in the test environment. + * See https://stackoverflow.com/questions/1563909/ for more information. + * Additional dependencies in your plugins should be added as test scope to :connect:runtime. + * @param sourceDir Directory containing java source files + * @throws IOException if the files cannot be compiled + */ + private static void compileJavaSources(Path sourceDir, Path binDir) throws IOException { + JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + final List<File> sourceFiles = new ArrayList<>(); + Files.walkFileTree(sourceDir, new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + if (file.toFile().getName().endsWith(".java")) { + sourceFiles.add(file.toFile()); + } + return FileVisitResult.CONTINUE; + } + }); + + StringWriter writer = new StringWriter(); + List<String> options = Arrays.asList( + "-d", binDir.toString() // Write class output to a different directory. + ); + + try (StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null)) { + boolean success = compiler.getTask( + writer, + fileManager, + null, + options, + null, + fileManager.getJavaFileObjectsFromFiles(sourceFiles) + ).call(); + if (!success) { + throw new RuntimeException("Failed to compile test plugin:\n" + writer); + } + } + } + + private static void writeJar(JarOutputStream jar, Path inputDir) throws IOException { + final List<Path> paths = new ArrayList<>(); + Files.walkFileTree(inputDir, new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + if (!file.toFile().getName().endsWith(".java")) { + paths.add(file); + } + return FileVisitResult.CONTINUE; + } + }); + for (Path path : paths) { + try (InputStream in = new BufferedInputStream(new FileInputStream(path.toFile()))) { + jar.putNextEntry(new JarEntry( + inputDir.relativize(path) + .toFile() + .getPath() + .replace(File.separator, "/") + )); + byte[] buffer = new byte[1024]; + for (int count; (count = in.read(buffer)) != -1; ) { + jar.write(buffer, 0, count); + } + jar.closeEntry(); + } + } + } + +} diff --git a/connect/runtime/src/test/resources/test-plugins/aliased-static-field/test/plugins/AliasedStaticField.java b/connect/runtime/src/test/resources/test-plugins/aliased-static-field/test/plugins/AliasedStaticField.java new file mode 100644 index 0000000..d865f4e --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/aliased-static-field/test/plugins/AliasedStaticField.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import java.util.Map; +import java.util.HashMap; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; + +/** + * Samples data about its initialization environment for later analysis + * Samples are shared between instances of the same class in a static variable + */ +public class AliasedStaticField extends SamplingTestPlugin implements Converter { + + private static final Map<String, SamplingTestPlugin> SAMPLES; + private static final ClassLoader STATIC_CLASS_LOADER; + private final ClassLoader classloader; + + static { + SAMPLES = new HashMap<>(); + STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + } + + { + classloader = Thread.currentThread().getContextClassLoader(); + } + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + + } + + @Override + public byte[] fromConnectData(final String topic, final Schema schema, final Object value) { + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(final String topic, final byte[] value) { + return null; + } + + @Override + public ClassLoader staticClassloader() { + return STATIC_CLASS_LOADER; + } + + @Override + public ClassLoader classloader() { + return classloader; + } + + @Override + public Map<String, SamplingTestPlugin> otherSamples() { + return SAMPLES; + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/always-throw-exception/test/plugins/AlwaysThrowException.java b/connect/runtime/src/test/resources/test-plugins/always-throw-exception/test/plugins/AlwaysThrowException.java new file mode 100644 index 0000000..858f3ed --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/always-throw-exception/test/plugins/AlwaysThrowException.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import java.util.Map; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; +import org.apache.kafka.connect.storage.Converter; + +/** + * Unconditionally throw an exception during static initialization. + */ +public class AlwaysThrowException implements Converter { + + static { + setup(); + } + + public static void setup() { + throw new RuntimeException("I always throw an exception"); + } + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + + } + + @Override + public byte[] fromConnectData(final String topic, final Schema schema, final Object value) { + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(final String topic, final byte[] value) { + return null; + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/SamplingConverter.java b/connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/SamplingConverter.java new file mode 100644 index 0000000..39109a1 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/SamplingConverter.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import java.util.Map; +import java.util.HashMap; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; + +/** + * Samples data about its initialization environment for later analysis + */ +public class SamplingConverter extends SamplingTestPlugin implements Converter { + + private static final ClassLoader STATIC_CLASS_LOADER; + private final ClassLoader classloader; + private Map<String, SamplingTestPlugin> samples; + + static { + STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + } + + { + samples = new HashMap<>(); + classloader = Thread.currentThread().getContextClassLoader(); + } + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + logMethodCall(samples); + } + + @Override + public byte[] fromConnectData(final String topic, final Schema schema, final Object value) { + logMethodCall(samples); + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(final String topic, final byte[] value) { + logMethodCall(samples); + return null; + } + + @Override + public ClassLoader staticClassloader() { + return STATIC_CLASS_LOADER; + } + + @Override + public ClassLoader classloader() { + return classloader; + } + + @Override + public Map<String, SamplingTestPlugin> otherSamples() { + return samples; + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/test/plugins/SamplingHeaderConverter.java b/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/test/plugins/SamplingHeaderConverter.java new file mode 100644 index 0000000..11a1e28 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/test/plugins/SamplingHeaderConverter.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import java.util.Map; +import java.util.HashMap; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; +import org.apache.kafka.connect.storage.HeaderConverter; + +/** + * Samples data about its initialization environment for later analysis + */ +public class SamplingHeaderConverter extends SamplingTestPlugin implements HeaderConverter { + + private static final ClassLoader STATIC_CLASS_LOADER; + private final ClassLoader classloader; + private Map<String, SamplingTestPlugin> samples; + + static { + STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + } + + { + samples = new HashMap<>(); + classloader = Thread.currentThread().getContextClassLoader(); + } + + @Override + public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) { + logMethodCall(samples); + return null; + } + + @Override + public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) { + logMethodCall(samples); + return new byte[0]; + } + + @Override + public ConfigDef config() { + logMethodCall(samples); + return null; + } + + @Override + public void configure(final Map<String, ?> configs) { + logMethodCall(samples); + } + + @Override + public void close() { + logMethodCall(samples); + } + + @Override + public ClassLoader staticClassloader() { + return STATIC_CLASS_LOADER; + } + + @Override + public ClassLoader classloader() { + return classloader; + } + + @Override + public Map<String, SamplingTestPlugin> otherSamples() { + return samples; + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/service-loader/META-INF/services/test.plugins.ServiceLoadedClass b/connect/runtime/src/test/resources/test-plugins/service-loader/META-INF/services/test.plugins.ServiceLoadedClass new file mode 100644 index 0000000..b8db865 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/service-loader/META-INF/services/test.plugins.ServiceLoadedClass @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.ServiceLoadedSubclass \ No newline at end of file diff --git a/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedClass.java b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedClass.java new file mode 100644 index 0000000..98677ed --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedClass.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; + +/** + * Superclass for service loaded classes + */ +public class ServiceLoadedClass extends SamplingTestPlugin { + + private static final ClassLoader STATIC_CLASS_LOADER; + private final ClassLoader classloader; + + static { + STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + } + + { + classloader = Thread.currentThread().getContextClassLoader(); + } + + @Override + public ClassLoader staticClassloader() { + return STATIC_CLASS_LOADER; + } + + @Override + public ClassLoader classloader() { + return classloader; + } + +} diff --git a/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedSubclass.java b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedSubclass.java new file mode 100644 index 0000000..cfc6b6f --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedSubclass.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +/** + * Instance of a service loaded class + */ +public class ServiceLoadedSubclass extends ServiceLoadedClass { + + private static final ClassLoader STATIC_CLASS_LOADER; + private final ClassLoader classloader; + + static { + STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + } + + { + classloader = Thread.currentThread().getContextClassLoader(); + } + + @Override + public ClassLoader staticClassloader() { + return STATIC_CLASS_LOADER; + } + + @Override + public ClassLoader classloader() { + return classloader; + } + +} diff --git a/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoaderPlugin.java b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoaderPlugin.java new file mode 100644 index 0000000..e6371ba --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoaderPlugin.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import java.util.Map; +import java.util.HashMap; +import java.util.ServiceLoader; +import java.util.Iterator; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; + +/** + * Samples data about its initialization environment for later analysis + */ +public class ServiceLoaderPlugin extends SamplingTestPlugin implements Converter { + + private static final ClassLoader STATIC_CLASS_LOADER; + private static final Map<String, SamplingTestPlugin> SAMPLES; + private final ClassLoader classloader; + + static { + STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + SAMPLES = new HashMap<>(); + Iterator<ServiceLoadedClass> it = ServiceLoader.load(ServiceLoadedClass.class).iterator(); + while (it.hasNext()) { + ServiceLoadedClass loaded = it.next(); + SAMPLES.put(loaded.getClass().getSimpleName() + ".static", loaded); + } + } + + { + classloader = Thread.currentThread().getContextClassLoader(); + Iterator<ServiceLoadedClass> it = ServiceLoader.load(ServiceLoadedClass.class).iterator(); + while (it.hasNext()) { + ServiceLoadedClass loaded = it.next(); + SAMPLES.put(loaded.getClass().getSimpleName() + ".dynamic", loaded); + } + } + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + } + + @Override + public byte[] fromConnectData(final String topic, final Schema schema, final Object value) { + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(final String topic, final byte[] value) { + return null; + } + + @Override + public ClassLoader staticClassloader() { + return STATIC_CLASS_LOADER; + } + + @Override + public ClassLoader classloader() { + return classloader; + } + + @Override + public Map<String, SamplingTestPlugin> otherSamples() { + return SAMPLES; + } +}