This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1b925e9ee75 KAFKA-15069: Refactor plugin scanning logic into
ReflectionScanner (#13821)
1b925e9ee75 is described below
commit 1b925e9ee753390d4633e8e5bd25a00af8d26cb9
Author: Greg Harris <[email protected]>
AuthorDate: Thu Jul 6 10:22:28 2023 -0700
KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner (#13821)
Reviewers: Chris Egerton <[email protected]>
---
.../runtime/isolation/ClassLoaderFactory.java | 41 +++
.../runtime/isolation/DelegatingClassLoader.java | 299 +--------------------
.../isolation/PluginClassLoaderFactory.java} | 26 +-
.../connect/runtime/isolation/PluginDesc.java | 1 +
.../connect/runtime/isolation/PluginScanner.java | 186 +++++++++++++
.../connect/runtime/isolation/PluginSource.java | 66 +++++
.../connect/runtime/isolation/PluginUtils.java | 31 +++
.../kafka/connect/runtime/isolation/Plugins.java | 20 +-
.../runtime/isolation/ReflectionScanner.java | 163 +++++++++++
.../connect/runtime/rest/entities/PluginInfo.java | 3 +-
.../isolation/DelegatingClassLoaderTest.java | 147 +++++-----
...ClassLoaderTest.java => PluginScannerTest.java} | 56 ++--
.../connect/runtime/isolation/PluginsTest.java | 2 +-
.../runtime/isolation/SynchronizationTest.java | 48 ++--
.../runtime/rest/entities/PluginInfoTest.java | 4 +-
.../resources/ConnectorPluginsResourceTest.java | 4 +-
16 files changed, 635 insertions(+), 462 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ClassLoaderFactory.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ClassLoaderFactory.java
new file mode 100644
index 00000000000..9de24f796d1
--- /dev/null
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ClassLoaderFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * Factory for {@link DelegatingClassLoader} and {@link PluginClassLoader}
instances.
+ * Used for mocking classloader initialization in tests.
+ */
+public class ClassLoaderFactory implements PluginClassLoaderFactory {
+
+ public DelegatingClassLoader newDelegatingClassLoader(ClassLoader parent) {
+ return AccessController.doPrivileged(
+ (PrivilegedAction<DelegatingClassLoader>) () -> new
DelegatingClassLoader(parent)
+ );
+ }
+
+ public PluginClassLoader newPluginClassLoader(URL pluginLocation, URL[]
urls, ClassLoader parent) {
+ return AccessController.doPrivileged(
+ (PrivilegedAction<PluginClassLoader>) () -> new
PluginClassLoader(pluginLocation, urls, parent)
+ );
+ }
+
+}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index cf0a5a1ba7f..dd342c67a43 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -16,49 +16,15 @@
*/
package org.apache.kafka.connect.runtime.isolation;
-import org.apache.kafka.common.config.provider.ConfigProvider;
-import org.apache.kafka.connect.components.Versioned;
-import
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
-import org.apache.kafka.connect.rest.ConnectRestExtension;
-import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.kafka.connect.source.SourceConnector;
-import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.storage.HeaderConverter;
-import org.apache.kafka.connect.transforms.Transformation;
-import org.apache.kafka.connect.transforms.predicates.Predicate;
-import org.reflections.Configuration;
-import org.reflections.Reflections;
-import org.reflections.ReflectionsException;
-import org.reflections.scanners.SubTypesScanner;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
-import java.nio.file.InvalidPathException;
-import java.nio.file.Path;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.sql.Driver;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import java.util.ServiceConfigurationError;
-import java.util.ServiceLoader;
-import java.util.Set;
import java.util.SortedMap;
-import java.util.SortedSet;
import java.util.TreeMap;
-import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -75,11 +41,9 @@ import java.util.concurrent.ConcurrentMap;
*/
public class DelegatingClassLoader extends URLClassLoader {
private static final Logger log =
LoggerFactory.getLogger(DelegatingClassLoader.class);
- public static final String UNDEFINED_VERSION = "undefined";
private final ConcurrentMap<String, SortedMap<PluginDesc<?>, ClassLoader>>
pluginLoaders;
private final ConcurrentMap<String, String> aliases;
- private final List<Path> pluginLocations;
// Although this classloader does not load classes directly but rather
delegates loading to a
// PluginClassLoader or its parent through its base class, because of the
use of inheritance in
@@ -89,19 +53,18 @@ public class DelegatingClassLoader extends URLClassLoader {
ClassLoader.registerAsParallelCapable();
}
- public DelegatingClassLoader(List<Path> pluginLocations, ClassLoader
parent) {
+ public DelegatingClassLoader(ClassLoader parent) {
super(new URL[0], parent);
- this.pluginLocations = pluginLocations;
this.pluginLoaders = new ConcurrentHashMap<>();
this.aliases = new ConcurrentHashMap<>();
}
- public DelegatingClassLoader(List<Path> pluginLocations) {
+ public DelegatingClassLoader() {
// Use as parent the classloader that loaded this class. In most cases
this will be the
// System classloader. But this choice here provides additional
flexibility in managed
// environments that control classloading differently (OSGi, Spring
and others) and don't
// depend on the System classloader to load Connect's classes.
- this(pluginLocations, DelegatingClassLoader.class.getClassLoader());
+ this(DelegatingClassLoader.class.getClassLoader());
}
/**
@@ -136,240 +99,7 @@ public class DelegatingClassLoader extends URLClassLoader {
return classLoader;
}
- // VisibleForTesting
- PluginClassLoader newPluginClassLoader(
- final URL pluginLocation,
- final URL[] urls,
- final ClassLoader parent
- ) {
- return AccessController.doPrivileged(
- (PrivilegedAction<PluginClassLoader>) () -> new
PluginClassLoader(pluginLocation, urls, parent)
- );
- }
-
- public PluginScanResult initLoaders() {
- List<PluginScanResult> results = new ArrayList<>();
- for (Path pluginLocation : pluginLocations) {
- try {
- results.add(registerPlugin(pluginLocation));
- } catch (InvalidPathException | MalformedURLException e) {
- log.error("Invalid path in plugin path: {}. Ignoring.",
pluginLocation, e);
- } catch (IOException e) {
- log.error("Could not get listing for plugin path: {}.
Ignoring.", pluginLocation, e);
- }
- }
- // Finally add parent/system loader.
- results.add(scanUrlsAndAddPlugins(
- getParent(),
- ClasspathHelper.forJavaClassPath().toArray(new URL[0])
- ));
- PluginScanResult scanResult = new PluginScanResult(results);
- installDiscoveredPlugins(scanResult);
- return scanResult;
- }
-
- private PluginScanResult registerPlugin(Path pluginLocation)
- throws IOException {
- log.info("Loading plugin from: {}", pluginLocation);
- List<URL> pluginUrls = new ArrayList<>();
- for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
- pluginUrls.add(path.toUri().toURL());
- }
- URL[] urls = pluginUrls.toArray(new URL[0]);
- if (log.isDebugEnabled()) {
- log.debug("Loading plugin urls: {}", Arrays.toString(urls));
- }
- PluginClassLoader loader = newPluginClassLoader(
- pluginLocation.toUri().toURL(),
- urls,
- this
- );
- return scanUrlsAndAddPlugins(loader, urls);
- }
-
- private PluginScanResult scanUrlsAndAddPlugins(
- ClassLoader loader,
- URL[] urls
- ) {
- PluginScanResult plugins = scanPluginPath(loader, urls);
- log.info("Registered loader: {}", loader);
- loadJdbcDrivers(loader);
- return plugins;
- }
-
- private void loadJdbcDrivers(final ClassLoader loader) {
- // Apply here what java.sql.DriverManager does to discover and
register classes
- // implementing the java.sql.Driver interface.
- AccessController.doPrivileged(
- (PrivilegedAction<Void>) () -> {
- ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
- Driver.class,
- loader
- );
- Iterator<Driver> driversIterator = loadedDrivers.iterator();
- try {
- while (driversIterator.hasNext()) {
- Driver driver = driversIterator.next();
- log.debug(
- "Registered java.sql.Driver: {} to
java.sql.DriverManager",
- driver
- );
- }
- } catch (Throwable t) {
- log.debug(
- "Ignoring java.sql.Driver classes listed in
resources but not"
- + " present in class loader's classpath: ",
- t
- );
- }
- return null;
- }
- );
- }
-
- private PluginScanResult scanPluginPath(
- ClassLoader loader,
- URL[] urls
- ) {
- ConfigurationBuilder builder = new ConfigurationBuilder();
- builder.setClassLoaders(new ClassLoader[]{loader});
- builder.addUrls(urls);
- builder.setScanners(new SubTypesScanner());
- builder.useParallelExecutor();
- Reflections reflections = new InternalReflections(builder);
-
- return new PluginScanResult(
- getPluginDesc(reflections, SinkConnector.class, loader),
- getPluginDesc(reflections, SourceConnector.class, loader),
- getPluginDesc(reflections, Converter.class, loader),
- getPluginDesc(reflections, HeaderConverter.class, loader),
- getTransformationPluginDesc(loader, reflections),
- getPredicatePluginDesc(loader, reflections),
- getServiceLoaderPluginDesc(ConfigProvider.class, loader),
- getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
-
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
- );
- }
-
- @SuppressWarnings({"unchecked"})
- private SortedSet<PluginDesc<Predicate<?>>>
getPredicatePluginDesc(ClassLoader loader, Reflections reflections) {
- return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>)
getPluginDesc(reflections, Predicate.class, loader);
- }
-
- @SuppressWarnings({"unchecked"})
- private SortedSet<PluginDesc<Transformation<?>>>
getTransformationPluginDesc(ClassLoader loader, Reflections reflections) {
- return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>)
getPluginDesc(reflections, Transformation.class, loader);
- }
-
- private <T> SortedSet<PluginDesc<T>> getPluginDesc(
- Reflections reflections,
- Class<T> klass,
- ClassLoader loader
- ) {
- Set<Class<? extends T>> plugins;
- try {
- plugins = reflections.getSubTypesOf(klass);
- } catch (ReflectionsException e) {
- log.debug("Reflections scanner could not find any classes for
URLs: " +
- reflections.getConfiguration().getUrls(), e);
- return Collections.emptySortedSet();
- }
-
- SortedSet<PluginDesc<T>> result = new TreeSet<>();
- for (Class<? extends T> pluginKlass : plugins) {
- if (!PluginUtils.isConcrete(pluginKlass)) {
- log.debug("Skipping {} as it is not concrete implementation",
pluginKlass);
- continue;
- }
- if (pluginKlass.getClassLoader() != loader) {
- log.debug("{} from other classloader {} is visible from {},
excluding to prevent isolated loading",
- pluginKlass.getSimpleName(),
pluginKlass.getClassLoader(), loader);
- continue;
- }
- try (LoaderSwap loaderSwap = withClassLoader(loader)) {
- result.add(pluginDesc(pluginKlass, versionFor(pluginKlass),
loader));
- } catch (ReflectiveOperationException | LinkageError e) {
- log.error("Failed to discover {}: Unable to instantiate {}{}",
klass.getSimpleName(), pluginKlass.getSimpleName(),
reflectiveErrorDescription(e), e);
- }
- }
- return result;
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String
version, ClassLoader loader) {
- return new PluginDesc(plugin, version, loader);
- }
-
- @SuppressWarnings("unchecked")
- private <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T>
klass, ClassLoader loader) {
- SortedSet<PluginDesc<T>> result = new TreeSet<>();
- ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
- for (Iterator<T> iterator = serviceLoader.iterator();
iterator.hasNext(); ) {
- try (LoaderSwap loaderSwap = withClassLoader(loader)) {
- T pluginImpl;
- try {
- pluginImpl = iterator.next();
- } catch (ServiceConfigurationError t) {
- log.error("Failed to discover {}{}",
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
- continue;
- }
- Class<? extends T> pluginKlass = (Class<? extends T>)
pluginImpl.getClass();
- if (pluginKlass.getClassLoader() != loader) {
- log.debug("{} from other classloader {} is visible from
{}, excluding to prevent isolated loading",
- pluginKlass.getSimpleName(),
pluginKlass.getClassLoader(), loader);
- continue;
- }
- result.add(pluginDesc(pluginKlass, versionFor(pluginImpl),
loader));
- }
- }
- return result;
- }
-
- private static <T> String versionFor(T pluginImpl) {
- try {
- if (pluginImpl instanceof Versioned) {
- return ((Versioned) pluginImpl).version();
- }
- } catch (Throwable t) {
- log.error("Failed to get plugin version for " +
pluginImpl.getClass(), t);
- }
- return UNDEFINED_VERSION;
- }
-
- public static <T> String versionFor(Class<? extends T> pluginKlass) throws
ReflectiveOperationException {
- // Unconditionally use the default constructor to create an instance
to assert that
- // the constructor exists and can complete successfully.
- T pluginImpl = pluginKlass.getDeclaredConstructor().newInstance();
- return versionFor(pluginImpl);
- }
-
- private static String reflectiveErrorDescription(Throwable t) {
- if (t instanceof NoSuchMethodException) {
- return ": Plugin class must have a no-args constructor, and cannot
be a non-static inner class";
- } else if (t instanceof SecurityException) {
- return ": Security settings must allow reflective instantiation of
plugin classes";
- } else if (t instanceof IllegalAccessException) {
- return ": Plugin class default constructor must be public";
- } else if (t instanceof ExceptionInInitializerError) {
- return ": Failed to statically initialize plugin class";
- } else if (t instanceof InvocationTargetException) {
- return ": Failed to invoke plugin constructor";
- } else {
- return "";
- }
- }
-
- public LoaderSwap withClassLoader(ClassLoader loader) {
- ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
- try {
- return new LoaderSwap(savedLoader);
- } catch (Throwable t) {
- Plugins.compareAndSwapLoaders(savedLoader);
- throw t;
- }
- }
-
- private void installDiscoveredPlugins(PluginScanResult scanResult) {
+ public void installDiscoveredPlugins(PluginScanResult scanResult) {
pluginLoaders.putAll(computePluginLoaders(scanResult));
for (String pluginClassName : pluginLoaders.keySet()) {
log.info("Added plugin '{}'", pluginClassName);
@@ -399,25 +129,4 @@ public class DelegatingClassLoader extends URLClassLoader {
.put(pluginDesc, pluginDesc.loader()));
return pluginLoaders;
}
-
- private static class InternalReflections extends Reflections {
-
- public InternalReflections(Configuration configuration) {
- super(configuration);
- }
-
- // When Reflections is used for parallel scans, it has a bug where it
propagates ReflectionsException
- // as RuntimeException. Override the scan behavior to emulate the
singled-threaded logic.
- @Override
- protected void scan(URL url) {
- try {
- super.scan(url);
- } catch (ReflectionsException e) {
- Logger log = Reflections.log;
- if (log != null && log.isWarnEnabled()) {
- log.warn("could not create Vfs.Dir from url. ignoring the
exception and continuing", e);
- }
- }
- }
- }
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoaderFactory.java
similarity index 50%
copy from
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
copy to
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoaderFactory.java
index d320c79711a..996294079c1 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoaderFactory.java
@@ -14,24 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.connect.runtime.rest.entities;
+package org.apache.kafka.connect.runtime.isolation;
-import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
-import org.junit.Test;
+import java.net.URL;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+/**
+ * Factory for {@link PluginClassLoader} instances.
+ * Used for mocking classloader initialization in tests.
+ */
+public interface PluginClassLoaderFactory {
-public class PluginInfoTest {
+ PluginClassLoader newPluginClassLoader(URL pluginLocation, URL[] urls,
ClassLoader parent);
- @Test
- public void testNoVersionFilter() {
- PluginInfo.NoVersionFilter filter = new PluginInfo.NoVersionFilter();
- // We intentionally refrain from using assertEquals and assertNotEquals
- // here to ensure that the filter's equals() method is used
- assertFalse(filter.equals("1.0"));
- assertFalse(filter.equals(new Object()));
- assertFalse(filter.equals(null));
- assertTrue(filter.equals(DelegatingClassLoader.UNDEFINED_VERSION));
- }
-}
+}
\ No newline at end of file
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
index c2829c60273..e001f104ee3 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
@@ -22,6 +22,7 @@ import
org.apache.maven.artifact.versioning.DefaultArtifactVersion;
import java.util.Objects;
public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
+ public static final String UNDEFINED_VERSION = "undefined";
private final Class<? extends T> klass;
private final String name;
private final String version;
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
new file mode 100644
index 00000000000..5859af32d47
--- /dev/null
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.components.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Driver;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Superclass for plugin discovery implementations.
+ *
+ * <p>Callers of this class should use {@link #discoverPlugins(Set)} to
discover plugins which are present in the
+ * passed-in {@link PluginSource} instances.
+ *
+ * <p>Implementors of this class should implement {@link
#scanPlugins(PluginSource)}, in order to scan a single source.
+ * The returned {@link PluginScanResult} should contain only plugins which are
loadable from the passed-in source.
+ * The superclass has some common functionality which is usable in subclasses,
and handles merging multiple results.
+ *
+ * <p>Implementations of this class must be thread-safe, but may have side
effects on the provided {@link ClassLoader}
+ * instances and plugin classes which may not be thread safe. This depends on
the thread safety of the plugin
+ * implementations, due to the necessity of initializing and instantiate
plugin classes to evaluate their versions.
+ */
+public abstract class PluginScanner {
+
+ private static final Logger log =
LoggerFactory.getLogger(PluginScanner.class);
+
+ /**
+ * Entry point for plugin scanning. Discovers plugins present in any of
the provided plugin sources.
+ * <p>See the implementation-specific documentation for the conditions for
a plugin to appear in this result.
+ * @param sources to scan for contained plugins
+ * @return A {@link PluginScanResult} containing all plugins which this
scanning implementation could discover.
+ */
+ public PluginScanResult discoverPlugins(Set<PluginSource> sources) {
+ long startMs = System.currentTimeMillis();
+ List<PluginScanResult> results = new ArrayList<>();
+ for (PluginSource source : sources) {
+ results.add(scanUrlsAndAddPlugins(source));
+ }
+ long endMs = System.currentTimeMillis();
+ log.info("Scanning plugins with {} took {} ms",
getClass().getSimpleName(), endMs - startMs);
+ return new PluginScanResult(results);
+ }
+
+ private PluginScanResult scanUrlsAndAddPlugins(PluginSource source) {
+ log.info("Loading plugin from: {}", source.location());
+ if (log.isDebugEnabled()) {
+ log.debug("Loading plugin urls: {}",
Arrays.toString(source.urls()));
+ }
+ PluginScanResult plugins = scanPlugins(source);
+ log.info("Registered loader: {}", source.loader());
+ loadJdbcDrivers(source.loader());
+ return plugins;
+ }
+
+ /**
+ * Implementation-specific strategy for scanning a single {@link
PluginSource}.
+ * @param source A single source to scan for plugins.
+ * @return A {@link PluginScanResult} containing all plugins which this
scanning implementation could discover.
+ */
+ protected abstract PluginScanResult scanPlugins(PluginSource source);
+
+ private void loadJdbcDrivers(final ClassLoader loader) {
+ // Apply here what java.sql.DriverManager does to discover and
register classes
+ // implementing the java.sql.Driver interface.
+ AccessController.doPrivileged(
+ (PrivilegedAction<Void>) () -> {
+ ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
+ Driver.class,
+ loader
+ );
+ Iterator<Driver> driversIterator = loadedDrivers.iterator();
+ try {
+ while (driversIterator.hasNext()) {
+ Driver driver = driversIterator.next();
+ log.debug(
+ "Registered java.sql.Driver: {} to
java.sql.DriverManager",
+ driver
+ );
+ }
+ } catch (Throwable t) {
+ log.debug(
+ "Ignoring java.sql.Driver classes listed in
resources but not"
+ + " present in class loader's classpath: ",
+ t
+ );
+ }
+ return null;
+ }
+ );
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String
version, ClassLoader loader) {
+ return new PluginDesc(plugin, version, loader);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T>
klass, ClassLoader loader) {
+ SortedSet<PluginDesc<T>> result = new TreeSet<>();
+ ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
+ for (Iterator<T> iterator = serviceLoader.iterator();
iterator.hasNext(); ) {
+ try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+ T pluginImpl;
+ try {
+ pluginImpl = iterator.next();
+ } catch (ServiceConfigurationError t) {
+ log.error("Failed to discover {}{}",
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+ continue;
+ }
+ Class<? extends T> pluginKlass = (Class<? extends T>)
pluginImpl.getClass();
+ if (pluginKlass.getClassLoader() != loader) {
+ log.debug("{} from other classloader {} is visible from
{}, excluding to prevent isolated loading",
+ pluginKlass.getSimpleName(),
pluginKlass.getClassLoader(), loader);
+ continue;
+ }
+ result.add(pluginDesc(pluginKlass, versionFor(pluginImpl),
loader));
+ }
+ }
+ return result;
+ }
+
+ protected static <T> String versionFor(T pluginImpl) {
+ try {
+ if (pluginImpl instanceof Versioned) {
+ return ((Versioned) pluginImpl).version();
+ }
+ } catch (Throwable t) {
+ log.error("Failed to get plugin version for " +
pluginImpl.getClass(), t);
+ }
+ return PluginDesc.UNDEFINED_VERSION;
+ }
+
+ protected static String reflectiveErrorDescription(Throwable t) {
+ if (t instanceof NoSuchMethodException) {
+ return ": Plugin class must have a no-args constructor, and cannot
be a non-static inner class";
+ } else if (t instanceof SecurityException) {
+ return ": Security settings must allow reflective instantiation of
plugin classes";
+ } else if (t instanceof IllegalAccessException) {
+ return ": Plugin class default constructor must be public";
+ } else if (t instanceof ExceptionInInitializerError) {
+ return ": Failed to statically initialize plugin class";
+ } else if (t instanceof InvocationTargetException) {
+ return ": Failed to invoke plugin constructor";
+ } else {
+ return "";
+ }
+ }
+
+ protected LoaderSwap withClassLoader(ClassLoader loader) {
+ ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
+ try {
+ return new LoaderSwap(savedLoader);
+ } catch (Throwable t) {
+ Plugins.compareAndSwapLoaders(savedLoader);
+ throw t;
+ }
+ }
+}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java
new file mode 100644
index 00000000000..ffedd98d6c3
--- /dev/null
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Objects;
+
+public class PluginSource {
+
+ private final Path location;
+ private final ClassLoader loader;
+ private final URL[] urls;
+
+ public PluginSource(Path location, ClassLoader loader, URL[] urls) {
+ this.location = location;
+ this.loader = loader;
+ this.urls = urls;
+ }
+
+ public Path location() {
+ return location;
+ }
+
+ public ClassLoader loader() {
+ return loader;
+ }
+
+ public URL[] urls() {
+ return urls;
+ }
+
+ public boolean isolated() {
+ return loader instanceof PluginClassLoader;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ PluginSource that = (PluginSource) o;
+ return Objects.equals(location, that.location) &&
loader.equals(that.loader) && Arrays.equals(urls, that.urls);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(location, loader);
+ result = 31 * result + Arrays.hashCode(urls);
+ return result;
+ }
+}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index 3592e0ec96c..e1ef76ebd2e 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -16,11 +16,14 @@
*/
package org.apache.kafka.connect.runtime.isolation;
+import org.reflections.util.ClasspathHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Modifier;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
@@ -325,6 +328,34 @@ public class PluginUtils {
return Arrays.asList(archives.toArray(new Path[0]));
}
+ public static Set<PluginSource> pluginSources(List<Path> pluginLocations,
ClassLoader classLoader, PluginClassLoaderFactory factory) {
+ Set<PluginSource> pluginSources = new HashSet<>();
+ for (Path pluginLocation : pluginLocations) {
+
+ try {
+ List<URL> pluginUrls = new ArrayList<>();
+ for (Path path : pluginUrls(pluginLocation)) {
+ pluginUrls.add(path.toUri().toURL());
+ }
+ URL[] urls = pluginUrls.toArray(new URL[0]);
+ PluginClassLoader loader = factory.newPluginClassLoader(
+ pluginLocation.toUri().toURL(),
+ urls,
+ classLoader
+ );
+ pluginSources.add(new PluginSource(pluginLocation, loader,
urls));
+ } catch (InvalidPathException | MalformedURLException e) {
+ log.error("Invalid path in plugin path: {}. Ignoring.",
pluginLocation, e);
+ } catch (IOException e) {
+ log.error("Could not get listing for plugin path: {}.
Ignoring.", pluginLocation, e);
+ }
+ }
+ URL[] classpathUrls = ClasspathHelper.forJavaClassPath().toArray(new
URL[0]);
+ pluginSources.add(new PluginSource(null, classLoader.getParent(),
classpathUrls));
+ return pluginSources;
+ }
+
+
/**
* Return the simple class name of a plugin as {@code String}.
*
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index a43f0f226a2..83dec38a6fb 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -38,8 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -59,22 +57,22 @@ public class Plugins {
private final PluginScanResult scanResult;
public Plugins(Map<String, String> props) {
- this(props, Plugins.class.getClassLoader());
+ this(props, Plugins.class.getClassLoader(), new ClassLoaderFactory());
}
// VisibleForTesting
- Plugins(Map<String, String> props, ClassLoader parent) {
+ Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory
factory) {
String pluginPath = WorkerConfig.pluginPath(props);
List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
- delegatingLoader = newDelegatingClassLoader(pluginLocations, parent);
- scanResult = delegatingLoader.initLoaders();
+ delegatingLoader = factory.newDelegatingClassLoader(parent);
+ Set<PluginSource> pluginSources =
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
+ scanResult = initLoaders(pluginSources);
}
- // VisibleForTesting
- protected DelegatingClassLoader newDelegatingClassLoader(final List<Path>
pluginLocations, ClassLoader parent) {
- return AccessController.doPrivileged(
- (PrivilegedAction<DelegatingClassLoader>) () -> new
DelegatingClassLoader(pluginLocations, parent)
- );
+ private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
+ PluginScanResult reflectiveScanResult = new
ReflectionScanner().discoverPlugins(pluginSources);
+ delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
+ return reflectiveScanResult;
}
private static <T> String pluginNames(Collection<PluginDesc<T>> plugins) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
new file mode 100644
index 00000000000..9289162b638
--- /dev/null
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.provider.ConfigProvider;
+import
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+import org.reflections.Configuration;
+import org.reflections.Reflections;
+import org.reflections.ReflectionsException;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * A {@link PluginScanner} implementation which uses reflection and {@link
ServiceLoader} to discover plugins.
+ * <p>This implements the legacy discovery strategy, which uses a combination
of reflection and service loading in
+ * order to discover plugins. Specifically, a plugin appears in the scan
result if all the following conditions are true:
+ * <ul>
+ * <li>The class is concrete</li>
+ * <li>The class is public</li>
+ * <li>The class has a no-args constructor</li>
+ * <li>The no-args constructor is public</li>
+ * <li>Static initialization of the class completes without throwing an
exception</li>
+ * <li>The no-args constructor completes without throwing an exception</li>
+ * <li>One of the following is true:
+ * <ul>
+ * <li>Is a subclass of {@link SinkConnector}, {@link
SourceConnector}, {@link Converter},
+ * {@link HeaderConverter}, {@link Transformation}, or {@link
Predicate}</li>
+ * <li>Is a subclass of {@link ConfigProvider}, {@link
ConnectRestExtension}, or
+ * {@link ConnectorClientConfigOverridePolicy}, and has a {@link
ServiceLoader} compatible
+ * manifest file or module declaration</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <p>Note: This scanner has a runtime proportional to the number of overall
classes in the passed-in
+ * {@link PluginSource} objects, which may be significant for plugins with
large dependencies.
+ */
+public class ReflectionScanner extends PluginScanner {
+
+ private static final Logger log =
LoggerFactory.getLogger(ReflectionScanner.class);
+
+ public static <T> String versionFor(Class<? extends T> pluginKlass) throws
ReflectiveOperationException {
+ T pluginImpl = pluginKlass.getDeclaredConstructor().newInstance();
+ return versionFor(pluginImpl);
+ }
+
+ @Override
+ protected PluginScanResult scanPlugins(PluginSource source) {
+ ClassLoader loader = source.loader();
+ ConfigurationBuilder builder = new ConfigurationBuilder();
+ builder.setClassLoaders(new ClassLoader[]{loader});
+ builder.addUrls(source.urls());
+ builder.setScanners(new SubTypesScanner());
+ builder.useParallelExecutor();
+ Reflections reflections = new InternalReflections(builder);
+
+ return new PluginScanResult(
+ getPluginDesc(reflections, SinkConnector.class, loader),
+ getPluginDesc(reflections, SourceConnector.class, loader),
+ getPluginDesc(reflections, Converter.class, loader),
+ getPluginDesc(reflections, HeaderConverter.class, loader),
+ getTransformationPluginDesc(loader, reflections),
+ getPredicatePluginDesc(loader, reflections),
+ getServiceLoaderPluginDesc(ConfigProvider.class, loader),
+ getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
+
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
+ );
+ }
+
+ @SuppressWarnings({"unchecked"})
+ private SortedSet<PluginDesc<Predicate<?>>>
getPredicatePluginDesc(ClassLoader loader, Reflections reflections) {
+ return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>)
getPluginDesc(reflections, Predicate.class, loader);
+ }
+
+ @SuppressWarnings({"unchecked"})
+ private SortedSet<PluginDesc<Transformation<?>>>
getTransformationPluginDesc(ClassLoader loader, Reflections reflections) {
+ return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>)
getPluginDesc(reflections, Transformation.class, loader);
+ }
+
+ private <T> SortedSet<PluginDesc<T>> getPluginDesc(
+ Reflections reflections,
+ Class<T> klass,
+ ClassLoader loader
+ ) {
+ Set<Class<? extends T>> plugins;
+ try {
+ plugins = reflections.getSubTypesOf(klass);
+ } catch (ReflectionsException e) {
+ log.debug("Reflections scanner could not find any classes for
URLs: " +
+ reflections.getConfiguration().getUrls(), e);
+ return Collections.emptySortedSet();
+ }
+
+ SortedSet<PluginDesc<T>> result = new TreeSet<>();
+ for (Class<? extends T> pluginKlass : plugins) {
+ if (!PluginUtils.isConcrete(pluginKlass)) {
+ log.debug("Skipping {} as it is not concrete implementation",
pluginKlass);
+ continue;
+ }
+ if (pluginKlass.getClassLoader() != loader) {
+ log.debug("{} from other classloader {} is visible from {},
excluding to prevent isolated loading",
+ pluginKlass.getSimpleName(),
pluginKlass.getClassLoader(), loader);
+ continue;
+ }
+ try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+ result.add(pluginDesc(pluginKlass, versionFor(pluginKlass),
loader));
+ } catch (ReflectiveOperationException | LinkageError e) {
+ log.error("Failed to discover {}: Unable to instantiate {}{}",
klass.getSimpleName(), pluginKlass.getSimpleName(),
reflectiveErrorDescription(e), e);
+ }
+ }
+ return result;
+ }
+
+ private static class InternalReflections extends Reflections {
+
+ public InternalReflections(Configuration configuration) {
+ super(configuration);
+ }
+
+ // When Reflections is used for parallel scans, it has a bug where it
propagates ReflectionsException
+ // as RuntimeException. Override the scan behavior to emulate the
singled-threaded logic.
+ @Override
+ protected void scan(URL url) {
+ try {
+ super.scan(url);
+ } catch (ReflectionsException e) {
+ Logger log = Reflections.log;
+ if (log != null && log.isWarnEnabled()) {
+ log.warn("could not create Vfs.Dir from url. ignoring the
exception and continuing", e);
+ }
+ }
+ }
+ }
+}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfo.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfo.java
index e7032678676..fdefe22ec59 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfo.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfo.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType;
@@ -91,7 +90,7 @@ public class PluginInfo {
public static final class NoVersionFilter {
// This method is used by Jackson to filter the version field for
plugins that don't have a version
public boolean equals(Object obj) {
- return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj);
+ return PluginDesc.UNDEFINED_VERSION.equals(obj);
}
// Dummy hashCode method to not fail compilation because of equals()
method
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
index 564969c6090..4f683e3b355 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
@@ -14,112 +14,101 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.connect.runtime.isolation;
-import org.junit.Rule;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.junit.Before;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import java.nio.file.Files;
-import java.nio.file.Path;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.Collections;
+import java.util.SortedSet;
+import java.util.TreeSet;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class DelegatingClassLoaderTest {
- @Rule
- public TemporaryFolder pluginDir = new TemporaryFolder();
-
- @Test
- public void testLoadingUnloadedPluginClass() {
- DelegatingClassLoader classLoader = new DelegatingClassLoader(
- Collections.emptyList(),
- DelegatingClassLoader.class.getClassLoader()
- );
- classLoader.initLoaders();
- for (String pluginClassName : TestPlugins.pluginClasses()) {
- assertThrows(ClassNotFoundException.class, () ->
classLoader.loadClass(pluginClassName));
+ public PluginClassLoader parent;
+ public PluginClassLoader pluginLoader;
+ public DelegatingClassLoader classLoader;
+ public PluginDesc<SinkConnector> pluginDesc;
+ public PluginScanResult scanResult;
+
+ // Arbitrary values, their contents is not meaningful.
+ public static final String ARBITRARY = "arbitrary";
+ public static final Class<?> ARBITRARY_CLASS = org.mockito.Mockito.class;
+ public static final URL ARBITRARY_URL;
+
+ static {
+ try {
+ ARBITRARY_URL = new URL("jar:file://" + ARBITRARY + "!/" +
ARBITRARY);
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
}
}
- @Test
- public void testLoadingPluginClass() throws ClassNotFoundException {
- DelegatingClassLoader classLoader = new DelegatingClassLoader(
- TestPlugins.pluginPath(),
- DelegatingClassLoader.class.getClassLoader()
+ @Before
+ @SuppressWarnings({"unchecked"})
+ public void setUp() {
+ parent = mock(PluginClassLoader.class);
+ pluginLoader = mock(PluginClassLoader.class);
+ classLoader = new DelegatingClassLoader(parent);
+ SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
+ // Lie to the DCL that this arbitrary class is a connector, since all
real connector classes we have access to
+ // are forced to be non-isolated by PluginUtils.shouldLoadInIsolation.
+ pluginDesc = new PluginDesc<>((Class<? extends SinkConnector>)
ARBITRARY_CLASS, null, pluginLoader);
+ assertTrue(PluginUtils.shouldLoadInIsolation(pluginDesc.className()));
+ sinkConnectors.add(pluginDesc);
+ scanResult = new PluginScanResult(
+ sinkConnectors,
+ Collections.emptySortedSet(),
+ Collections.emptySortedSet(),
+ Collections.emptySortedSet(),
+ Collections.emptySortedSet(),
+ Collections.emptySortedSet(),
+ Collections.emptySortedSet(),
+ Collections.emptySortedSet(),
+ Collections.emptySortedSet()
);
- classLoader.initLoaders();
- for (String pluginClassName : TestPlugins.pluginClasses()) {
- assertNotNull(classLoader.loadClass(pluginClassName));
- assertNotNull(classLoader.pluginClassLoader(pluginClassName));
- }
}
@Test
- public void testLoadingInvalidUberJar() throws Exception {
- pluginDir.newFile("invalid.jar");
-
- DelegatingClassLoader classLoader = new DelegatingClassLoader(
-
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
- DelegatingClassLoader.class.getClassLoader()
- );
- classLoader.initLoaders();
+ public void testEmptyConnectorLoader() {
+ assertSame(classLoader, classLoader.connectorLoader(ARBITRARY));
}
@Test
- public void testLoadingPluginDirContainsInvalidJarsOnly() throws Exception
{
- pluginDir.newFolder("my-plugin");
- pluginDir.newFile("my-plugin/invalid.jar");
-
- DelegatingClassLoader classLoader = new DelegatingClassLoader(
-
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
- DelegatingClassLoader.class.getClassLoader()
- );
- classLoader.initLoaders();
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void testEmptyLoadClass() throws ClassNotFoundException {
+ when(parent.loadClass(ARBITRARY, false)).thenReturn((Class)
ARBITRARY_CLASS);
+ assertSame(ARBITRARY_CLASS, classLoader.loadClass(ARBITRARY, false));
}
@Test
- public void testLoadingNoPlugins() {
- DelegatingClassLoader classLoader = new DelegatingClassLoader(
-
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
- DelegatingClassLoader.class.getClassLoader()
- );
- classLoader.initLoaders();
+ public void testEmptyGetResource() {
+ when(parent.getResource(ARBITRARY)).thenReturn(ARBITRARY_URL);
+ assertSame(ARBITRARY_URL, classLoader.getResource(ARBITRARY));
}
@Test
- public void testLoadingPluginDirEmpty() throws Exception {
- pluginDir.newFolder("my-plugin");
-
- DelegatingClassLoader classLoader = new DelegatingClassLoader(
-
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
- DelegatingClassLoader.class.getClassLoader()
- );
- classLoader.initLoaders();
+ public void testInitializedConnectorLoader() {
+ classLoader.installDiscoveredPlugins(scanResult);
+ assertSame(pluginLoader,
classLoader.connectorLoader(PluginUtils.prunedName(pluginDesc)));
+ assertSame(pluginLoader,
classLoader.connectorLoader(PluginUtils.simpleName(pluginDesc)));
+ assertSame(pluginLoader,
classLoader.connectorLoader(pluginDesc.className()));
}
@Test
- public void testLoadingMixOfValidAndInvalidPlugins() throws Exception {
- pluginDir.newFile("invalid.jar");
- pluginDir.newFolder("my-plugin");
- pluginDir.newFile("my-plugin/invalid.jar");
- Path pluginPath = this.pluginDir.getRoot().toPath();
-
- for (Path source : TestPlugins.pluginPath()) {
- Files.copy(source, pluginPath.resolve(source.getFileName()));
- }
-
- DelegatingClassLoader classLoader = new DelegatingClassLoader(
-
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
- DelegatingClassLoader.class.getClassLoader()
- );
- classLoader.initLoaders();
- for (String pluginClassName : TestPlugins.pluginClasses()) {
- assertNotNull(classLoader.loadClass(pluginClassName));
- assertNotNull(classLoader.pluginClassLoader(pluginClassName));
- }
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void testInitializedLoadClass() throws ClassNotFoundException {
+ classLoader.installDiscoveredPlugins(scanResult);
+ String className = pluginDesc.className();
+ when(pluginLoader.loadClass(className, false)).thenReturn((Class)
ARBITRARY_CLASS);
+ assertSame(ARBITRARY_CLASS, classLoader.loadClass(className, false));
}
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
similarity index 69%
copy from
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
copy to
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
index 564969c6090..28e3a4a56f6 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
@@ -24,22 +24,22 @@ import org.junit.rules.TemporaryFolder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
+import java.util.List;
+import java.util.Set;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
-public class DelegatingClassLoaderTest {
+public class PluginScannerTest {
@Rule
public TemporaryFolder pluginDir = new TemporaryFolder();
@Test
public void testLoadingUnloadedPluginClass() {
- DelegatingClassLoader classLoader = new DelegatingClassLoader(
- Collections.emptyList(),
- DelegatingClassLoader.class.getClassLoader()
+ DelegatingClassLoader classLoader = initClassLoader(
+ Collections.emptyList()
);
- classLoader.initLoaders();
for (String pluginClassName : TestPlugins.pluginClasses()) {
assertThrows(ClassNotFoundException.class, () ->
classLoader.loadClass(pluginClassName));
}
@@ -47,11 +47,9 @@ public class DelegatingClassLoaderTest {
@Test
public void testLoadingPluginClass() throws ClassNotFoundException {
- DelegatingClassLoader classLoader = new DelegatingClassLoader(
- TestPlugins.pluginPath(),
- DelegatingClassLoader.class.getClassLoader()
+ DelegatingClassLoader classLoader = initClassLoader(
+ TestPlugins.pluginPath()
);
- classLoader.initLoaders();
for (String pluginClassName : TestPlugins.pluginClasses()) {
assertNotNull(classLoader.loadClass(pluginClassName));
assertNotNull(classLoader.pluginClassLoader(pluginClassName));
@@ -62,11 +60,9 @@ public class DelegatingClassLoaderTest {
public void testLoadingInvalidUberJar() throws Exception {
pluginDir.newFile("invalid.jar");
- DelegatingClassLoader classLoader = new DelegatingClassLoader(
-
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
- DelegatingClassLoader.class.getClassLoader()
+ initClassLoader(
+
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
- classLoader.initLoaders();
}
@Test
@@ -74,31 +70,25 @@ public class DelegatingClassLoaderTest {
pluginDir.newFolder("my-plugin");
pluginDir.newFile("my-plugin/invalid.jar");
- DelegatingClassLoader classLoader = new DelegatingClassLoader(
-
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
- DelegatingClassLoader.class.getClassLoader()
+ initClassLoader(
+
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
- classLoader.initLoaders();
}
@Test
public void testLoadingNoPlugins() {
- DelegatingClassLoader classLoader = new DelegatingClassLoader(
-
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
- DelegatingClassLoader.class.getClassLoader()
+ initClassLoader(
+
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
- classLoader.initLoaders();
}
@Test
public void testLoadingPluginDirEmpty() throws Exception {
pluginDir.newFolder("my-plugin");
- DelegatingClassLoader classLoader = new DelegatingClassLoader(
-
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
- DelegatingClassLoader.class.getClassLoader()
+ initClassLoader(
+
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
- classLoader.initLoaders();
}
@Test
@@ -112,14 +102,22 @@ public class DelegatingClassLoaderTest {
Files.copy(source, pluginPath.resolve(source.getFileName()));
}
- DelegatingClassLoader classLoader = new DelegatingClassLoader(
-
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
- DelegatingClassLoader.class.getClassLoader()
+ DelegatingClassLoader classLoader = initClassLoader(
+
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
- classLoader.initLoaders();
for (String pluginClassName : TestPlugins.pluginClasses()) {
assertNotNull(classLoader.loadClass(pluginClassName));
assertNotNull(classLoader.pluginClassLoader(pluginClassName));
}
}
+
+ private DelegatingClassLoader initClassLoader(List<Path> pluginLocations) {
+ ClassLoaderFactory factory = new ClassLoaderFactory();
+ DelegatingClassLoader classLoader =
factory.newDelegatingClassLoader(DelegatingClassLoader.class.getClassLoader());
+ Set<PluginSource> pluginSources =
PluginUtils.pluginSources(pluginLocations, classLoader, factory);
+ PluginScanResult scanResult = new
ReflectionScanner().discoverPlugins(pluginSources);
+ classLoader.installDiscoveredPlugins(scanResult);
+ return classLoader;
+ }
+
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index e185a330d21..bb6d399c122 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -498,7 +498,7 @@ public class PluginsTest {
WorkerConfig.PLUGIN_PATH_CONFIG,
TestPlugins.pluginPathJoined(childResource)
);
- plugins = new Plugins(pluginProps, parent);
+ plugins = new Plugins(pluginProps, parent, new ClassLoaderFactory());
Converter converter = plugins.newPlugin(
className,
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
index 018d8449b73..c7b884c3605 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
@@ -24,12 +24,10 @@ import java.lang.management.ManagementFactory;
import java.lang.management.MonitorInfo;
import java.lang.management.ThreadInfo;
import java.net.URL;
-import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BrokenBarrierException;
@@ -79,15 +77,7 @@ public class SynchronizationTest {
+ "." + testName.getMethodName() + "-";
dclBreakpoint = new Breakpoint<>();
pclBreakpoint = new Breakpoint<>();
- plugins = new Plugins(pluginProps) {
- @Override
- protected DelegatingClassLoader
newDelegatingClassLoader(List<Path> pluginLocations, ClassLoader parent) {
- return AccessController.doPrivileged(
- (PrivilegedAction<DelegatingClassLoader>) () ->
- new SynchronizedDelegatingClassLoader(pluginLocations,
parent)
- );
- }
- };
+ plugins = new Plugins(pluginProps, Plugins.class.getClassLoader(), new
SynchronizedClassLoaderFactory());
exec = new ThreadPoolExecutor(
2,
2,
@@ -167,26 +157,36 @@ public class SynchronizationTest {
}
}
- private class SynchronizedDelegatingClassLoader extends
DelegatingClassLoader {
- {
- ClassLoader.registerAsParallelCapable();
- }
-
- public SynchronizedDelegatingClassLoader(List<Path> pluginLocations,
ClassLoader parent) {
- super(pluginLocations, parent);
+ private class SynchronizedClassLoaderFactory extends ClassLoaderFactory {
+ @Override
+ public DelegatingClassLoader newDelegatingClassLoader(ClassLoader
parent) {
+ return AccessController.doPrivileged(
+ (PrivilegedAction<DelegatingClassLoader>) () ->
+ new SynchronizedDelegatingClassLoader(parent)
+ );
}
@Override
- protected PluginClassLoader newPluginClassLoader(
- URL pluginLocation,
- URL[] urls,
- ClassLoader parent
+ public PluginClassLoader newPluginClassLoader(
+ URL pluginLocation,
+ URL[] urls,
+ ClassLoader parent
) {
return AccessController.doPrivileged(
- (PrivilegedAction<PluginClassLoader>) () ->
- new SynchronizedPluginClassLoader(pluginLocation, urls,
parent)
+ (PrivilegedAction<PluginClassLoader>) () ->
+ new SynchronizedPluginClassLoader(pluginLocation,
urls, parent)
);
}
+ }
+
+ private class SynchronizedDelegatingClassLoader extends
DelegatingClassLoader {
+ {
+ ClassLoader.registerAsParallelCapable();
+ }
+
+ public SynchronizedDelegatingClassLoader(ClassLoader parent) {
+ super(parent);
+ }
@Override
public PluginClassLoader pluginClassLoader(String name) {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
index d320c79711a..34e28ee47f6 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.connect.runtime.rest.entities;
-import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.junit.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -32,6 +32,6 @@ public class PluginInfoTest {
assertFalse(filter.equals("1.0"));
assertFalse(filter.equals(new Object()));
assertFalse(filter.equals(null));
- assertTrue(filter.equals(DelegatingClassLoader.UNDEFINED_VERSION));
+ assertTrue(filter.equals(PluginDesc.UNDEFINED_VERSION));
}
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 1072373579b..57149ca498d 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -33,11 +33,11 @@ import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.SampleSinkConnector;
import org.apache.kafka.connect.runtime.SampleSourceConnector;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
-import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.ReflectionScanner;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
@@ -453,7 +453,7 @@ public class ConnectorPluginsResourceTest {
public MockConnectorPluginDesc(Class<T> klass) throws Exception {
super(
klass,
- DelegatingClassLoader.versionFor(klass),
+ ReflectionScanner.versionFor(klass),
new MockPluginClassLoader(null, new URL[0])
);
}