This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1b925e9ee75 KAFKA-15069: Refactor plugin scanning logic into 
ReflectionScanner (#13821)
1b925e9ee75 is described below

commit 1b925e9ee753390d4633e8e5bd25a00af8d26cb9
Author: Greg Harris <[email protected]>
AuthorDate: Thu Jul 6 10:22:28 2023 -0700

    KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner (#13821)
    
    Reviewers: Chris Egerton <[email protected]>
---
 .../runtime/isolation/ClassLoaderFactory.java      |  41 +++
 .../runtime/isolation/DelegatingClassLoader.java   | 299 +--------------------
 .../isolation/PluginClassLoaderFactory.java}       |  26 +-
 .../connect/runtime/isolation/PluginDesc.java      |   1 +
 .../connect/runtime/isolation/PluginScanner.java   | 186 +++++++++++++
 .../connect/runtime/isolation/PluginSource.java    |  66 +++++
 .../connect/runtime/isolation/PluginUtils.java     |  31 +++
 .../kafka/connect/runtime/isolation/Plugins.java   |  20 +-
 .../runtime/isolation/ReflectionScanner.java       | 163 +++++++++++
 .../connect/runtime/rest/entities/PluginInfo.java  |   3 +-
 .../isolation/DelegatingClassLoaderTest.java       | 147 +++++-----
 ...ClassLoaderTest.java => PluginScannerTest.java} |  56 ++--
 .../connect/runtime/isolation/PluginsTest.java     |   2 +-
 .../runtime/isolation/SynchronizationTest.java     |  48 ++--
 .../runtime/rest/entities/PluginInfoTest.java      |   4 +-
 .../resources/ConnectorPluginsResourceTest.java    |   4 +-
 16 files changed, 635 insertions(+), 462 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ClassLoaderFactory.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ClassLoaderFactory.java
new file mode 100644
index 00000000000..9de24f796d1
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ClassLoaderFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * Factory for {@link DelegatingClassLoader} and {@link PluginClassLoader} 
instances.
+ * Used for mocking classloader initialization in tests.
+ */
+public class ClassLoaderFactory implements PluginClassLoaderFactory {
+
+    public DelegatingClassLoader newDelegatingClassLoader(ClassLoader parent) {
+        return AccessController.doPrivileged(
+                (PrivilegedAction<DelegatingClassLoader>) () -> new 
DelegatingClassLoader(parent)
+        );
+    }
+
+    public PluginClassLoader newPluginClassLoader(URL pluginLocation, URL[] 
urls, ClassLoader parent) {
+        return AccessController.doPrivileged(
+                (PrivilegedAction<PluginClassLoader>) () -> new 
PluginClassLoader(pluginLocation, urls, parent)
+        );
+    }
+
+}
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index cf0a5a1ba7f..dd342c67a43 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -16,49 +16,15 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
-import org.apache.kafka.common.config.provider.ConfigProvider;
-import org.apache.kafka.connect.components.Versioned;
-import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
-import org.apache.kafka.connect.rest.ConnectRestExtension;
-import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.kafka.connect.source.SourceConnector;
-import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.storage.HeaderConverter;
-import org.apache.kafka.connect.transforms.Transformation;
-import org.apache.kafka.connect.transforms.predicates.Predicate;
-import org.reflections.Configuration;
-import org.reflections.Reflections;
-import org.reflections.ReflectionsException;
-import org.reflections.scanners.SubTypesScanner;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.nio.file.InvalidPathException;
-import java.nio.file.Path;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.sql.Driver;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.ServiceConfigurationError;
-import java.util.ServiceLoader;
-import java.util.Set;
 import java.util.SortedMap;
-import java.util.SortedSet;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -75,11 +41,9 @@ import java.util.concurrent.ConcurrentMap;
  */
 public class DelegatingClassLoader extends URLClassLoader {
     private static final Logger log = 
LoggerFactory.getLogger(DelegatingClassLoader.class);
-    public static final String UNDEFINED_VERSION = "undefined";
 
     private final ConcurrentMap<String, SortedMap<PluginDesc<?>, ClassLoader>> 
pluginLoaders;
     private final ConcurrentMap<String, String> aliases;
-    private final List<Path> pluginLocations;
 
     // Although this classloader does not load classes directly but rather 
delegates loading to a
     // PluginClassLoader or its parent through its base class, because of the 
use of inheritance in
@@ -89,19 +53,18 @@ public class DelegatingClassLoader extends URLClassLoader {
         ClassLoader.registerAsParallelCapable();
     }
 
-    public DelegatingClassLoader(List<Path> pluginLocations, ClassLoader 
parent) {
+    public DelegatingClassLoader(ClassLoader parent) {
         super(new URL[0], parent);
-        this.pluginLocations = pluginLocations;
         this.pluginLoaders = new ConcurrentHashMap<>();
         this.aliases = new ConcurrentHashMap<>();
     }
 
-    public DelegatingClassLoader(List<Path> pluginLocations) {
+    public DelegatingClassLoader() {
         // Use as parent the classloader that loaded this class. In most cases 
this will be the
         // System classloader. But this choice here provides additional 
flexibility in managed
         // environments that control classloading differently (OSGi, Spring 
and others) and don't
         // depend on the System classloader to load Connect's classes.
-        this(pluginLocations, DelegatingClassLoader.class.getClassLoader());
+        this(DelegatingClassLoader.class.getClassLoader());
     }
 
     /**
@@ -136,240 +99,7 @@ public class DelegatingClassLoader extends URLClassLoader {
         return classLoader;
     }
 
-    // VisibleForTesting
-    PluginClassLoader newPluginClassLoader(
-            final URL pluginLocation,
-            final URL[] urls,
-            final ClassLoader parent
-    ) {
-        return AccessController.doPrivileged(
-                (PrivilegedAction<PluginClassLoader>) () -> new 
PluginClassLoader(pluginLocation, urls, parent)
-        );
-    }
-
-    public PluginScanResult initLoaders() {
-        List<PluginScanResult> results = new ArrayList<>();
-        for (Path pluginLocation : pluginLocations) {
-            try {
-                results.add(registerPlugin(pluginLocation));
-            } catch (InvalidPathException | MalformedURLException e) {
-                log.error("Invalid path in plugin path: {}. Ignoring.", 
pluginLocation, e);
-            } catch (IOException e) {
-                log.error("Could not get listing for plugin path: {}. 
Ignoring.", pluginLocation, e);
-            }
-        }
-        // Finally add parent/system loader.
-        results.add(scanUrlsAndAddPlugins(
-                getParent(),
-                ClasspathHelper.forJavaClassPath().toArray(new URL[0])
-        ));
-        PluginScanResult scanResult = new PluginScanResult(results);
-        installDiscoveredPlugins(scanResult);
-        return scanResult;
-    }
-
-    private PluginScanResult registerPlugin(Path pluginLocation)
-        throws IOException {
-        log.info("Loading plugin from: {}", pluginLocation);
-        List<URL> pluginUrls = new ArrayList<>();
-        for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
-            pluginUrls.add(path.toUri().toURL());
-        }
-        URL[] urls = pluginUrls.toArray(new URL[0]);
-        if (log.isDebugEnabled()) {
-            log.debug("Loading plugin urls: {}", Arrays.toString(urls));
-        }
-        PluginClassLoader loader = newPluginClassLoader(
-                pluginLocation.toUri().toURL(),
-                urls,
-                this
-        );
-        return scanUrlsAndAddPlugins(loader, urls);
-    }
-
-    private PluginScanResult scanUrlsAndAddPlugins(
-            ClassLoader loader,
-            URL[] urls
-    ) {
-        PluginScanResult plugins = scanPluginPath(loader, urls);
-        log.info("Registered loader: {}", loader);
-        loadJdbcDrivers(loader);
-        return plugins;
-    }
-
-    private void loadJdbcDrivers(final ClassLoader loader) {
-        // Apply here what java.sql.DriverManager does to discover and 
register classes
-        // implementing the java.sql.Driver interface.
-        AccessController.doPrivileged(
-            (PrivilegedAction<Void>) () -> {
-                ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
-                        Driver.class,
-                        loader
-                );
-                Iterator<Driver> driversIterator = loadedDrivers.iterator();
-                try {
-                    while (driversIterator.hasNext()) {
-                        Driver driver = driversIterator.next();
-                        log.debug(
-                                "Registered java.sql.Driver: {} to 
java.sql.DriverManager",
-                                driver
-                        );
-                    }
-                } catch (Throwable t) {
-                    log.debug(
-                            "Ignoring java.sql.Driver classes listed in 
resources but not"
-                                    + " present in class loader's classpath: ",
-                            t
-                    );
-                }
-                return null;
-            }
-        );
-    }
-
-    private PluginScanResult scanPluginPath(
-            ClassLoader loader,
-            URL[] urls
-    ) {
-        ConfigurationBuilder builder = new ConfigurationBuilder();
-        builder.setClassLoaders(new ClassLoader[]{loader});
-        builder.addUrls(urls);
-        builder.setScanners(new SubTypesScanner());
-        builder.useParallelExecutor();
-        Reflections reflections = new InternalReflections(builder);
-
-        return new PluginScanResult(
-                getPluginDesc(reflections, SinkConnector.class, loader),
-                getPluginDesc(reflections, SourceConnector.class, loader),
-                getPluginDesc(reflections, Converter.class, loader),
-                getPluginDesc(reflections, HeaderConverter.class, loader),
-                getTransformationPluginDesc(loader, reflections),
-                getPredicatePluginDesc(loader, reflections),
-                getServiceLoaderPluginDesc(ConfigProvider.class, loader),
-                getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
-                
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
-        );
-    }
-
-    @SuppressWarnings({"unchecked"})
-    private SortedSet<PluginDesc<Predicate<?>>> 
getPredicatePluginDesc(ClassLoader loader, Reflections reflections) {
-        return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, Predicate.class, loader);
-    }
-
-    @SuppressWarnings({"unchecked"})
-    private SortedSet<PluginDesc<Transformation<?>>> 
getTransformationPluginDesc(ClassLoader loader, Reflections reflections) {
-        return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, Transformation.class, loader);
-    }
-
-    private <T> SortedSet<PluginDesc<T>> getPluginDesc(
-            Reflections reflections,
-            Class<T> klass,
-            ClassLoader loader
-    ) {
-        Set<Class<? extends T>> plugins;
-        try {
-            plugins = reflections.getSubTypesOf(klass);
-        } catch (ReflectionsException e) {
-            log.debug("Reflections scanner could not find any classes for 
URLs: " +
-                    reflections.getConfiguration().getUrls(), e);
-            return Collections.emptySortedSet();
-        }
-
-        SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        for (Class<? extends T> pluginKlass : plugins) {
-            if (!PluginUtils.isConcrete(pluginKlass)) {
-                log.debug("Skipping {} as it is not concrete implementation", 
pluginKlass);
-                continue;
-            }
-            if (pluginKlass.getClassLoader() != loader) {
-                log.debug("{} from other classloader {} is visible from {}, 
excluding to prevent isolated loading",
-                        pluginKlass.getSimpleName(), 
pluginKlass.getClassLoader(), loader);
-                continue;
-            }
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
-                result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), 
loader));
-            } catch (ReflectiveOperationException | LinkageError e) {
-                log.error("Failed to discover {}: Unable to instantiate {}{}", 
klass.getSimpleName(), pluginKlass.getSimpleName(), 
reflectiveErrorDescription(e), e);
-            }
-        }
-        return result;
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
-    }
-
-    @SuppressWarnings("unchecked")
-    private <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, ClassLoader loader) {
-        SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
-                T pluginImpl;
-                try {
-                    pluginImpl = iterator.next();
-                } catch (ServiceConfigurationError t) {
-                    log.error("Failed to discover {}{}", 
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
-                    continue;
-                }
-                Class<? extends T> pluginKlass = (Class<? extends T>) 
pluginImpl.getClass();
-                if (pluginKlass.getClassLoader() != loader) {
-                    log.debug("{} from other classloader {} is visible from 
{}, excluding to prevent isolated loading",
-                            pluginKlass.getSimpleName(), 
pluginKlass.getClassLoader(), loader);
-                    continue;
-                }
-                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), 
loader));
-            }
-        }
-        return result;
-    }
-
-    private static <T>  String versionFor(T pluginImpl) {
-        try {
-            if (pluginImpl instanceof Versioned) {
-                return ((Versioned) pluginImpl).version();
-            }
-        } catch (Throwable t) {
-            log.error("Failed to get plugin version for " + 
pluginImpl.getClass(), t);
-        }
-        return UNDEFINED_VERSION;
-    }
-
-    public static <T> String versionFor(Class<? extends T> pluginKlass) throws 
ReflectiveOperationException {
-        // Unconditionally use the default constructor to create an instance 
to assert that
-        // the constructor exists and can complete successfully.
-        T pluginImpl = pluginKlass.getDeclaredConstructor().newInstance();
-        return versionFor(pluginImpl);
-    }
-
-    private static String reflectiveErrorDescription(Throwable t) {
-        if (t instanceof NoSuchMethodException) {
-            return ": Plugin class must have a no-args constructor, and cannot 
be a non-static inner class";
-        } else if (t instanceof SecurityException) {
-            return ": Security settings must allow reflective instantiation of 
plugin classes";
-        } else if (t instanceof IllegalAccessException) {
-            return ": Plugin class default constructor must be public";
-        } else if (t instanceof ExceptionInInitializerError) {
-            return ": Failed to statically initialize plugin class";
-        } else if (t instanceof InvocationTargetException) {
-            return ": Failed to invoke plugin constructor";
-        } else {
-            return "";
-        }
-    }
-
-    public LoaderSwap withClassLoader(ClassLoader loader) {
-        ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
-        try {
-            return new LoaderSwap(savedLoader);
-        } catch (Throwable t) {
-            Plugins.compareAndSwapLoaders(savedLoader);
-            throw t;
-        }
-    }
-
-    private void installDiscoveredPlugins(PluginScanResult scanResult) {
+    public void installDiscoveredPlugins(PluginScanResult scanResult) {
         pluginLoaders.putAll(computePluginLoaders(scanResult));
         for (String pluginClassName : pluginLoaders.keySet()) {
             log.info("Added plugin '{}'", pluginClassName);
@@ -399,25 +129,4 @@ public class DelegatingClassLoader extends URLClassLoader {
                         .put(pluginDesc, pluginDesc.loader()));
         return pluginLoaders;
     }
-
-    private static class InternalReflections extends Reflections {
-
-        public InternalReflections(Configuration configuration) {
-            super(configuration);
-        }
-
-        // When Reflections is used for parallel scans, it has a bug where it 
propagates ReflectionsException
-        // as RuntimeException.  Override the scan behavior to emulate the 
singled-threaded logic.
-        @Override
-        protected void scan(URL url) {
-            try {
-                super.scan(url);
-            } catch (ReflectionsException e) {
-                Logger log = Reflections.log;
-                if (log != null && log.isWarnEnabled()) {
-                    log.warn("could not create Vfs.Dir from url. ignoring the 
exception and continuing", e);
-                }
-            }
-        }
-    }
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoaderFactory.java
similarity index 50%
copy from 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
copy to 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoaderFactory.java
index d320c79711a..996294079c1 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoaderFactory.java
@@ -14,24 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.connect.runtime.rest.entities;
+package org.apache.kafka.connect.runtime.isolation;
 
-import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
-import org.junit.Test;
+import java.net.URL;
 
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+/**
+ * Factory for {@link PluginClassLoader} instances.
+ * Used for mocking classloader initialization in tests.
+ */
+public interface PluginClassLoaderFactory {
 
-public class PluginInfoTest {
+    PluginClassLoader newPluginClassLoader(URL pluginLocation, URL[] urls, 
ClassLoader parent);
 
-    @Test
-    public void testNoVersionFilter() {
-        PluginInfo.NoVersionFilter filter = new PluginInfo.NoVersionFilter();
-        // We intentionally refrain from using assertEquals and assertNotEquals
-        // here to ensure that the filter's equals() method is used
-        assertFalse(filter.equals("1.0"));
-        assertFalse(filter.equals(new Object()));
-        assertFalse(filter.equals(null));
-        assertTrue(filter.equals(DelegatingClassLoader.UNDEFINED_VERSION));
-    }
-}
+}
\ No newline at end of file
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
index c2829c60273..e001f104ee3 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
@@ -22,6 +22,7 @@ import 
org.apache.maven.artifact.versioning.DefaultArtifactVersion;
 import java.util.Objects;
 
 public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
+    public static final String UNDEFINED_VERSION = "undefined";
     private final Class<? extends T> klass;
     private final String name;
     private final String version;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
new file mode 100644
index 00000000000..5859af32d47
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.components.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Driver;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Superclass for plugin discovery implementations.
+ *
+ * <p>Callers of this class should use {@link #discoverPlugins(Set)} to 
discover plugins which are present in the
+ * passed-in {@link PluginSource} instances.
+ *
+ * <p>Implementors of this class should implement {@link 
#scanPlugins(PluginSource)}, in order to scan a single source.
+ * The returned {@link PluginScanResult} should contain only plugins which are 
loadable from the passed-in source.
+ * The superclass has some common functionality which is usable in subclasses, 
and handles merging multiple results.
+ *
+ * <p>Implementations of this class must be thread-safe, but may have side 
effects on the provided {@link ClassLoader}
+ * instances and plugin classes which may not be thread-safe. This depends on 
the thread safety of the plugin
+ * implementations, due to the necessity of initializing and instantiating 
plugin classes to evaluate their versions.
+ */
+public abstract class PluginScanner {
+
+    private static final Logger log = 
LoggerFactory.getLogger(PluginScanner.class);
+
+    /**
+     * Entry point for plugin scanning. Discovers plugins present in any of 
the provided plugin sources.
+     * <p>See the implementation-specific documentation for the conditions for 
a plugin to appear in this result.
+     * @param sources to scan for contained plugins
+     * @return A {@link PluginScanResult} containing all plugins which this 
scanning implementation could discover.
+     */
+    public PluginScanResult discoverPlugins(Set<PluginSource> sources) {
+        long startMs = System.currentTimeMillis();
+        List<PluginScanResult> results = new ArrayList<>();
+        for (PluginSource source : sources) {
+            results.add(scanUrlsAndAddPlugins(source));
+        }
+        long endMs = System.currentTimeMillis();
+        log.info("Scanning plugins with {} took {} ms", 
getClass().getSimpleName(), endMs - startMs);
+        return new PluginScanResult(results);
+    }
+
+    private PluginScanResult scanUrlsAndAddPlugins(PluginSource source) {
+        log.info("Loading plugin from: {}", source.location());
+        if (log.isDebugEnabled()) {
+            log.debug("Loading plugin urls: {}", 
Arrays.toString(source.urls()));
+        }
+        PluginScanResult plugins = scanPlugins(source);
+        log.info("Registered loader: {}", source.loader());
+        loadJdbcDrivers(source.loader());
+        return plugins;
+    }
+
+    /**
+     * Implementation-specific strategy for scanning a single {@link 
PluginSource}.
+     * @param source A single source to scan for plugins.
+     * @return A {@link PluginScanResult} containing all plugins which this 
scanning implementation could discover.
+     */
+    protected abstract PluginScanResult scanPlugins(PluginSource source);
+
+    private void loadJdbcDrivers(final ClassLoader loader) {
+        // Apply here what java.sql.DriverManager does to discover and 
register classes
+        // implementing the java.sql.Driver interface.
+        AccessController.doPrivileged(
+            (PrivilegedAction<Void>) () -> {
+                ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
+                        Driver.class,
+                        loader
+                );
+                Iterator<Driver> driversIterator = loadedDrivers.iterator();
+                try {
+                    while (driversIterator.hasNext()) {
+                        Driver driver = driversIterator.next();
+                        log.debug(
+                                "Registered java.sql.Driver: {} to 
java.sql.DriverManager",
+                                driver
+                        );
+                    }
+                } catch (Throwable t) {
+                    log.debug(
+                            "Ignoring java.sql.Driver classes listed in 
resources but not"
+                                    + " present in class loader's classpath: ",
+                            t
+                    );
+                }
+                return null;
+            }
+        );
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, ClassLoader loader) {
+        return new PluginDesc(plugin, version, loader);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, ClassLoader loader) {
+        SortedSet<PluginDesc<T>> result = new TreeSet<>();
+        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
+        for (Iterator<T> iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
+            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+                T pluginImpl;
+                try {
+                    pluginImpl = iterator.next();
+                } catch (ServiceConfigurationError t) {
+                    log.error("Failed to discover {}{}", 
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    continue;
+                }
+                Class<? extends T> pluginKlass = (Class<? extends T>) 
pluginImpl.getClass();
+                if (pluginKlass.getClassLoader() != loader) {
+                    log.debug("{} from other classloader {} is visible from 
{}, excluding to prevent isolated loading",
+                            pluginKlass.getSimpleName(), 
pluginKlass.getClassLoader(), loader);
+                    continue;
+                }
+                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), 
loader));
+            }
+        }
+        return result;
+    }
+
+    protected static <T> String versionFor(T pluginImpl) {
+        try {
+            if (pluginImpl instanceof Versioned) {
+                return ((Versioned) pluginImpl).version();
+            }
+        } catch (Throwable t) {
+            log.error("Failed to get plugin version for " + 
pluginImpl.getClass(), t);
+        }
+        return PluginDesc.UNDEFINED_VERSION;
+    }
+
+    protected static String reflectiveErrorDescription(Throwable t) {
+        if (t instanceof NoSuchMethodException) {
+            return ": Plugin class must have a no-args constructor, and cannot 
be a non-static inner class";
+        } else if (t instanceof SecurityException) {
+            return ": Security settings must allow reflective instantiation of 
plugin classes";
+        } else if (t instanceof IllegalAccessException) {
+            return ": Plugin class default constructor must be public";
+        } else if (t instanceof ExceptionInInitializerError) {
+            return ": Failed to statically initialize plugin class";
+        } else if (t instanceof InvocationTargetException) {
+            return ": Failed to invoke plugin constructor";
+        } else {
+            return "";
+        }
+    }
+
+    protected LoaderSwap withClassLoader(ClassLoader loader) {
+        ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
+        try {
+            return new LoaderSwap(savedLoader);
+        } catch (Throwable t) {
+            Plugins.compareAndSwapLoaders(savedLoader);
+            throw t;
+        }
+    }
+}
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java
new file mode 100644
index 00000000000..ffedd98d6c3
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Describes one place plugins can be discovered from: a location, the {@link ClassLoader}
+ * used for classes found there, and the URLs to scan.
+ * <p>The {@code location} may be {@code null}; {@link PluginUtils} passes {@code null} for the
+ * source that represents the JVM classpath. The URL array is stored and returned as-is
+ * (no defensive copy is made).
+ */
+public class PluginSource {
+
+    private final Path location;
+    private final ClassLoader loader;
+    private final URL[] urls;
+
+    /**
+     * @param location the path this source was derived from; may be {@code null}
+     * @param loader the class loader associated with this source
+     * @param urls the URLs that make up this source
+     */
+    public PluginSource(Path location, ClassLoader loader, URL[] urls) {
+        this.location = location;
+        this.loader = loader;
+        this.urls = urls;
+    }
+
+    /** @return the path this source was derived from; may be {@code null} */
+    public Path location() {
+        return location;
+    }
+
+    /** @return the class loader associated with this source */
+    public ClassLoader loader() {
+        return loader;
+    }
+
+    /** @return the URLs that make up this source (the same array passed to the constructor) */
+    public URL[] urls() {
+        return urls;
+    }
+
+    /** @return {@code true} if this source's loader is a {@link PluginClassLoader} */
+    public boolean isolated() {
+        return loader instanceof PluginClassLoader;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        PluginSource that = (PluginSource) o;
+        // Compare the loader null-safely, the same way location is compared and the same way
+        // hashCode() treats it; a null loader must not make equals() throw.
+        return Objects.equals(location, that.location)
+                && Objects.equals(loader, that.loader)
+                && Arrays.equals(urls, that.urls);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(location, loader);
+        // Arrays.hashCode keeps the hash consistent with the element-wise Arrays.equals above.
+        result = 31 * result + Arrays.hashCode(urls);
+        return result;
+    }
+}
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index 3592e0ec96c..e1ef76ebd2e 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -16,11 +16,14 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.reflections.util.ClasspathHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Modifier;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.InvalidPathException;
@@ -325,6 +328,34 @@ public class PluginUtils {
         return Arrays.asList(archives.toArray(new Path[0]));
     }
 
+    /**
+     * Build the set of {@link PluginSource}s to scan: one per usable plugin location, plus one
+     * for the JVM classpath.
+     * <p>A location whose URLs cannot be listed or converted is logged and left out of the
+     * result; it does not abort processing of the remaining locations.
+     *
+     * @param pluginLocations the plugin locations to create sources for
+     * @param classLoader the loader handed to the factory for every new plugin loader; its
+     *                    parent is used as the loader of the classpath source
+     * @param factory the factory used to create a {@link PluginClassLoader} per location
+     * @return a new set holding one source per usable location and one classpath source
+     */
+    public static Set<PluginSource> pluginSources(List<Path> pluginLocations, 
ClassLoader classLoader, PluginClassLoaderFactory factory) {
+        Set<PluginSource> pluginSources = new HashSet<>();
+        for (Path pluginLocation : pluginLocations) {
+
+            try {
+                // Convert every path reported for this location into a URL.
+                List<URL> pluginUrls = new ArrayList<>();
+                for (Path path : pluginUrls(pluginLocation)) {
+                    pluginUrls.add(path.toUri().toURL());
+                }
+                URL[] urls = pluginUrls.toArray(new URL[0]);
+                // NOTE(review): the last argument is presumably the parent of the new loader;
+                // confirm against PluginClassLoaderFactory.
+                PluginClassLoader loader = factory.newPluginClassLoader(
+                        pluginLocation.toUri().toURL(),
+                        urls,
+                        classLoader
+                );
+                pluginSources.add(new PluginSource(pluginLocation, loader, 
urls));
+            } catch (InvalidPathException | MalformedURLException e) {
+                log.error("Invalid path in plugin path: {}. Ignoring.", 
pluginLocation, e);
+            } catch (IOException e) {
+                log.error("Could not get listing for plugin path: {}. 
Ignoring.", pluginLocation, e);
+            }
+        }
+        // The classpath source has no location (null) and uses the parent of the given loader.
+        URL[] classpathUrls = ClasspathHelper.forJavaClassPath().toArray(new 
URL[0]);
+        pluginSources.add(new PluginSource(null, classLoader.getParent(), 
classpathUrls));
+        return pluginSources;
+    }
+
+
     /**
      * Return the simple class name of a plugin as {@code String}.
      *
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index a43f0f226a2..83dec38a6fb 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -38,8 +38,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.file.Path;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -59,22 +57,22 @@ public class Plugins {
     private final PluginScanResult scanResult;
 
     public Plugins(Map<String, String> props) {
-        this(props, Plugins.class.getClassLoader());
+        this(props, Plugins.class.getClassLoader(), new ClassLoaderFactory());
     }
 
     // VisibleForTesting
-    Plugins(Map<String, String> props, ClassLoader parent) {
+    Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory 
factory) {
         String pluginPath = WorkerConfig.pluginPath(props);
         List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
-        delegatingLoader = newDelegatingClassLoader(pluginLocations, parent);
-        scanResult = delegatingLoader.initLoaders();
+        delegatingLoader = factory.newDelegatingClassLoader(parent);
+        Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
+        scanResult = initLoaders(pluginSources);
     }
 
-    // VisibleForTesting
-    protected DelegatingClassLoader newDelegatingClassLoader(final List<Path> 
pluginLocations, ClassLoader parent) {
-        return AccessController.doPrivileged(
-                (PrivilegedAction<DelegatingClassLoader>) () -> new 
DelegatingClassLoader(pluginLocations, parent)
-        );
+    private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
+        PluginScanResult reflectiveScanResult = new 
ReflectionScanner().discoverPlugins(pluginSources);
+        delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
+        return reflectiveScanResult;
     }
 
     private static <T> String pluginNames(Collection<PluginDesc<T>> plugins) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
new file mode 100644
index 00000000000..9289162b638
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.provider.ConfigProvider;
+import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+import org.reflections.Configuration;
+import org.reflections.Reflections;
+import org.reflections.ReflectionsException;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * A {@link PluginScanner} implementation which uses reflection and {@link 
ServiceLoader} to discover plugins.
+ * <p>This implements the legacy discovery strategy, which uses a combination 
of reflection and service loading in
+ * order to discover plugins. Specifically, a plugin appears in the scan 
result if all the following conditions are true:
+ * <ul>
+ *     <li>The class is concrete</li>
+ *     <li>The class is public</li>
+ *     <li>The class has a no-args constructor</li>
+ *     <li>The no-args constructor is public</li>
+ *     <li>Static initialization of the class completes without throwing an 
exception</li>
+ *     <li>The no-args constructor completes without throwing an exception</li>
+ *     <li>One of the following is true:
+ *         <ul>
+ *             <li>Is a subclass of {@link SinkConnector}, {@link 
SourceConnector}, {@link Converter},
+ *             {@link HeaderConverter}, {@link Transformation}, or {@link 
Predicate}</li>
+ *             <li>Is a subclass of {@link ConfigProvider}, {@link 
ConnectRestExtension}, or
+ *             {@link ConnectorClientConfigOverridePolicy}, and has a {@link 
ServiceLoader} compatible
+ *             manifest file or module declaration</li>
+ *         </ul>
+ *     </li>
+ * </ul>
+ * <p>Note: This scanner has a runtime proportional to the number of overall 
classes in the passed-in
+ * {@link PluginSource} objects, which may be significant for plugins with 
large dependencies.
+ */
+public class ReflectionScanner extends PluginScanner {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ReflectionScanner.class);
+
+    /**
+     * Instantiate the given plugin class through its declared no-args constructor and return
+     * the version reported for that instance by the instance-based {@code versionFor} overload
+     * (which is not declared in this class).
+     *
+     * @param pluginKlass the plugin class to instantiate
+     * @param <T> the plugin type
+     * @return the version string resolved for the new instance
+     * @throws ReflectiveOperationException if the constructor cannot be found or invoked
+     */
+    public static <T> String versionFor(Class<? extends T> pluginKlass) throws 
ReflectiveOperationException {
+        T pluginImpl = pluginKlass.getDeclaredConstructor().newInstance();
+        return versionFor(pluginImpl);
+    }
+
+    /**
+     * Scan a single source: index its URLs with Reflections, then collect plugin descriptors
+     * for each plugin type.
+     *
+     * @param source the source whose loader and URLs are scanned
+     * @return the plugins found in this source, grouped by plugin type
+     */
+    @Override
+    protected PluginScanResult scanPlugins(PluginSource source) {
+        ClassLoader loader = source.loader();
+        // Configure Reflections to index subtypes across the source's URLs using its loader.
+        ConfigurationBuilder builder = new ConfigurationBuilder();
+        builder.setClassLoaders(new ClassLoader[]{loader});
+        builder.addUrls(source.urls());
+        builder.setScanners(new SubTypesScanner());
+        builder.useParallelExecutor();
+        Reflections reflections = new InternalReflections(builder);
+
+        // The arguments are positional: the first six come from the Reflections subtype index,
+        // the last three from getServiceLoaderPluginDesc (not declared in this class).
+        return new PluginScanResult(
+                getPluginDesc(reflections, SinkConnector.class, loader),
+                getPluginDesc(reflections, SourceConnector.class, loader),
+                getPluginDesc(reflections, Converter.class, loader),
+                getPluginDesc(reflections, HeaderConverter.class, loader),
+                getTransformationPluginDesc(loader, reflections),
+                getPredicatePluginDesc(loader, reflections),
+                getServiceLoaderPluginDesc(ConfigProvider.class, loader),
+                getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
+                
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
+        );
+    }
+
+    /**
+     * Collect {@link Predicate} plugins. The double cast only re-types the raw-typed result
+     * of {@code getPluginDesc} to the wildcard-parameterized element type.
+     */
+    @SuppressWarnings({"unchecked"})
+    private SortedSet<PluginDesc<Predicate<?>>> 
getPredicatePluginDesc(ClassLoader loader, Reflections reflections) {
+        return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, Predicate.class, loader);
+    }
+
+    /**
+     * Collect {@link Transformation} plugins. The double cast only re-types the raw-typed
+     * result of {@code getPluginDesc} to the wildcard-parameterized element type.
+     */
+    @SuppressWarnings({"unchecked"})
+    private SortedSet<PluginDesc<Transformation<?>>> 
getTransformationPluginDesc(ClassLoader loader, Reflections reflections) {
+        return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, Transformation.class, loader);
+    }
+
+    /**
+     * Find the subtypes of {@code klass} in the Reflections index and describe those that are
+     * concrete, defined by {@code loader} itself, and instantiable.
+     *
+     * @param reflections the index to query
+     * @param klass the plugin supertype to look for
+     * @param loader the loader that must have defined a class for it to be reported
+     * @param <T> the plugin type
+     * @return the descriptors of the accepted plugin classes; empty if the index query fails
+     */
+    private <T> SortedSet<PluginDesc<T>> getPluginDesc(
+            Reflections reflections,
+            Class<T> klass,
+            ClassLoader loader
+    ) {
+        Set<Class<? extends T>> plugins;
+        try {
+            plugins = reflections.getSubTypesOf(klass);
+        } catch (ReflectionsException e) {
+            // A failed index query is treated as "no plugins of this type", not as an error.
+            log.debug("Reflections scanner could not find any classes for 
URLs: " +
+                    reflections.getConfiguration().getUrls(), e);
+            return Collections.emptySortedSet();
+        }
+
+        SortedSet<PluginDesc<T>> result = new TreeSet<>();
+        for (Class<? extends T> pluginKlass : plugins) {
+            if (!PluginUtils.isConcrete(pluginKlass)) {
+                log.debug("Skipping {} as it is not concrete implementation", 
pluginKlass);
+                continue;
+            }
+            // Only report classes defined by this loader; classes that are merely visible
+            // from it but were defined by another loader are skipped.
+            if (pluginKlass.getClassLoader() != loader) {
+                log.debug("{} from other classloader {} is visible from {}, 
excluding to prevent isolated loading",
+                        pluginKlass.getSimpleName(), 
pluginKlass.getClassLoader(), loader);
+                continue;
+            }
+            // Instantiate inside a loader swap; the try-with-resources closes the swap afterwards.
+            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+                result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), 
loader));
+            } catch (ReflectiveOperationException | LinkageError e) {
+                // A class that cannot be instantiated is logged and left out of the result.
+                log.error("Failed to discover {}: Unable to instantiate {}{}", 
klass.getSimpleName(), pluginKlass.getSimpleName(), 
reflectiveErrorDescription(e), e);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * {@link Reflections} subclass that logs and ignores a {@link ReflectionsException}
+     * thrown while scanning a single URL.
+     */
+    private static class InternalReflections extends Reflections {
+
+        public InternalReflections(Configuration configuration) {
+            super(configuration);
+        }
+
+        // When Reflections is used for parallel scans, it has a bug where it propagates
+        // ReflectionsException as RuntimeException. Override the scan behavior to emulate
+        // the single-threaded logic.
+        @Override
+        protected void scan(URL url) {
+            try {
+                super.scan(url);
+            } catch (ReflectionsException e) {
+                // Use the Reflections library's own logger, which may be null.
+                Logger log = Reflections.log;
+                if (log != null && log.isWarnEnabled()) {
+                    log.warn("could not create Vfs.Dir from url. ignoring the 
exception and continuing", e);
+                }
+            }
+        }
+    }
+}
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfo.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfo.java
index e7032678676..fdefe22ec59 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfo.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfo.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.rest.entities;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.PluginType;
 
@@ -91,7 +90,7 @@ public class PluginInfo {
     public static final class NoVersionFilter {
         // This method is used by Jackson to filter the version field for 
plugins that don't have a version
         public boolean equals(Object obj) {
-            return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj);
+            return PluginDesc.UNDEFINED_VERSION.equals(obj);
         }
 
         // Dummy hashCode method to not fail compilation because of equals() 
method
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
index 564969c6090..4f683e3b355 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
@@ -14,112 +14,101 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.connect.runtime.isolation;
 
-import org.junit.Rule;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.nio.file.Files;
-import java.nio.file.Path;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.Collections;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class DelegatingClassLoaderTest {
 
-    @Rule
-    public TemporaryFolder pluginDir = new TemporaryFolder();
-
-    @Test
-    public void testLoadingUnloadedPluginClass() {
-        DelegatingClassLoader classLoader = new DelegatingClassLoader(
-                Collections.emptyList(),
-                DelegatingClassLoader.class.getClassLoader()
-        );
-        classLoader.initLoaders();
-        for (String pluginClassName : TestPlugins.pluginClasses()) {
-            assertThrows(ClassNotFoundException.class, () -> 
classLoader.loadClass(pluginClassName));
+    public PluginClassLoader parent;
+    public PluginClassLoader pluginLoader;
+    public DelegatingClassLoader classLoader;
+    public PluginDesc<SinkConnector> pluginDesc;
+    public PluginScanResult scanResult;
+
+    // Arbitrary values; their contents are not meaningful.
+    public static final String ARBITRARY = "arbitrary";
+    public static final Class<?> ARBITRARY_CLASS = org.mockito.Mockito.class;
+    public static final URL ARBITRARY_URL;
+
+    static {
+        try {
+            ARBITRARY_URL = new URL("jar:file://" + ARBITRARY + "!/" + 
ARBITRARY);
+        } catch (MalformedURLException e) {
+            throw new RuntimeException(e);
         }
     }
 
-    @Test
-    public void testLoadingPluginClass() throws ClassNotFoundException {
-        DelegatingClassLoader classLoader = new DelegatingClassLoader(
-                TestPlugins.pluginPath(),
-                DelegatingClassLoader.class.getClassLoader()
+    @Before
+    @SuppressWarnings({"unchecked"})
+    public void setUp() {
+        parent = mock(PluginClassLoader.class);
+        pluginLoader = mock(PluginClassLoader.class);
+        classLoader = new DelegatingClassLoader(parent);
+        SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
+        // Lie to the DCL that this arbitrary class is a connector, since all 
real connector classes we have access to
+        // are forced to be non-isolated by PluginUtils.shouldLoadInIsolation.
+        pluginDesc = new PluginDesc<>((Class<? extends SinkConnector>) 
ARBITRARY_CLASS, null, pluginLoader);
+        assertTrue(PluginUtils.shouldLoadInIsolation(pluginDesc.className()));
+        sinkConnectors.add(pluginDesc);
+        scanResult = new PluginScanResult(
+                sinkConnectors,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
         );
-        classLoader.initLoaders();
-        for (String pluginClassName : TestPlugins.pluginClasses()) {
-            assertNotNull(classLoader.loadClass(pluginClassName));
-            assertNotNull(classLoader.pluginClassLoader(pluginClassName));
-        }
     }
 
     @Test
-    public void testLoadingInvalidUberJar() throws Exception {
-        pluginDir.newFile("invalid.jar");
-
-        DelegatingClassLoader classLoader = new DelegatingClassLoader(
-                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
-                DelegatingClassLoader.class.getClassLoader()
-        );
-        classLoader.initLoaders();
+    public void testEmptyConnectorLoader() {
+        assertSame(classLoader, classLoader.connectorLoader(ARBITRARY));
     }
 
     @Test
-    public void testLoadingPluginDirContainsInvalidJarsOnly() throws Exception 
{
-        pluginDir.newFolder("my-plugin");
-        pluginDir.newFile("my-plugin/invalid.jar");
-
-        DelegatingClassLoader classLoader = new DelegatingClassLoader(
-                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
-                DelegatingClassLoader.class.getClassLoader()
-        );
-        classLoader.initLoaders();
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void testEmptyLoadClass() throws ClassNotFoundException {
+        when(parent.loadClass(ARBITRARY, false)).thenReturn((Class) 
ARBITRARY_CLASS);
+        assertSame(ARBITRARY_CLASS, classLoader.loadClass(ARBITRARY, false));
     }
 
     @Test
-    public void testLoadingNoPlugins() {
-        DelegatingClassLoader classLoader = new DelegatingClassLoader(
-                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
-                DelegatingClassLoader.class.getClassLoader()
-        );
-        classLoader.initLoaders();
+    public void testEmptyGetResource() {
+        when(parent.getResource(ARBITRARY)).thenReturn(ARBITRARY_URL);
+        assertSame(ARBITRARY_URL, classLoader.getResource(ARBITRARY));
     }
 
     @Test
-    public void testLoadingPluginDirEmpty() throws Exception {
-        pluginDir.newFolder("my-plugin");
-
-        DelegatingClassLoader classLoader = new DelegatingClassLoader(
-                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
-                DelegatingClassLoader.class.getClassLoader()
-        );
-        classLoader.initLoaders();
+    public void testInitializedConnectorLoader() {
+        classLoader.installDiscoveredPlugins(scanResult);
+        assertSame(pluginLoader, 
classLoader.connectorLoader(PluginUtils.prunedName(pluginDesc)));
+        assertSame(pluginLoader, 
classLoader.connectorLoader(PluginUtils.simpleName(pluginDesc)));
+        assertSame(pluginLoader, 
classLoader.connectorLoader(pluginDesc.className()));
     }
 
     @Test
-    public void testLoadingMixOfValidAndInvalidPlugins() throws Exception {
-        pluginDir.newFile("invalid.jar");
-        pluginDir.newFolder("my-plugin");
-        pluginDir.newFile("my-plugin/invalid.jar");
-        Path pluginPath = this.pluginDir.getRoot().toPath();
-
-        for (Path source : TestPlugins.pluginPath()) {
-            Files.copy(source, pluginPath.resolve(source.getFileName()));
-        }
-
-        DelegatingClassLoader classLoader = new DelegatingClassLoader(
-                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
-                DelegatingClassLoader.class.getClassLoader()
-        );
-        classLoader.initLoaders();
-        for (String pluginClassName : TestPlugins.pluginClasses()) {
-            assertNotNull(classLoader.loadClass(pluginClassName));
-            assertNotNull(classLoader.pluginClassLoader(pluginClassName));
-        }
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void testInitializedLoadClass() throws ClassNotFoundException {
+        classLoader.installDiscoveredPlugins(scanResult);
+        String className = pluginDesc.className();
+        when(pluginLoader.loadClass(className, false)).thenReturn((Class) 
ARBITRARY_CLASS);
+        assertSame(ARBITRARY_CLASS, classLoader.loadClass(className, false));
     }
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
similarity index 69%
copy from 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
copy to 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
index 564969c6090..28e3a4a56f6 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
@@ -24,22 +24,22 @@ import org.junit.rules.TemporaryFolder;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
+import java.util.List;
+import java.util.Set;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 
-public class DelegatingClassLoaderTest {
+public class PluginScannerTest {
 
     @Rule
     public TemporaryFolder pluginDir = new TemporaryFolder();
 
     @Test
     public void testLoadingUnloadedPluginClass() {
-        DelegatingClassLoader classLoader = new DelegatingClassLoader(
-                Collections.emptyList(),
-                DelegatingClassLoader.class.getClassLoader()
+        DelegatingClassLoader classLoader = initClassLoader(
+                Collections.emptyList()
         );
-        classLoader.initLoaders();
         for (String pluginClassName : TestPlugins.pluginClasses()) {
             assertThrows(ClassNotFoundException.class, () -> 
classLoader.loadClass(pluginClassName));
         }
@@ -47,11 +47,9 @@ public class DelegatingClassLoaderTest {
 
     @Test
     public void testLoadingPluginClass() throws ClassNotFoundException {
-        DelegatingClassLoader classLoader = new DelegatingClassLoader(
-                TestPlugins.pluginPath(),
-                DelegatingClassLoader.class.getClassLoader()
+        DelegatingClassLoader classLoader = initClassLoader(
+                TestPlugins.pluginPath()
         );
-        classLoader.initLoaders();
         for (String pluginClassName : TestPlugins.pluginClasses()) {
             assertNotNull(classLoader.loadClass(pluginClassName));
             assertNotNull(classLoader.pluginClassLoader(pluginClassName));
@@ -62,11 +60,9 @@ public class DelegatingClassLoaderTest {
     public void testLoadingInvalidUberJar() throws Exception {
         pluginDir.newFile("invalid.jar");
 
-        DelegatingClassLoader classLoader = new DelegatingClassLoader(
-                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
-                DelegatingClassLoader.class.getClassLoader()
+        initClassLoader(
+                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
         );
-        classLoader.initLoaders();
     }
 
     @Test
@@ -74,31 +70,25 @@ public class DelegatingClassLoaderTest {
         pluginDir.newFolder("my-plugin");
         pluginDir.newFile("my-plugin/invalid.jar");
 
-        DelegatingClassLoader classLoader = new DelegatingClassLoader(
-                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
-                DelegatingClassLoader.class.getClassLoader()
+        initClassLoader(
+                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
         );
-        classLoader.initLoaders();
     }
 
     @Test
     public void testLoadingNoPlugins() {
-        DelegatingClassLoader classLoader = new DelegatingClassLoader(
-                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
-                DelegatingClassLoader.class.getClassLoader()
+        initClassLoader(
+                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
         );
-        classLoader.initLoaders();
     }
 
     @Test
     public void testLoadingPluginDirEmpty() throws Exception {
         pluginDir.newFolder("my-plugin");
 
-        DelegatingClassLoader classLoader = new DelegatingClassLoader(
-                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
-                DelegatingClassLoader.class.getClassLoader()
+        initClassLoader(
+                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
         );
-        classLoader.initLoaders();
     }
 
     @Test
@@ -112,14 +102,22 @@ public class DelegatingClassLoaderTest {
             Files.copy(source, pluginPath.resolve(source.getFileName()));
         }
 
-        DelegatingClassLoader classLoader = new DelegatingClassLoader(
-                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
-                DelegatingClassLoader.class.getClassLoader()
+        DelegatingClassLoader classLoader = initClassLoader(
+                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
         );
-        classLoader.initLoaders();
         for (String pluginClassName : TestPlugins.pluginClasses()) {
             assertNotNull(classLoader.loadClass(pluginClassName));
             assertNotNull(classLoader.pluginClassLoader(pluginClassName));
         }
     }
+
+    /**
+     * Build a {@link DelegatingClassLoader} for the given plugin locations: create the loader,
+     * derive the plugin sources, run a {@link ReflectionScanner} over them, and install the
+     * discovered plugins into the loader.
+     */
+    private DelegatingClassLoader initClassLoader(List<Path> pluginLocations) {
+        ClassLoaderFactory loaderFactory = new ClassLoaderFactory();
+        ClassLoader parentLoader = DelegatingClassLoader.class.getClassLoader();
+        DelegatingClassLoader delegating = loaderFactory.newDelegatingClassLoader(parentLoader);
+        Set<PluginSource> sources = PluginUtils.pluginSources(pluginLocations, delegating, loaderFactory);
+        // Scan first, then install the result so the loader can resolve the discovered plugins.
+        delegating.installDiscoveredPlugins(new ReflectionScanner().discoverPlugins(sources));
+        return delegating;
+    }
+
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index e185a330d21..bb6d399c122 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -498,7 +498,7 @@ public class PluginsTest {
                 WorkerConfig.PLUGIN_PATH_CONFIG,
                 TestPlugins.pluginPathJoined(childResource)
         );
-        plugins = new Plugins(pluginProps, parent);
+        plugins = new Plugins(pluginProps, parent, new ClassLoaderFactory());
 
         Converter converter = plugins.newPlugin(
                 className,
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
index 018d8449b73..c7b884c3605 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
@@ -24,12 +24,10 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.MonitorInfo;
 import java.lang.management.ThreadInfo;
 import java.net.URL;
-import java.nio.file.Path;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.BrokenBarrierException;
@@ -79,15 +77,7 @@ public class SynchronizationTest {
             + "." + testName.getMethodName() + "-";
         dclBreakpoint = new Breakpoint<>();
         pclBreakpoint = new Breakpoint<>();
-        plugins = new Plugins(pluginProps) {
-            @Override
-            protected DelegatingClassLoader newDelegatingClassLoader(List<Path> pluginLocations, ClassLoader parent) {
-                return AccessController.doPrivileged(
-                    (PrivilegedAction<DelegatingClassLoader>) () ->
-                        new SynchronizedDelegatingClassLoader(pluginLocations, parent)
-                );
-            }
-        };
+        plugins = new Plugins(pluginProps, Plugins.class.getClassLoader(), new SynchronizedClassLoaderFactory());
         exec = new ThreadPoolExecutor(
             2,
             2,
@@ -167,26 +157,36 @@ public class SynchronizationTest {
         }
     }
 
-    private class SynchronizedDelegatingClassLoader extends DelegatingClassLoader {
-        {
-            ClassLoader.registerAsParallelCapable();
-        }
-
-        public SynchronizedDelegatingClassLoader(List<Path> pluginLocations, ClassLoader parent) {
-            super(pluginLocations, parent);
+    private class SynchronizedClassLoaderFactory extends ClassLoaderFactory {
+        @Override
+        public DelegatingClassLoader newDelegatingClassLoader(ClassLoader parent) {
+            return AccessController.doPrivileged(
+                    (PrivilegedAction<DelegatingClassLoader>) () ->
+                            new SynchronizedDelegatingClassLoader(parent)
+            );
         }
 
         @Override
-        protected PluginClassLoader newPluginClassLoader(
-            URL pluginLocation,
-            URL[] urls,
-            ClassLoader parent
+        public PluginClassLoader newPluginClassLoader(
+                URL pluginLocation,
+                URL[] urls,
+                ClassLoader parent
         ) {
             return AccessController.doPrivileged(
-                (PrivilegedAction<PluginClassLoader>) () ->
-                    new SynchronizedPluginClassLoader(pluginLocation, urls, parent)
+                    (PrivilegedAction<PluginClassLoader>) () ->
+                            new SynchronizedPluginClassLoader(pluginLocation, urls, parent)
             );
         }
+    }
+
+    private class SynchronizedDelegatingClassLoader extends DelegatingClassLoader {
+        {
+            ClassLoader.registerAsParallelCapable();
+        }
+
+        public SynchronizedDelegatingClassLoader(ClassLoader parent) {
+            super(parent);
+        }
 
         @Override
         public PluginClassLoader pluginClassLoader(String name) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
index d320c79711a..34e28ee47f6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.rest.entities;
 
-import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.junit.Test;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -32,6 +32,6 @@ public class PluginInfoTest {
         assertFalse(filter.equals("1.0"));
         assertFalse(filter.equals(new Object()));
         assertFalse(filter.equals(null));
-        assertTrue(filter.equals(DelegatingClassLoader.UNDEFINED_VERSION));
+        assertTrue(filter.equals(PluginDesc.UNDEFINED_VERSION));
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 1072373579b..57149ca498d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -33,11 +33,11 @@ import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.SampleSinkConnector;
 import org.apache.kafka.connect.runtime.SampleSourceConnector;
 import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
-import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.PluginType;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.ReflectionScanner;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
@@ -453,7 +453,7 @@ public class ConnectorPluginsResourceTest {
         public MockConnectorPluginDesc(Class<T> klass) throws Exception {
             super(
                     klass,
-                    DelegatingClassLoader.versionFor(klass),
+                    ReflectionScanner.versionFor(klass),
                     new MockPluginClassLoader(null, new URL[0])
             );
         }

Reply via email to