This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b9a45546a79 KAFKA-15244: Remove PluginType.from(Class) (#14089)
b9a45546a79 is described below

commit b9a45546a7918799b6fb3c0fe63b56f47d8fcba9
Author: Greg Harris <greg.har...@aiven.io>
AuthorDate: Tue Aug 1 10:05:46 2023 -0700

    KAFKA-15244: Remove PluginType.from(Class) (#14089)
    
    Reviewers: Chris Egerton <chr...@aiven.io>
---
 .../kafka/connect/runtime/AbstractHerder.java      |  41 +++----
 .../runtime/isolation/PluginClassLoader.java       |  14 +--
 .../connect/runtime/isolation/PluginDesc.java      |  26 +++--
 .../runtime/isolation/PluginScanResult.java        |   2 +-
 .../connect/runtime/isolation/PluginScanner.java   |  25 ++--
 .../connect/runtime/isolation/PluginType.java      |  16 +--
 .../connect/runtime/isolation/PluginUtils.java     |  11 +-
 .../kafka/connect/runtime/isolation/Plugins.java   |   2 +-
 .../runtime/isolation/ReflectionScanner.java       |  33 +++---
 .../runtime/isolation/ServiceLoaderScanner.java    |  18 +--
 .../kafka/connect/runtime/AbstractHerderTest.java  |   5 +-
 .../isolation/DelegatingClassLoaderTest.java       |   3 +-
 .../connect/runtime/isolation/PluginDescTest.java  | 128 +++++++++++++++++----
 .../runtime/isolation/PluginScannerTest.java       |  14 +--
 .../connect/runtime/isolation/PluginUtilsTest.java |  18 +--
 .../connect/runtime/isolation/TestPlugins.java     |   7 +-
 .../resources/ConnectorPluginsResourceTest.java    |  93 +++++----------
 17 files changed, 246 insertions(+), 210 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index b1a19fd46ae..7f04914c8d1 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -33,7 +33,6 @@ import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
-import org.apache.kafka.connect.runtime.isolation.PluginType;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
@@ -842,34 +841,26 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
 
         try (LoaderSwap loaderSwap = 
p.withClassLoader(pluginClass.getClassLoader())) {
             Object plugin = p.newPlugin(pluginName);
-            PluginType pluginType = PluginType.from(plugin.getClass());
             // Contains definitions coming from Connect framework
             ConfigDef baseConfigDefs = null;
             // Contains definitions specifically declared on the plugin
             ConfigDef pluginConfigDefs;
-            switch (pluginType) {
-                case SINK:
-                    baseConfigDefs = SinkConnectorConfig.configDef();
-                    pluginConfigDefs = ((SinkConnector) plugin).config();
-                    break;
-                case SOURCE:
-                    baseConfigDefs = SourceConnectorConfig.configDef();
-                    pluginConfigDefs = ((SourceConnector) plugin).config();
-                    break;
-                case CONVERTER:
-                    pluginConfigDefs = ((Converter) plugin).config();
-                    break;
-                case HEADER_CONVERTER:
-                    pluginConfigDefs = ((HeaderConverter) plugin).config();
-                    break;
-                case TRANSFORMATION:
-                    pluginConfigDefs = ((Transformation<?>) plugin).config();
-                    break;
-                case PREDICATE:
-                    pluginConfigDefs = ((Predicate<?>) plugin).config();
-                    break;
-                default:
-                    throw new BadRequestException("Invalid plugin type " + 
pluginType + ". Valid types are sink, source, converter, header_converter, 
transformation, predicate.");
+            if (plugin instanceof SinkConnector) {
+                baseConfigDefs = SinkConnectorConfig.configDef();
+                pluginConfigDefs = ((SinkConnector) plugin).config();
+            } else if (plugin instanceof SourceConnector) {
+                baseConfigDefs = SourceConnectorConfig.configDef();
+                pluginConfigDefs = ((SourceConnector) plugin).config();
+            } else if (plugin instanceof Converter) {
+                pluginConfigDefs = ((Converter) plugin).config();
+            } else if (plugin instanceof HeaderConverter) {
+                pluginConfigDefs = ((HeaderConverter) plugin).config();
+            } else if (plugin instanceof Transformation) {
+                pluginConfigDefs = ((Transformation<?>) plugin).config();
+            } else if (plugin instanceof Predicate) {
+                pluginConfigDefs = ((Predicate<?>) plugin).config();
+            } else {
+                throw new BadRequestException("Invalid plugin class " + 
pluginName + ". Valid types are sink, source, converter, header_converter, 
transformation, predicate.");
             }
 
             // Track config properties by name and, if the same property is 
defined in multiple places,
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
index da05966e13a..825d1e6d114 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
@@ -55,19 +55,7 @@ public class PluginClassLoader extends URLClassLoader {
      */
     public PluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader 
parent) {
         super(urls, parent);
-        this.pluginLocation = pluginLocation;
-    }
-
-    /**
-     * Constructor that defines the system classloader as parent of this 
plugin classloader.
-     *
-     * @param pluginLocation the top-level location of the plugin to be loaded 
in isolation by this
-     * classloader.
-     * @param urls the list of urls from which to load classes and resources 
for this plugin.
-     */
-    public PluginClassLoader(URL pluginLocation, URL[] urls) {
-        super(urls);
-        this.pluginLocation = pluginLocation;
+        this.pluginLocation = Objects.requireNonNull(pluginLocation, "Plugin 
location must be non-null");
     }
 
     /**
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
index e001f104ee3..395dc92d7b5 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
@@ -21,7 +21,7 @@ import 
org.apache.maven.artifact.versioning.DefaultArtifactVersion;
 
 import java.util.Objects;
 
-public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
+public class PluginDesc<T> implements Comparable<PluginDesc<?>> {
     public static final String UNDEFINED_VERSION = "undefined";
     private final Class<? extends T> klass;
     private final String name;
@@ -32,15 +32,16 @@ public class PluginDesc<T> implements 
Comparable<PluginDesc<T>> {
     private final String location;
     private final ClassLoader loader;
 
-    public PluginDesc(Class<? extends T> klass, String version, ClassLoader 
loader) {
-        this.klass = klass;
-        this.name = klass.getName();
+    public PluginDesc(Class<? extends T> klass, String version, PluginType 
type, ClassLoader loader) {
+        this.klass = Objects.requireNonNull(klass, "Plugin class must be 
non-null");
+        this.name = this.klass.getName();
         this.version = version != null ? version : "null";
         this.encodedVersion = new DefaultArtifactVersion(this.version);
-        this.type = PluginType.from(klass);
-        this.typeName = type.toString();
+        this.type = Objects.requireNonNull(type, "Plugin type must be 
non-null");
+        this.typeName = this.type.toString();
+        Objects.requireNonNull(loader, "Plugin classloader must be non-null");
         this.location = loader instanceof PluginClassLoader
-                ? ((PluginClassLoader) loader).location()
+                ? Objects.requireNonNull(((PluginClassLoader) 
loader).location(), "Plugin location must be non-null")
                 : "classpath";
         this.loader = loader;
     }
@@ -110,11 +111,18 @@ public class PluginDesc<T> implements 
Comparable<PluginDesc<T>> {
     }
 
     @Override
-    public int compareTo(PluginDesc<T> other) {
+    public int compareTo(PluginDesc<?> other) {
         int nameComp = name.compareTo(other.name);
         int versionComp = encodedVersion.compareTo(other.encodedVersion);
         // isolated plugins appear after classpath plugins when they have 
identical versions.
         int isolatedComp = Boolean.compare(other.loader instanceof 
PluginClassLoader, loader instanceof PluginClassLoader);
-        return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : 
isolatedComp);
+        // choose an arbitrary order between different locations and types
+        int loaderComp = location.compareTo(other.location);
+        int typeComp = type.compareTo(other.type);
+        return nameComp != 0 ? nameComp :
+                versionComp != 0 ? versionComp :
+                        isolatedComp != 0 ? isolatedComp :
+                                loaderComp != 0 ? loaderComp :
+                                        typeComp;
     }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
index ae015ed3507..b452beb9b63 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -88,7 +88,7 @@ public class PluginScanResult {
         );
     }
 
-    private static <R extends Comparable<R>> SortedSet<R> 
merge(List<PluginScanResult> results, Function<PluginScanResult, SortedSet<R>> 
accessor) {
+    private static <R extends Comparable<?>> SortedSet<R> 
merge(List<PluginScanResult> results, Function<PluginScanResult, SortedSet<R>> 
accessor) {
         SortedSet<R> merged = new TreeSet<>();
         for (PluginScanResult element : results) {
             merged.addAll(accessor.apply(element));
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
index 474a92f5cf5..acb5b668cf3 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
@@ -120,32 +120,32 @@ public abstract class PluginScanner {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, PluginSource source) {
-        return new PluginDesc(plugin, version, source.loader());
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, PluginType type, PluginSource source) {
+        return new PluginDesc(plugin, version, type, source.loader());
     }
 
     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, PluginSource source) {
+    protected <T> SortedSet<PluginDesc<T>> 
getServiceLoaderPluginDesc(PluginType type, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, 
source.loader());
+        ServiceLoader<T> serviceLoader = ServiceLoader.load((Class<T>) 
type.superClass(), source.loader());
         Iterator<T> iterator = serviceLoader.iterator();
-        while (handleLinkageError(klass, source, iterator::hasNext)) {
+        while (handleLinkageError(type, source, iterator::hasNext)) {
             try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
                 T pluginImpl;
                 try {
-                    pluginImpl = handleLinkageError(klass, source, 
iterator::next);
+                    pluginImpl = handleLinkageError(type, source, 
iterator::next);
                 } catch (ServiceConfigurationError t) {
                     log.error("Failed to discover {} in {}{}",
-                            klass.getSimpleName(), source.location(), 
reflectiveErrorDescription(t.getCause()), t);
+                            type.simpleName(), source.location(), 
reflectiveErrorDescription(t.getCause()), t);
                     continue;
                 }
                 Class<? extends T> pluginKlass = (Class<? extends T>) 
pluginImpl.getClass();
                 if (pluginKlass.getClassLoader() != source.loader()) {
                     log.debug("{} from other classloader {} is visible from 
{}, excluding to prevent isolated loading",
-                            pluginKlass.getSimpleName(), 
pluginKlass.getClassLoader(), source.location());
+                            type.simpleName(), pluginKlass.getClassLoader(), 
source.location());
                     continue;
                 }
-                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), 
source));
+                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), 
type, source));
             }
         }
         return result;
@@ -154,14 +154,13 @@ public abstract class PluginScanner {
     /**
      * Helper to evaluate a {@link ServiceLoader} operation while handling 
{@link LinkageError}s.
      *
-     * @param klass The plugin superclass which is being loaded
+     * @param type The plugin type which is being loaded
      * @param function A function on a {@link ServiceLoader}'s {@link 
Iterator} which may throw {@link LinkageError}
      * @return the return value of function
      * @throws Error errors thrown by the passed-in function
-     * @param <T> Type being iterated over by the ServiceLoader
      * @param <U> Return value of the passed-in function
      */
-    private <T, U> U handleLinkageError(Class<T> klass, PluginSource source, 
Supplier<U> function) {
+    private <U> U handleLinkageError(PluginType type, PluginSource source, 
Supplier<U> function) {
         // It's difficult to know for sure if the iterator was able to advance 
past the first broken
         // plugin class, or if it will continue to fail on that broken class 
for any subsequent calls
         // to Iterator::hasNext or Iterator::next
@@ -182,7 +181,7 @@ public abstract class PluginScanner {
                         || !Objects.equals(lastError.getClass(), t.getClass())
                         || !Objects.equals(lastError.getMessage(), 
t.getMessage())) {
                     log.error("Failed to discover {} in {}{}",
-                            klass.getSimpleName(), source.location(), 
reflectiveErrorDescription(t.getCause()), t);
+                            type.simpleName(), source.location(), 
reflectiveErrorDescription(t.getCause()), t);
                 }
                 lastError = t;
             }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
index 696e14ba8cd..0f26e84a0bb 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
@@ -37,8 +37,7 @@ public enum PluginType {
     PREDICATE(Predicate.class),
     CONFIGPROVIDER(ConfigProvider.class),
     REST_EXTENSION(ConnectRestExtension.class),
-    
CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY(ConnectorClientConfigOverridePolicy.class),
-    UNKNOWN(Object.class);
+    
CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY(ConnectorClientConfigOverridePolicy.class);
 
     private final Class<?> klass;
 
@@ -46,19 +45,14 @@ public enum PluginType {
         this.klass = klass;
     }
 
-    public static PluginType from(Class<?> klass) {
-        for (PluginType type : PluginType.values()) {
-            if (type.klass.isAssignableFrom(klass)) {
-                return type;
-            }
-        }
-        return UNKNOWN;
-    }
-
     public String simpleName() {
         return klass.getSimpleName();
     }
 
+    public Class<?> superClass() {
+        return klass;
+    }
+
     @Override
     public String toString() {
         return super.toString().toLowerCase(Locale.ROOT);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index 88b1fc0484a..a673d6e54fd 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
@@ -196,12 +197,12 @@ public class PluginUtils {
         return path.toString().toLowerCase(Locale.ROOT).endsWith(".class");
     }
 
-    public static List<Path> pluginLocations(String pluginPath) {
+    public static Set<Path> pluginLocations(String pluginPath) {
         if (pluginPath == null) {
-            return Collections.emptyList();
+            return Collections.emptySet();
         }
         String[] pluginPathElements = 
COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1);
-        List<Path> pluginLocations = new ArrayList<>();
+        Set<Path> pluginLocations = new LinkedHashSet<>();
         for (String path : pluginPathElements) {
             try {
                 Path pluginPathElement = Paths.get(path).toAbsolutePath();
@@ -328,8 +329,8 @@ public class PluginUtils {
         return Arrays.asList(archives.toArray(new Path[0]));
     }
 
-    public static Set<PluginSource> pluginSources(List<Path> pluginLocations, 
ClassLoader classLoader, PluginClassLoaderFactory factory) {
-        Set<PluginSource> pluginSources = new HashSet<>();
+    public static Set<PluginSource> pluginSources(Set<Path> pluginLocations, 
ClassLoader classLoader, PluginClassLoaderFactory factory) {
+        Set<PluginSource> pluginSources = new LinkedHashSet<>();
         for (Path pluginLocation : pluginLocations) {
 
             try {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 83dec38a6fb..36e20270abc 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -63,7 +63,7 @@ public class Plugins {
     // VisibleForTesting
     Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory 
factory) {
         String pluginPath = WorkerConfig.pluginPath(props);
-        List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
+        Set<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
         delegatingLoader = factory.newDelegatingClassLoader(parent);
         Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
         scanResult = initLoaders(pluginSources);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
index e38fefc78c9..ad5c00c42ec 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
@@ -69,7 +69,7 @@ public class ReflectionScanner extends PluginScanner {
 
     private static final Logger log = 
LoggerFactory.getLogger(ReflectionScanner.class);
 
-    public static <T> String versionFor(Class<? extends T> pluginKlass) throws 
ReflectiveOperationException {
+    private static <T> String versionFor(Class<? extends T> pluginKlass) 
throws ReflectiveOperationException {
         T pluginImpl = pluginKlass.getDeclaredConstructor().newInstance();
         return versionFor(pluginImpl);
     }
@@ -84,39 +84,40 @@ public class ReflectionScanner extends PluginScanner {
         Reflections reflections = new Reflections(builder);
 
         return new PluginScanResult(
-                getPluginDesc(reflections, SinkConnector.class, source),
-                getPluginDesc(reflections, SourceConnector.class, source),
-                getPluginDesc(reflections, Converter.class, source),
-                getPluginDesc(reflections, HeaderConverter.class, source),
+                getPluginDesc(reflections, PluginType.SINK, source),
+                getPluginDesc(reflections, PluginType.SOURCE, source),
+                getPluginDesc(reflections, PluginType.CONVERTER, source),
+                getPluginDesc(reflections, PluginType.HEADER_CONVERTER, 
source),
                 getTransformationPluginDesc(source, reflections),
                 getPredicatePluginDesc(source, reflections),
-                getServiceLoaderPluginDesc(ConfigProvider.class, source),
-                getServiceLoaderPluginDesc(ConnectRestExtension.class, source),
-                
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source)
+                getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source),
+                getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source),
+                
getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY, 
source)
         );
     }
 
     @SuppressWarnings({"unchecked"})
     private SortedSet<PluginDesc<Predicate<?>>> 
getPredicatePluginDesc(PluginSource source, Reflections reflections) {
-        return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, Predicate.class, source);
+        return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, PluginType.PREDICATE, source);
     }
 
     @SuppressWarnings({"unchecked"})
     private SortedSet<PluginDesc<Transformation<?>>> 
getTransformationPluginDesc(PluginSource source, Reflections reflections) {
-        return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, Transformation.class, source);
+        return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, PluginType.TRANSFORMATION, source);
     }
 
+    @SuppressWarnings({"unchecked"})
     private <T> SortedSet<PluginDesc<T>> getPluginDesc(
             Reflections reflections,
-            Class<T> klass,
+            PluginType type,
             PluginSource source
     ) {
         Set<Class<? extends T>> plugins;
         try {
-            plugins = reflections.getSubTypesOf(klass);
+            plugins = reflections.getSubTypesOf((Class<T>) type.superClass());
         } catch (ReflectionsException e) {
             log.debug("Reflections scanner could not find any {} in {} for 
URLs: {}",
-                    klass, source.location(), source.urls(), e);
+                    type, source.location(), source.urls(), e);
             return Collections.emptySortedSet();
         }
 
@@ -128,14 +129,14 @@ public class ReflectionScanner extends PluginScanner {
             }
             if (pluginKlass.getClassLoader() != source.loader()) {
                 log.debug("{} from other classloader {} is visible from {}, 
excluding to prevent isolated loading",
-                        pluginKlass.getSimpleName(), 
pluginKlass.getClassLoader(), source.location());
+                        pluginKlass, pluginKlass.getClassLoader(), 
source.location());
                 continue;
             }
             try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
-                result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), 
source));
+                result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), 
type, source));
             } catch (ReflectiveOperationException | LinkageError e) {
                 log.error("Failed to discover {} in {}: Unable to instantiate 
{}{}",
-                        klass.getSimpleName(), source.location(), 
pluginKlass.getSimpleName(),
+                        type.simpleName(), source.location(), 
pluginKlass.getSimpleName(),
                         reflectiveErrorDescription(e), e);
             }
         }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java
index 727a737ff3f..9f36a5ad8e0 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java
@@ -56,25 +56,25 @@ public class ServiceLoaderScanner extends PluginScanner {
     @Override
     protected PluginScanResult scanPlugins(PluginSource source) {
         return new PluginScanResult(
-                getServiceLoaderPluginDesc(SinkConnector.class, source),
-                getServiceLoaderPluginDesc(SourceConnector.class, source),
-                getServiceLoaderPluginDesc(Converter.class, source),
-                getServiceLoaderPluginDesc(HeaderConverter.class, source),
+                getServiceLoaderPluginDesc(PluginType.SINK, source),
+                getServiceLoaderPluginDesc(PluginType.SOURCE, source),
+                getServiceLoaderPluginDesc(PluginType.CONVERTER, source),
+                getServiceLoaderPluginDesc(PluginType.HEADER_CONVERTER, 
source),
                 getTransformationPluginDesc(source),
                 getPredicatePluginDesc(source),
-                getServiceLoaderPluginDesc(ConfigProvider.class, source),
-                getServiceLoaderPluginDesc(ConnectRestExtension.class, source),
-                
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source)
+                getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source),
+                getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source),
+                
getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY, 
source)
         );
     }
 
     @SuppressWarnings({"unchecked"})
     private SortedSet<PluginDesc<Predicate<?>>> 
getPredicatePluginDesc(PluginSource source) {
-        return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) 
getServiceLoaderPluginDesc(Predicate.class, source);
+        return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) 
getServiceLoaderPluginDesc(PluginType.PREDICATE, source);
     }
 
     @SuppressWarnings({"unchecked"})
     private SortedSet<PluginDesc<Transformation<?>>> 
getTransformationPluginDesc(PluginSource source) {
-        return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) 
getServiceLoaderPluginDesc(Transformation.class, source);
+        return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) 
getServiceLoaderPluginDesc(PluginType.TRANSFORMATION, source);
     }
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index f1993a7e7f1..68ee2ce1561 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.connect.errors.NotFoundException;
 import 
org.apache.kafka.connect.runtime.distributed.SampleConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
@@ -667,12 +668,12 @@ public class AbstractHerderTest {
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     private PluginDesc<Predicate<?>> predicatePluginDesc() {
-        return new PluginDesc(SamplePredicate.class, "1.0", classLoader);
+        return new PluginDesc(SamplePredicate.class, "1.0", 
PluginType.PREDICATE, classLoader);
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     private PluginDesc<Transformation<?>> transformationPluginDesc() {
-        return new PluginDesc(SampleTransformation.class, "1.0", classLoader);
+        return new PluginDesc(SampleTransformation.class, "1.0", 
PluginType.TRANSFORMATION, classLoader);
     }
 
     @Test
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
index 4f683e3b355..a3f2ab142dc 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
@@ -61,7 +61,8 @@ public class DelegatingClassLoaderTest {
         SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
         // Lie to the DCL that this arbitrary class is a connector, since all 
real connector classes we have access to
         // are forced to be non-isolated by PluginUtils.shouldLoadInIsolation.
-        pluginDesc = new PluginDesc<>((Class<? extends SinkConnector>) 
ARBITRARY_CLASS, null, pluginLoader);
+        when(pluginLoader.location()).thenReturn("some-location");
+        pluginDesc = new PluginDesc<>((Class<? extends SinkConnector>) 
ARBITRARY_CLASS, null, PluginType.SINK, pluginLoader);
         assertTrue(PluginUtils.shouldLoadInIsolation(pluginDesc.className()));
         sinkConnectors.add(pluginDesc);
         scanResult = new PluginScanResult(
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
index f18537d0c32..171fbde6997 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
@@ -17,10 +17,13 @@
 
 package org.apache.kafka.connect.runtime.isolation;
 
-import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.config.provider.FileConfigProvider;
+import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 import org.junit.Before;
@@ -31,7 +34,10 @@ import java.nio.file.Paths;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class PluginDescTest {
     private final ClassLoader systemLoader = 
ClassLoader.getSystemClassLoader();
@@ -40,41 +46,47 @@ public class PluginDescTest {
     private final String snapshotVersion = "1.0.0-SNAPSHOT";
     private final String noVersion = "undefined";
     private PluginClassLoader pluginLoader;
+    private PluginClassLoader otherPluginLoader;
 
     @Before
     public void setUp() throws Exception {
         // Fairly simple use case, thus no need to create a random directory 
here yet.
         URL location = Paths.get("/tmp").toUri().toURL();
+        URL otherLocation = Paths.get("/tmp-other").toUri().toURL();
         // Normally parent will be a DelegatingClassLoader.
         pluginLoader = new PluginClassLoader(location, new URL[0], 
systemLoader);
+        otherPluginLoader = new PluginClassLoader(otherLocation, new URL[0], 
systemLoader);
     }
 
     @SuppressWarnings("rawtypes")
     @Test
     public void testRegularPluginDesc() {
-        PluginDesc<Connector> connectorDesc = new PluginDesc<>(
-                Connector.class,
+        PluginDesc<SinkConnector> connectorDesc = new PluginDesc<>(
+                SinkConnector.class,
                 regularVersion,
+                PluginType.SINK,
                 pluginLoader
         );
 
-        assertPluginDesc(connectorDesc, Connector.class, regularVersion, 
pluginLoader.location());
+        assertPluginDesc(connectorDesc, SinkConnector.class, regularVersion, 
PluginType.SINK, pluginLoader.location());
 
         PluginDesc<Converter> converterDesc = new PluginDesc<>(
                 Converter.class,
                 snapshotVersion,
+                PluginType.CONVERTER,
                 pluginLoader
         );
 
-        assertPluginDesc(converterDesc, Converter.class, snapshotVersion, 
pluginLoader.location());
+        assertPluginDesc(converterDesc, Converter.class, snapshotVersion, 
PluginType.CONVERTER, pluginLoader.location());
 
         PluginDesc<Transformation> transformDesc = new PluginDesc<>(
                 Transformation.class,
                 noVersion,
+                PluginType.TRANSFORMATION,
                 pluginLoader
         );
 
-        assertPluginDesc(transformDesc, Transformation.class, noVersion, 
pluginLoader.location());
+        assertPluginDesc(transformDesc, Transformation.class, noVersion, 
PluginType.TRANSFORMATION, pluginLoader.location());
     }
 
     @SuppressWarnings("rawtypes")
@@ -84,26 +96,29 @@ public class PluginDescTest {
         PluginDesc<SinkConnector> connectorDesc = new PluginDesc<>(
                 SinkConnector.class,
                 regularVersion,
+                PluginType.SINK,
                 systemLoader
         );
 
-        assertPluginDesc(connectorDesc, SinkConnector.class, regularVersion, 
location);
+        assertPluginDesc(connectorDesc, SinkConnector.class, regularVersion, 
PluginType.SINK, location);
 
         PluginDesc<Converter> converterDesc = new PluginDesc<>(
                 Converter.class,
                 snapshotVersion,
+                PluginType.CONVERTER,
                 systemLoader
         );
 
-        assertPluginDesc(converterDesc, Converter.class, snapshotVersion, 
location);
+        assertPluginDesc(converterDesc, Converter.class, snapshotVersion, 
PluginType.CONVERTER, location);
 
         PluginDesc<Transformation> transformDesc = new PluginDesc<>(
                 Transformation.class,
                 noVersion,
+                PluginType.TRANSFORMATION,
                 systemLoader
         );
 
-        assertPluginDesc(transformDesc, Transformation.class, noVersion, 
location);
+        assertPluginDesc(transformDesc, Transformation.class, noVersion, 
PluginType.TRANSFORMATION, location);
     }
 
     @Test
@@ -112,6 +127,7 @@ public class PluginDescTest {
         PluginDesc<SourceConnector> connectorDesc = new PluginDesc<>(
                 SourceConnector.class,
                 null,
+                PluginType.SOURCE,
                 pluginLoader
         );
 
@@ -119,6 +135,7 @@ public class PluginDescTest {
                 connectorDesc,
                 SourceConnector.class,
                 nullVersion,
+                PluginType.SOURCE,
                 pluginLoader.location()
         );
 
@@ -126,24 +143,27 @@ public class PluginDescTest {
         PluginDesc<Converter> converterDesc = new PluginDesc<>(
                 Converter.class,
                 null,
+                PluginType.CONVERTER,
                 systemLoader
         );
 
-        assertPluginDesc(converterDesc, Converter.class, nullVersion, 
location);
+        assertPluginDesc(converterDesc, Converter.class, nullVersion, 
PluginType.CONVERTER, location);
     }
 
     @SuppressWarnings("rawtypes")
     @Test
     public void testPluginDescEquality() {
-        PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>(
-                Connector.class,
+        PluginDesc<SinkConnector> connectorDescPluginPath = new PluginDesc<>(
+                SinkConnector.class,
                 snapshotVersion,
+                PluginType.SINK,
                 pluginLoader
         );
 
-        PluginDesc<Connector> connectorDescClasspath = new PluginDesc<>(
-                Connector.class,
+        PluginDesc<SinkConnector> connectorDescClasspath = new PluginDesc<>(
+                SinkConnector.class,
                 snapshotVersion,
+                PluginType.SINK,
                 systemLoader
         );
 
@@ -153,12 +173,14 @@ public class PluginDescTest {
         PluginDesc<Converter> converterDescPluginPath = new PluginDesc<>(
                 Converter.class,
                 noVersion,
+                PluginType.CONVERTER,
                 pluginLoader
         );
 
         PluginDesc<Converter> converterDescClasspath = new PluginDesc<>(
                 Converter.class,
                 noVersion,
+                PluginType.CONVERTER,
                 systemLoader
         );
 
@@ -168,30 +190,34 @@ public class PluginDescTest {
         PluginDesc<Transformation> transformDescPluginPath = new PluginDesc<>(
                 Transformation.class,
                 null,
+                PluginType.TRANSFORMATION,
                 pluginLoader
         );
 
         PluginDesc<Transformation> transformDescClasspath = new PluginDesc<>(
                 Transformation.class,
                 noVersion,
+                PluginType.TRANSFORMATION,
                 pluginLoader
         );
 
         assertNotEquals(transformDescPluginPath, transformDescClasspath);
     }
 
-    @SuppressWarnings("rawtypes")
+    @SuppressWarnings({"rawtypes", "unchecked"})
     @Test
     public void testPluginDescComparison() {
-        PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>(
-                Connector.class,
+        PluginDesc<SinkConnector> connectorDescPluginPath = new PluginDesc<>(
+                SinkConnector.class,
                 regularVersion,
+                PluginType.SINK,
                 pluginLoader
         );
 
-        PluginDesc<Connector> connectorDescClasspath = new PluginDesc<>(
-                Connector.class,
+        PluginDesc<SinkConnector> connectorDescClasspath = new PluginDesc<>(
+                SinkConnector.class,
                 newerVersion,
+                PluginType.SINK,
                 systemLoader
         );
 
@@ -200,12 +226,14 @@ public class PluginDescTest {
         PluginDesc<Converter> converterDescPluginPath = new PluginDesc<>(
                 Converter.class,
                 noVersion,
+                PluginType.CONVERTER,
                 pluginLoader
         );
 
         PluginDesc<Converter> converterDescClasspath = new PluginDesc<>(
                 Converter.class,
                 snapshotVersion,
+                PluginType.CONVERTER,
                 systemLoader
         );
 
@@ -214,12 +242,14 @@ public class PluginDescTest {
         PluginDesc<Transformation> transformDescPluginPath = new PluginDesc<>(
                 Transformation.class,
                 null,
+                PluginType.TRANSFORMATION,
                 pluginLoader
         );
 
         PluginDesc<Transformation> transformDescClasspath = new PluginDesc<>(
                 Transformation.class,
                 regularVersion,
+                PluginType.TRANSFORMATION,
                 systemLoader
         );
 
@@ -228,33 +258,87 @@ public class PluginDescTest {
         PluginDesc<Predicate> predicateDescPluginPath = new PluginDesc<>(
                 Predicate.class,
                 regularVersion,
+                PluginType.PREDICATE,
                 pluginLoader
         );
 
         PluginDesc<Predicate> predicateDescClasspath = new PluginDesc<>(
                 Predicate.class,
                 regularVersion,
+                PluginType.PREDICATE,
                 systemLoader
         );
 
         assertNewer(predicateDescPluginPath, predicateDescClasspath);
+
+        PluginDesc<ConfigProvider> configProviderDescPluginPath = new 
PluginDesc<>(
+                FileConfigProvider.class,
+                regularVersion,
+                PluginType.CONFIGPROVIDER,
+                pluginLoader
+        );
+
+        PluginDesc<ConfigProvider> configProviderDescOtherPluginLoader = new 
PluginDesc<>(
+                FileConfigProvider.class,
+                regularVersion,
+                PluginType.CONFIGPROVIDER,
+                otherPluginLoader
+        );
+
+        assertTrue("Different plugin loaders should have an ordering",
+                
configProviderDescPluginPath.compareTo(configProviderDescOtherPluginLoader) != 
0);
+
+
+        PluginDesc<Converter> jsonConverterPlugin = new PluginDesc<>(
+                JsonConverter.class,
+                regularVersion,
+                PluginType.CONVERTER,
+                systemLoader
+        );
+
+        PluginDesc<HeaderConverter> jsonHeaderConverterPlugin = new 
PluginDesc<>(
+                JsonConverter.class,
+                regularVersion,
+                PluginType.HEADER_CONVERTER,
+                systemLoader
+        );
+
+        assertNewer(jsonConverterPlugin, jsonHeaderConverterPlugin);
+    }
+
+    @Test
+    public void testNullArguments() {
+        // Null version is acceptable
+        PluginDesc<SinkConnector> sink = new PluginDesc<>(SinkConnector.class, 
null, PluginType.SINK, systemLoader);
+        assertEquals("null", sink.version());
+
+        // Direct nulls are not acceptable for other arguments
+        assertThrows(NullPointerException.class, () -> new PluginDesc<>(null, 
regularVersion, PluginType.SINK, systemLoader));
+        assertThrows(NullPointerException.class, () -> new 
PluginDesc<>(SinkConnector.class, regularVersion, null, systemLoader));
+        assertThrows(NullPointerException.class, () -> new 
PluginDesc<>(SinkConnector.class, regularVersion, PluginType.SINK, null));
+
+        // PluginClassLoaders must have non-null locations
+        PluginClassLoader nullLocationLoader = mock(PluginClassLoader.class);
+        when(nullLocationLoader.location()).thenReturn(null);
+        assertThrows(NullPointerException.class, () -> new 
PluginDesc<>(SinkConnector.class, regularVersion, PluginType.SINK, 
nullLocationLoader));
     }
 
     private static <T> void assertPluginDesc(
             PluginDesc<T> desc,
             Class<? extends T> klass,
             String version,
+            PluginType type,
             String location
     ) {
         assertEquals(desc.pluginClass(), klass);
         assertEquals(desc.className(), klass.getName());
         assertEquals(desc.version(), version);
-        assertEquals(desc.type(), PluginType.from(klass));
-        assertEquals(desc.typeName(), PluginType.from(klass).toString());
+        assertEquals(desc.type(), type);
+        assertEquals(desc.typeName(), type.toString());
         assertEquals(desc.location(), location);
     }
 
-    private static <T> void assertNewer(PluginDesc<T> older, PluginDesc<T> 
newer) {
+    private static void assertNewer(PluginDesc<?> older, PluginDesc<?> newer) {
         assertTrue(newer + " should be newer than " + older, 
older.compareTo(newer) < 0);
     }
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
index b9b80fa5110..d6a85b294cf 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
@@ -70,7 +70,7 @@ public class PluginScannerTest {
     @Test
     public void testScanningEmptyPluginPath() {
         PluginScanResult result = scan(
-                Collections.emptyList()
+                Collections.emptySet()
         );
         assertTrue(result.isEmpty());
     }
@@ -91,7 +91,7 @@ public class PluginScannerTest {
         pluginDir.newFile("invalid.jar");
 
         PluginScanResult result = scan(
-                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
+                
Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath())
         );
         assertTrue(result.isEmpty());
     }
@@ -102,7 +102,7 @@ public class PluginScannerTest {
         pluginDir.newFile("my-plugin/invalid.jar");
 
         PluginScanResult result = scan(
-                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
+                
Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath())
         );
         assertTrue(result.isEmpty());
     }
@@ -110,7 +110,7 @@ public class PluginScannerTest {
     @Test
     public void testScanningNoPlugins() {
         PluginScanResult result = scan(
-                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
+                
Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath())
         );
         assertTrue(result.isEmpty());
     }
@@ -120,7 +120,7 @@ public class PluginScannerTest {
         pluginDir.newFolder("my-plugin");
 
         PluginScanResult result = scan(
-                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
+                
Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath())
         );
         assertTrue(result.isEmpty());
     }
@@ -137,7 +137,7 @@ public class PluginScannerTest {
         }
 
         PluginScanResult result = scan(
-                
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
+                
Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath())
         );
         Set<String> classes = new HashSet<>();
         result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
@@ -145,7 +145,7 @@ public class PluginScannerTest {
         assertEquals(expectedClasses, classes);
     }
 
-    private PluginScanResult scan(List<Path> pluginLocations) {
+    private PluginScanResult scan(Set<Path> pluginLocations) {
         ClassLoaderFactory factory = new ClassLoaderFactory();
         Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, 
PluginScannerTest.class.getClassLoader(), factory);
         return scanner.discoverPlugins(pluginSources);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index 0f830c8a8aa..4dd168a3faf 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -508,11 +508,11 @@ public class PluginUtilsTest {
     @Test
     public void testNonCollidingAliases() {
         SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
-        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
MockSinkConnector.class.getClassLoader()));
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
PluginType.SINK, MockSinkConnector.class.getClassLoader()));
         SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new 
TreeSet<>();
-        sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, 
MockSourceConnector.class.getClassLoader()));
+        sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, 
PluginType.SOURCE, MockSourceConnector.class.getClassLoader()));
         SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
-        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
PluginType.CONVERTER, CollidingConverter.class.getClassLoader()));
         PluginScanResult result = new PluginScanResult(
                 sinkConnectors,
                 sourceConnectors,
@@ -540,8 +540,8 @@ public class PluginUtilsTest {
     public void testMultiVersionAlias() {
         SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
         // distinct versions don't cause an alias collision (the class name is 
the same)
-        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
MockSinkConnector.class.getClassLoader()));
-        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", 
MockSinkConnector.class.getClassLoader()));
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
PluginType.SINK, MockSinkConnector.class.getClassLoader()));
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", 
PluginType.SINK, MockSinkConnector.class.getClassLoader()));
         assertEquals(2, sinkConnectors.size());
         PluginScanResult result = new PluginScanResult(
                 sinkConnectors,
@@ -564,9 +564,9 @@ public class PluginUtilsTest {
     @Test
     public void testCollidingPrunedAlias() {
         SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
-        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
PluginType.CONVERTER, CollidingConverter.class.getClassLoader()));
         SortedSet<PluginDesc<HeaderConverter>> headerConverters = new 
TreeSet<>();
-        headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, 
null, CollidingHeaderConverter.class.getClassLoader()));
+        headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, 
null, PluginType.HEADER_CONVERTER, 
CollidingHeaderConverter.class.getClassLoader()));
         PluginScanResult result = new PluginScanResult(
                 Collections.emptySortedSet(),
                 Collections.emptySortedSet(),
@@ -589,9 +589,9 @@ public class PluginUtilsTest {
     @Test
     public void testCollidingSimpleAlias() {
         SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
-        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
PluginType.CONVERTER, CollidingConverter.class.getClassLoader()));
         SortedSet<PluginDesc<Transformation<?>>> transformations = new 
TreeSet<>();
-        transformations.add(new PluginDesc<>((Class<? extends 
Transformation<?>>) (Class<?>) Colliding.class, null, 
Colliding.class.getClassLoader()));
+        transformations.add(new PluginDesc<>((Class<? extends 
Transformation<?>>) (Class<?>) Colliding.class, null, 
PluginType.TRANSFORMATION, Colliding.class.getClassLoader()));
         PluginScanResult result = new PluginScanResult(
                 Collections.emptySortedSet(),
                 Collections.emptySortedSet(),
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
index bd87dd41516..d93fef181aa 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Predicate;
 import java.util.jar.Attributes;
 import java.util.jar.JarEntry;
@@ -248,7 +249,7 @@ public class TestPlugins {
      * @return A list of plugin jar filenames
      * @throws AssertionError if any plugin failed to load, or no plugins were 
loaded.
      */
-    public static List<Path> pluginPath() {
+    public static Set<Path> pluginPath() {
         return pluginPath(defaultPlugins());
     }
 
@@ -262,14 +263,14 @@ public class TestPlugins {
      * @return A list of plugin jar filenames containing the specified test 
plugins
      * @throws AssertionError if any plugin failed to load, or no plugins were 
loaded.
      */
-    public static List<Path> pluginPath(TestPlugin... plugins) {
+    public static Set<Path> pluginPath(TestPlugin... plugins) {
         assertAvailable();
         return Arrays.stream(plugins)
                 .filter(Objects::nonNull)
                 .map(TestPlugin::resourceDir)
                 .distinct()
                 .map(PLUGIN_JARS::get)
-                .collect(Collectors.toList());
+                .collect(Collectors.toSet());
     }
 
     public static String pluginPathJoined(TestPlugin... plugins) {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 57149ca498d..9e5dc55743a 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.config.ConfigDef.Recommender;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigDef.Width;
 import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.converters.LongConverter;
@@ -33,11 +34,9 @@ import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.SampleSinkConnector;
 import org.apache.kafka.connect.runtime.SampleSourceConnector;
 import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
-import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.PluginType;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
-import org.apache.kafka.connect.runtime.isolation.ReflectionScanner;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
@@ -61,7 +60,6 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import javax.ws.rs.BadRequestException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -109,35 +107,38 @@ public class ConnectorPluginsResourceTest {
     private static final ConfigInfos PARTIAL_CONFIG_INFOS;
     private static final int ERROR_COUNT = 0;
     private static final int PARTIAL_CONFIG_ERROR_COUNT = 1;
-    private static final Set<MockConnectorPluginDesc<?>> 
SINK_CONNECTOR_PLUGINS = new TreeSet<>();
-    private static final Set<MockConnectorPluginDesc<?>> 
SOURCE_CONNECTOR_PLUGINS = new TreeSet<>();
-    private static final Set<MockConnectorPluginDesc<?>> CONVERTER_PLUGINS = 
new TreeSet<>();
-    private static final Set<MockConnectorPluginDesc<?>> 
HEADER_CONVERTER_PLUGINS = new TreeSet<>();
-    private static final Set<MockConnectorPluginDesc<?>> 
TRANSFORMATION_PLUGINS = new TreeSet<>();
-    private static final Set<MockConnectorPluginDesc<?>> PREDICATE_PLUGINS = 
new TreeSet<>();
+    private static final Set<PluginDesc<?>> SINK_CONNECTOR_PLUGINS = new 
TreeSet<>();
+    private static final Set<PluginDesc<?>> SOURCE_CONNECTOR_PLUGINS = new 
TreeSet<>();
+    private static final Set<PluginDesc<?>> CONVERTER_PLUGINS = new 
TreeSet<>();
+    private static final Set<PluginDesc<?>> HEADER_CONVERTER_PLUGINS = new 
TreeSet<>();
+    private static final Set<PluginDesc<?>> TRANSFORMATION_PLUGINS = new 
TreeSet<>();
+    private static final Set<PluginDesc<?>> PREDICATE_PLUGINS = new 
TreeSet<>();
 
     static {
         try {
-            SINK_CONNECTOR_PLUGINS.add(new 
MockConnectorPluginDesc<>(VerifiableSinkConnector.class));
-            SINK_CONNECTOR_PLUGINS.add(new 
MockConnectorPluginDesc<>(MockSinkConnector.class));
+            ClassLoader classLoader = 
ConnectorPluginsResourceTest.class.getClassLoader();
+            String appVersion = AppInfoParser.getVersion();
+            SINK_CONNECTOR_PLUGINS.add(new 
PluginDesc<>(VerifiableSinkConnector.class, appVersion, PluginType.SINK, 
classLoader));
+            SINK_CONNECTOR_PLUGINS.add(new 
PluginDesc<>(MockSinkConnector.class, appVersion, PluginType.SINK, 
classLoader));
 
-            SOURCE_CONNECTOR_PLUGINS.add(new 
MockConnectorPluginDesc<>(VerifiableSourceConnector.class));
-            SOURCE_CONNECTOR_PLUGINS.add(new 
MockConnectorPluginDesc<>(MockSourceConnector.class));
-            SOURCE_CONNECTOR_PLUGINS.add(new 
MockConnectorPluginDesc<>(SchemaSourceConnector.class));
-            SOURCE_CONNECTOR_PLUGINS.add(new 
MockConnectorPluginDesc<>(ConnectorPluginsResourceTestConnector.class));
+            SOURCE_CONNECTOR_PLUGINS.add(new 
PluginDesc<>(VerifiableSourceConnector.class, appVersion, PluginType.SOURCE, 
classLoader));
+            SOURCE_CONNECTOR_PLUGINS.add(new 
PluginDesc<>(MockSourceConnector.class, appVersion, PluginType.SOURCE, 
classLoader));
+            SOURCE_CONNECTOR_PLUGINS.add(new 
PluginDesc<>(SchemaSourceConnector.class, appVersion, PluginType.SOURCE, 
classLoader));
+            SOURCE_CONNECTOR_PLUGINS.add(new 
PluginDesc<>(ConnectorPluginsResourceTestConnector.class, appVersion, 
PluginType.SOURCE, classLoader));
 
-            CONVERTER_PLUGINS.add(new 
MockConnectorPluginDesc<>(StringConverter.class));
-            CONVERTER_PLUGINS.add(new 
MockConnectorPluginDesc<>(LongConverter.class));
+            CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, 
PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER, classLoader));
+            CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, 
PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER, classLoader));
 
-            HEADER_CONVERTER_PLUGINS.add(new 
MockConnectorPluginDesc<>(StringConverter.class));
-            HEADER_CONVERTER_PLUGINS.add(new 
MockConnectorPluginDesc<>(LongConverter.class));
+            HEADER_CONVERTER_PLUGINS.add(new 
PluginDesc<>(StringConverter.class, PluginDesc.UNDEFINED_VERSION, 
PluginType.HEADER_CONVERTER, classLoader));
+            HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, 
PluginDesc.UNDEFINED_VERSION, PluginType.HEADER_CONVERTER, classLoader));
 
-            TRANSFORMATION_PLUGINS.add(new 
MockConnectorPluginDesc<>(RegexRouter.class));
-            TRANSFORMATION_PLUGINS.add(new 
MockConnectorPluginDesc<>(TimestampConverter.Key.class));
+            TRANSFORMATION_PLUGINS.add(new PluginDesc<>(RegexRouter.class, 
PluginDesc.UNDEFINED_VERSION, PluginType.TRANSFORMATION, classLoader));
+            TRANSFORMATION_PLUGINS.add(new 
PluginDesc<>(TimestampConverter.Key.class, PluginDesc.UNDEFINED_VERSION, 
PluginType.TRANSFORMATION, classLoader));
 
-            PREDICATE_PLUGINS.add(new 
MockConnectorPluginDesc<>(HasHeaderKey.class));
-            PREDICATE_PLUGINS.add(new 
MockConnectorPluginDesc<>(RecordIsTombstone.class));
+            PREDICATE_PLUGINS.add(new PluginDesc<>(HasHeaderKey.class, 
PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE, classLoader));
+            PREDICATE_PLUGINS.add(new PluginDesc<>(RecordIsTombstone.class, 
PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE, classLoader));
         } catch (Exception e) {
+            e.printStackTrace();
             fail("Failed setting up plugins");
         }
     }
@@ -344,7 +345,7 @@ public class ConnectorPluginsResourceTest {
         Set<PluginInfo> expectedConnectorPlugins = 
Stream.of(SINK_CONNECTOR_PLUGINS, SOURCE_CONNECTOR_PLUGINS)
                 .flatMap(Collection::stream)
                 .filter(p -> !excludes.contains(p.pluginClass()))
-                .map(ConnectorPluginsResourceTest::newInfo)
+                .map(PluginInfo::new)
                 .collect(Collectors.toSet());
         Set<PluginInfo> actualConnectorPlugins = new 
HashSet<>(connectorPluginsResource.listConnectorPlugins(true));
         assertEquals(expectedConnectorPlugins, actualConnectorPlugins);
@@ -353,8 +354,9 @@ public class ConnectorPluginsResourceTest {
 
     @Test
     public void testConnectorPluginsIncludesClassTypeAndVersionInformation() 
throws Exception {
-        PluginInfo sinkInfo = newInfo(SampleSinkConnector.class);
-        PluginInfo sourceInfo = newInfo(SampleSourceConnector.class);
+        ClassLoader classLoader = 
ConnectorPluginsResourceTest.class.getClassLoader();
+        PluginInfo sinkInfo = new PluginInfo(new 
PluginDesc<>(SampleSinkConnector.class, SampleSinkConnector.VERSION, 
PluginType.SINK, classLoader));
+        PluginInfo sourceInfo = new PluginInfo(new 
PluginDesc<>(SampleSourceConnector.class, SampleSourceConnector.VERSION, 
PluginType.SOURCE, classLoader));
         assertEquals(PluginType.SINK.toString(), sinkInfo.type());
         assertEquals(PluginType.SOURCE.toString(), sourceInfo.type());
         assertEquals(SampleSinkConnector.VERSION, sinkInfo.version());
@@ -399,7 +401,7 @@ public class ConnectorPluginsResourceTest {
                         PREDICATE_PLUGINS
                 ).flatMap(Collection::stream)
                 .filter(p -> !excludes.contains(p.pluginClass()))
-                .map(ConnectorPluginsResourceTest::newInfo)
+                .map(PluginInfo::new)
                 .collect(Collectors.toSet());
         Set<PluginInfo> actualConnectorPlugins = new 
HashSet<>(connectorPluginsResource.listConnectorPlugins(false));
         assertEquals(expectedConnectorPlugins, actualConnectorPlugins);
@@ -424,41 +426,6 @@ public class ConnectorPluginsResourceTest {
         }
     }
 
-    protected static PluginInfo newInfo(PluginDesc<?> pluginDesc) {
-        return new PluginInfo(new 
MockConnectorPluginDesc<>(pluginDesc.pluginClass(), pluginDesc.version()));
-    }
-
-    protected static PluginInfo newInfo(Class<?> klass)
-            throws Exception {
-        return new PluginInfo(new MockConnectorPluginDesc<>(klass));
-    }
-
-    public static class MockPluginClassLoader extends PluginClassLoader {
-
-        public MockPluginClassLoader(URL pluginLocation, URL[] urls) {
-            super(pluginLocation, urls);
-        }
-
-        @Override
-        public String location() {
-            return "/tmp/mockpath";
-        }
-    }
-
-    public static class MockConnectorPluginDesc<T> extends PluginDesc<T> {
-        public MockConnectorPluginDesc(Class<T> klass, String version) {
-            super(klass, version, new MockPluginClassLoader(null, new URL[0]));
-        }
-
-        public MockConnectorPluginDesc(Class<T> klass) throws Exception {
-            super(
-                    klass,
-                    ReflectionScanner.versionFor(klass),
-                    new MockPluginClassLoader(null, new URL[0])
-            );
-        }
-    }
-
     /* Name here needs to be unique as we are testing the aliasing mechanism */
     public static class ConnectorPluginsResourceTestConnector extends 
SourceConnector {
 
@@ -476,7 +443,7 @@ public class ConnectorPluginsResourceTest {
 
         @Override
         public String version() {
-            return "1.0";
+            return AppInfoParser.getVersion();
         }
 
         @Override

Reply via email to