This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new f0c02d94d4c KAFKA-19758: Preferably use the connector classloader when loading plugins if it has the correct version (#20675)
f0c02d94d4c is described below

commit f0c02d94d4c0eed1f10b484cf19d1a31479dc17f
Author: Mickael Maison <[email protected]>
AuthorDate: Mon Oct 20 19:29:49 2025 +0200

    KAFKA-19758: Preferably use the connector classloader when loading plugins if it has the correct version (#20675)
    
    Reviewers: Greg Harris <[email protected]>, Fiore Mario Vitale 
<[email protected]>, Snehashis Pal <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../kafka/connect/runtime/AbstractHerder.java      |   2 +-
 .../kafka/connect/runtime/ConnectorConfig.java     |   8 +-
 .../org/apache/kafka/connect/runtime/Worker.java   |   2 +-
 .../runtime/isolation/DelegatingClassLoader.java   | 100 ++++++------
 .../kafka/connect/runtime/isolation/Plugins.java   |  23 +--
 .../kafka/connect/runtime/AbstractHerderTest.java  |  11 +-
 .../apache/kafka/connect/runtime/WorkerTest.java   |  15 +-
 .../isolation/DelegatingClassLoaderTest.java       | 181 ++++++++++++++++++---
 .../runtime/isolation/MultiVersionTest.java        |   5 +-
 .../connect/runtime/isolation/PluginsTest.java     |   9 +-
 .../runtime/isolation/SynchronizationTest.java     |   9 +-
 .../runtime/standalone/StandaloneHerderTest.java   |   9 +-
 13 files changed, 250 insertions(+), 126 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7c02cf3adf9..6d5a60829c7 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -174,7 +174,7 @@
     <suppress checks="ClassFanOutComplexity"
               
files="(WorkerSink|WorkerSource|ErrorHandling)Task(|WithTopicCreation|Mockito)Test.java"/>
     <suppress checks="ClassFanOutComplexity"
-              files="DistributedHerderTest.java"/>
+              files="(AbstractHerderTest|DistributedHerderTest).java"/>
 
     <suppress checks="MethodLength"
               
files="(RequestResponse|WorkerSinkTask|WorkerSinkTaskMockito)Test.java"/>
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index bce67129388..75083a755f7 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -843,7 +843,7 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
         try {
             connVersion = 
PluginUtils.connectorVersionRequirement(connectorProps.get(CONNECTOR_VERSION));
             connector = cachedConnectors.getConnector(connType, connVersion);
-            connectorLoader = plugins().pluginLoader(connType, connVersion);
+            connectorLoader = plugins().connectorLoader(connType, connVersion);
             log.info("Validating connector {}, version {}", connType, 
connector.version());
         } catch (VersionedPluginLoadingException e) {
             log.warn("Failed to load connector {} with version {}, skipping 
additional validations (connector, converters, transformations, client 
overrides) ",
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index efd421bd2e2..9d44cab9b7e 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -416,7 +416,9 @@ public class ConnectorConfig extends AbstractConfig {
         try {
             VersionRange range = 
PluginUtils.connectorVersionRequirement(getString(versionConfig));
             VersionRange connectorRange = 
PluginUtils.connectorVersionRequirement(getString(CONNECTOR_VERSION));
-            return (T) plugins.newPlugin(getClass(classConfig).getName(), 
range, plugins.pluginLoader(getString(CONNECTOR_CLASS_CONFIG), connectorRange));
+            return (T) plugins.newPlugin(getClass(classConfig).getName(),
+                                         range,
+                                         
plugins.connectorLoader(getString(CONNECTOR_CLASS_CONFIG), connectorRange));
         } catch (Exception e) {
             throw new ConnectException(e);
         }
@@ -570,7 +572,7 @@ public class ConnectorConfig extends AbstractConfig {
         }
         try {
             VersionRange range = 
PluginUtils.connectorVersionRequirement(connectorVersion);
-            return plugins.pluginVersion(pluginName, 
plugins.pluginLoader(connectorClass, range), pluginType);
+            return plugins.pluginVersion(pluginName, 
plugins.connectorLoader(connectorClass, range), pluginType);
         } catch (InvalidVersionSpecificationException | 
VersionedPluginLoadingException e) {
             // these errors should be captured in other places, so we can 
ignore them here
             log.warn("Failed to determine default plugin version for {}", 
connectorClass, e);
@@ -740,7 +742,7 @@ public class ConnectorConfig extends AbstractConfig {
 
             T plugin;
             try {
-                plugin = (T) plugins.newPlugin(pluginClass, pluginVersion, 
plugins.pluginLoader(connectorClass, connectorVersionRange));
+                plugin = (T) plugins.newPlugin(pluginClass, pluginVersion, 
plugins.connectorLoader(connectorClass, connectorVersionRange));
             } catch (VersionedPluginLoadingException e) {
                 throw e;
             } catch (Exception e) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index a3e914d3f90..2021d63f320 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -1249,7 +1249,7 @@ public final class Worker {
         final String version = 
connProps.get(ConnectorConfig.CONNECTOR_VERSION);
 
         try {
-            return plugins.pluginLoader(klass, 
PluginUtils.connectorVersionRequirement(version));
+            return plugins.connectorLoader(klass, 
PluginUtils.connectorVersionRequirement(version));
         } catch (InvalidVersionSpecificationException  | 
VersionedPluginLoadingException e) {
             throw new ConnectException(
                     String.format("Failed to get class loader for connector 
%s, class %s", klass, connProps.get(ConnectorConfig.NAME_CONFIG)), e);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 9f3cd021750..efb25f9062e 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -23,10 +23,9 @@ import org.slf4j.LoggerFactory;
 
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -77,10 +76,11 @@ public class DelegatingClassLoader extends URLClassLoader {
      * Retrieve the PluginClassLoader associated with a plugin class
      *
      * @param name The fully qualified class name of the plugin
+     * @param range The version range of the plugin
+     * @param connectorLoader The ClassLoader of the connector loading this 
plugin
      * @return the PluginClassLoader that should be used to load this, or null 
if the plugin is not isolated.
      */
-    // VisibleForTesting
-    PluginClassLoader pluginClassLoader(String name, VersionRange range) {
+    PluginClassLoader pluginClassLoader(String name, VersionRange range, 
Optional<ClassLoader> connectorLoader) {
         if (!PluginUtils.shouldLoadInIsolation(name)) {
             return null;
         }
@@ -90,20 +90,15 @@ public class DelegatingClassLoader extends URLClassLoader {
             return null;
         }
 
-
-        ClassLoader pluginLoader = findPluginLoader(inner, name, range);
+        ClassLoader pluginLoader = findPluginLoader(inner, name, range, 
connectorLoader);
         return pluginLoader instanceof PluginClassLoader
             ? (PluginClassLoader) pluginLoader
             : null;
     }
 
-    PluginClassLoader pluginClassLoader(String name) {
-        return pluginClassLoader(name, null);
-    }
-
-    ClassLoader loader(String classOrAlias, VersionRange range) {
+    ClassLoader connectorLoader(String classOrAlias, VersionRange range) {
         String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
-        ClassLoader classLoader = pluginClassLoader(fullName, range);
+        ClassLoader classLoader = pluginClassLoader(fullName, range, 
Optional.empty());
         if (classLoader == null) {
             classLoader = this;
         }
@@ -115,12 +110,22 @@ public class DelegatingClassLoader extends URLClassLoader 
{
         return classLoader;
     }
 
-    ClassLoader loader(String classOrAlias) {
-        return loader(classOrAlias, null);
+    ClassLoader pluginLoader(String classOrAlias, VersionRange range, 
ClassLoader connectorLoader) {
+        String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+        ClassLoader classLoader = pluginClassLoader(fullName, range, 
Optional.ofNullable(connectorLoader));
+        if (classLoader == null) {
+            classLoader = this;
+        }
+        log.debug(
+                "Got plugin class loader: '{}' for plugin: {}",
+                classLoader,
+                classOrAlias
+        );
+        return classLoader;
     }
 
     ClassLoader connectorLoader(String connectorClassOrAlias) {
-        return loader(connectorClassOrAlias);
+        return connectorLoader(connectorClassOrAlias, null);
     }
 
     String resolveFullClassName(String classOrAlias) {
@@ -152,42 +157,44 @@ public class DelegatingClassLoader extends URLClassLoader 
{
     private ClassLoader findPluginLoader(
         SortedMap<PluginDesc<?>, ClassLoader> loaders,
         String pluginName,
-        VersionRange range
+        VersionRange range,
+        Optional<ClassLoader> connectorLoader
     ) {
 
-        if (range != null) {
-
-            if (null != range.getRecommendedVersion()) {
-                throw new VersionedPluginLoadingException(String.format("A 
soft version range is not supported for plugin loading, "
-                        + "this is an internal error as connect should 
automatically convert soft ranges to hard ranges. "
-                        + "Provided soft version: %s ", range));
-            }
+        if (range != null && range.getRecommendedVersion() != null) {
+            throw new VersionedPluginLoadingException(String.format("A soft 
version range is not supported for plugin loading, "
+                    + "this is an internal error as connect should 
automatically convert soft ranges to hard ranges. "
+                    + "Provided soft version: %s ", range));
+        }
 
-            ClassLoader loader = null;
-            for (Map.Entry<PluginDesc<?>, ClassLoader> entry : 
loaders.entrySet()) {
-                // the entries should be in sorted order of versions so this 
should end up picking the latest version which matches the range
-                if (range.containsVersion(entry.getKey().encodedVersion())) {
-                    loader = entry.getValue();
-                }
+        ClassLoader loader = null;
+        // the entries should be in sorted order of versions so this should 
end up picking the latest version which matches the range
+        for (Map.Entry<PluginDesc<?>, ClassLoader> entry : loaders.entrySet()) 
{
+            if (range == null || 
range.containsVersion(entry.getKey().encodedVersion())) {
+                loader = entry.getValue();
             }
-
-            if (loader == null) {
-                List<String> availableVersions = 
loaders.keySet().stream().map(PluginDesc::version).collect(Collectors.toList());
-                throw new VersionedPluginLoadingException(String.format(
-                        "Plugin %s not found that matches the version range 
%s, available versions: %s",
-                        pluginName,
-                        range,
-                        availableVersions
-                ), availableVersions);
+            // if we find a plugin with the same loader as the connector, we 
can end our search
+            if (connectorLoader.isPresent() && 
connectorLoader.get().equals(loader)) {
+                break;
             }
-            return loader;
         }
 
-        return loaders.get(loaders.lastKey());
+        if (range != null && loader == null) {
+            List<String> availableVersions = 
loaders.keySet().stream().map(PluginDesc::version).collect(Collectors.toList());
+            throw new VersionedPluginLoadingException(String.format(
+                    "Plugin %s not found that matches the version range %s, 
available versions: %s",
+                    pluginName,
+                    range,
+                    availableVersions
+            ), availableVersions);
+        }
+        return loader;
     }
 
     public void installDiscoveredPlugins(PluginScanResult scanResult) {
-        pluginLoaders.putAll(computePluginLoaders(scanResult));
+        scanResult.forEach(pluginDesc ->
+            pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new 
TreeMap<>())
+                    .put(pluginDesc, pluginDesc.loader()));
         for (String pluginClassName : pluginLoaders.keySet()) {
             log.info("Added plugin '{}'", pluginClassName);
         }
@@ -209,7 +216,7 @@ public class DelegatingClassLoader extends URLClassLoader {
     ) throws VersionedPluginLoadingException, ClassNotFoundException {
 
         String fullName = aliases.getOrDefault(name, name);
-        PluginClassLoader pluginLoader = pluginClassLoader(fullName, range);
+        PluginClassLoader pluginLoader = pluginClassLoader(fullName, range, 
Optional.empty());
         Class<?> plugin;
         if (pluginLoader != null) {
             log.trace("Retrieving loaded class '{}' from '{}'", name, 
pluginLoader);
@@ -259,16 +266,9 @@ public class DelegatingClassLoader extends URLClassLoader {
                         fullName,
                         pluginVersion,
                         range
-                ), Collections.singletonList(pluginVersion));
+                ), List.of(pluginVersion));
             }
         }
     }
 
-    private static Map<String, SortedMap<PluginDesc<?>, ClassLoader>> 
computePluginLoaders(PluginScanResult plugins) {
-        Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders = new 
HashMap<>();
-        plugins.forEach(pluginDesc ->
-            pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new 
TreeMap<>())
-                .put(pluginDesc, pluginDesc.loader()));
-        return pluginLoaders;
-    }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 97094bc89c8..f790ef59da2 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -282,20 +282,14 @@ public class Plugins {
         return delegatingLoader;
     }
 
-    // kept for compatibility
-    public ClassLoader connectorLoader(String connectorClassOrAlias) {
-        return delegatingLoader.loader(connectorClassOrAlias);
+    public ClassLoader connectorLoader(String connectorClassOrAlias, 
VersionRange range) {
+        return delegatingLoader.connectorLoader(connectorClassOrAlias, range);
     }
 
-    public ClassLoader pluginLoader(String classOrAlias, VersionRange range) {
-        return delegatingLoader.loader(classOrAlias, range);
+    public ClassLoader pluginLoader(String classOrAlias, VersionRange range, 
ClassLoader connectorLoader) {
+        return delegatingLoader.pluginLoader(classOrAlias, range, 
connectorLoader);
     }
 
-    public ClassLoader pluginLoader(String classOrAlias) {
-        return delegatingLoader.loader(classOrAlias);
-    }
-
-
     @SuppressWarnings({"unchecked", "rawtypes"})
     public Set<PluginDesc<Connector>> connectors() {
         Set<PluginDesc<Connector>> connectors = new TreeSet<>((Set) 
sinkConnectors());
@@ -366,19 +360,14 @@ public class Plugins {
         return plugins;
     }
 
-    public Object newPlugin(String classOrAlias) throws ClassNotFoundException 
{
-        Class<?> klass = pluginClass(delegatingLoader, classOrAlias, 
Object.class);
-        return newPlugin(klass);
-    }
-
     public Object newPlugin(String classOrAlias, VersionRange range) throws 
VersionedPluginLoadingException, ClassNotFoundException {
         Class<?> klass = pluginClass(delegatingLoader, classOrAlias, 
Object.class, range);
         return newPlugin(klass);
     }
 
     public Object newPlugin(String classOrAlias, VersionRange range, 
ClassLoader sourceLoader) throws ClassNotFoundException {
-        if (range == null && sourceLoader instanceof PluginClassLoader) {
-            return newPlugin(sourceLoader.loadClass(classOrAlias));
+        if (sourceLoader instanceof PluginClassLoader) {
+            return newPlugin(pluginLoader(classOrAlias, range, 
sourceLoader).loadClass(classOrAlias));
         }
         return newPlugin(classOrAlias, range);
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 81efeab6ca4..9bb4613bf3c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -572,7 +572,8 @@ public class AbstractHerderTest {
         AbstractHerder herder = createConfigValidationHerder(connectorClass, 
noneConnectorClientConfigOverridePolicy);
 
         // 2 transform aliases defined -> 2 plugin lookups
-        
Mockito.lenient().when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
+        
Mockito.lenient().when(plugins.transformations()).thenReturn(Set.of(transformationPluginDesc()));
+        Mockito.lenient().when(plugins.connectorLoader(any(), 
any())).thenReturn(classLoader);
         
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(), 
null, classLoader)).thenReturn(new SampleTransformation());
 
         // Define 2 transformations. One has a class defined and so can get 
embedded configs, the other is missing
@@ -626,8 +627,10 @@ public class AbstractHerderTest {
         final Class<? extends Connector> connectorClass = 
SampleSourceConnector.class;
         AbstractHerder herder = createConfigValidationHerder(connectorClass, 
noneConnectorClientConfigOverridePolicy);
 
-        
Mockito.lenient().when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
-        
Mockito.lenient().when(plugins.predicates()).thenReturn(Collections.singleton(predicatePluginDesc()));
+        
Mockito.lenient().when(plugins.transformations()).thenReturn(Set.of(transformationPluginDesc()));
+        
Mockito.lenient().when(plugins.predicates()).thenReturn(Set.of(predicatePluginDesc()));
+        Mockito.lenient().when(plugins.connectorLoader(any(), 
any())).thenReturn(classLoader);
+        
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(), 
null, classLoader)).thenReturn(new SampleTransformation());
         
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(), 
null, classLoader)).thenReturn(new SampleTransformation());
         
Mockito.lenient().when(plugins.newPlugin(SamplePredicate.class.getName(), null, 
classLoader)).thenReturn(new SamplePredicate());
 
@@ -1343,7 +1346,7 @@ public class AbstractHerderTest {
         
when(workerConfig.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG)).thenReturn((Class)
 SimpleHeaderConverter.class);
         when(worker.config()).thenReturn(workerConfig);
         when(plugins.newConnector(anyString(), any())).thenReturn(connector);
-        when(plugins.pluginLoader(connectorClass, 
null)).thenReturn(classLoader);
+        when(plugins.connectorLoader(any(), any())).thenReturn(classLoader);
         when(plugins.withClassLoader(classLoader)).thenReturn(loaderSwap);
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 38bcab1b594..64b6757f27d 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -391,7 +391,6 @@ public class WorkerTest {
         mockKafkaClusterId();
         mockGenericIsolation();
 
-        when(plugins.pluginLoader(nonConnectorClass, 
null)).thenReturn(pluginLoader);
         when(plugins.newConnector(nonConnectorClass, 
null)).thenThrow(exception);
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore, noneConnectorClientConfigOverridePolicy);
@@ -610,7 +609,7 @@ public class WorkerTest {
         mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
         mockExecutorFakeSubmit(WorkerTask.class);
 
-        Map<String, String> origProps = 
Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, 
TestSourceTask.class.getName());
+        Map<String, String> origProps = Map.of(TaskConfig.TASK_CLASS_CONFIG, 
TestSourceTask.class.getName());
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore, executorService,
                 noneConnectorClientConfigOverridePolicy, null);
@@ -909,7 +908,6 @@ public class WorkerTest {
 
         mockKafkaClusterId();
         mockGenericIsolation();
-        when(plugins.pluginLoader(SampleSourceConnector.class.getName(), 
null)).thenReturn(pluginLoader);
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore, noneConnectorClientConfigOverridePolicy);
         worker.herder = herder;
@@ -1894,7 +1892,6 @@ public class WorkerTest {
         mockKafkaClusterId();
         mockGenericIsolation();
         when(plugins.connectorClass(anyString(), any())).thenReturn((Class) 
sourceConnector.getClass());
-        when(plugins.pluginLoader(SampleSourceConnector.class.getName(), 
null)).thenReturn(pluginLoader);
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore, executorService,
                 allConnectorClientConfigOverridePolicy, mockAdminConstructor);
@@ -2105,8 +2102,7 @@ public class WorkerTest {
 
         mockGenericIsolation();
         when(plugins.newConnector(anyString(), 
any())).thenReturn(sourceConnector);
-        when(plugins.pluginLoader(SampleSourceConnector.class.getName(), 
null)).thenReturn(pluginLoader);
-        when(plugins.withClassLoader(any(ClassLoader.class), 
any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+        when(plugins.withClassLoader(any(), 
any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
         when(sourceConnector.alterOffsets(eq(connectorProps), 
anyMap())).thenThrow(new UnsupportedOperationException("This connector doesn't 
" +
                 "support altering of offsets"));
 
@@ -2815,6 +2811,7 @@ public class WorkerTest {
     private void testStartTaskWithTooManyTaskConfigs(boolean enforced) {
         SinkTask task = mock(TestSinkTask.class);
         mockKafkaClusterId();
+        when(plugins.connectorLoader(any(), any())).thenReturn(pluginLoader);
 
         Map<String, String> origProps = 
Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, 
TestSinkTask.class.getName());
 
@@ -3026,11 +3023,11 @@ public class WorkerTest {
     }
 
     private void mockGenericIsolation() {
+        when(plugins.connectorLoader(any(), any())).thenReturn(pluginLoader);
         when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
     }
 
     private void verifyGenericIsolation() {
-        verify(plugins, atLeastOnce()).withClassLoader(pluginLoader);
         verify(loaderSwap, atLeastOnce()).close();
     }
 
@@ -3042,7 +3039,6 @@ public class WorkerTest {
 
     private void mockVersionedConnectorIsolation(String connectorClass, 
VersionRange range, Connector connector) {
         mockGenericIsolation();
-        when(plugins.pluginLoader(connectorClass, 
range)).thenReturn(pluginLoader);
         when(plugins.newConnector(connectorClass, 
range)).thenReturn(connector);
         when(connector.version()).thenReturn(range == null ? "unknown" : 
range.toString());
     }
@@ -3055,7 +3051,6 @@ public class WorkerTest {
 
     private void verifyVersionedConnectorIsolation(String connectorClass, 
VersionRange range, Connector connector) {
         verifyGenericIsolation();
-        verify(plugins).pluginLoader(connectorClass, range);
         verify(plugins).newConnector(connectorClass, range);
         verify(connector, atLeastOnce()).version();
     }
@@ -3070,7 +3065,6 @@ public class WorkerTest {
     @SuppressWarnings({"unchecked", "rawtypes"})
     private void mockVersionedTaskIsolation(Class<? extends Connector> 
connectorClass, Class<? extends Task> taskClass, VersionRange range, Connector 
connector, Task task) {
         mockGenericIsolation();
-        when(plugins.pluginLoader(connectorClass.getName(), 
range)).thenReturn(pluginLoader);
         when(plugins.connectorClass(connectorClass.getName(), 
range)).thenReturn((Class) connectorClass);
         when(plugins.newTask(taskClass)).thenReturn(task);
         
when(plugins.safeLoaderSwapper()).thenReturn(TestPlugins.noOpLoaderSwap());
@@ -3086,7 +3080,6 @@ public class WorkerTest {
 
     private void verifyVersionedTaskIsolation(Class<? extends Connector> 
connectorClass, Class<? extends Task> taskClass, VersionRange range, Task task) 
{
         verifyGenericIsolation();
-        verify(plugins).pluginLoader(connectorClass.getName(), range);
         verify(plugins).connectorClass(connectorClass.getName(), range);
         verify(plugins).newTask(taskClass);
         verify(task, times(2)).version();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
index 749acb3e5b0..76bad03fb01 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
@@ -17,21 +17,27 @@
 package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.transforms.Cast;
+import org.apache.kafka.connect.transforms.Filter;
+import org.apache.kafka.connect.transforms.Transformation;
 
+import 
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.Collections;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -42,13 +48,30 @@ public class DelegatingClassLoaderTest {
 
     public PluginClassLoader parent;
     public PluginClassLoader pluginLoader;
+    public PluginClassLoader pluginLoader2;
+    public PluginClassLoader pluginLoader3;
+    public PluginClassLoader pluginLoader4;
     public DelegatingClassLoader classLoader;
-    public PluginDesc<SinkConnector> pluginDesc;
+    public PluginDesc<SinkConnector> connectorPluginDesc;
+    public PluginDesc<SinkConnector> connectorPluginDesc2;
+    public PluginDesc<Transformation<?>> cast;
+    public PluginDesc<Transformation<?>> castV1Loader2;
+    public PluginDesc<Transformation<?>> castV1Loader3;
+    public PluginDesc<Transformation<?>> castV2;
+    public PluginDesc<Transformation<?>> filter;
     public PluginScanResult scanResult;
+    public String version1 = "1.0";
+    public String version2 = "2.0";
+    public VersionRange range1;
+    public VersionRange range1And2;
+    public VersionRange range2;
+    public VersionRange range123;
 
     // Arbitrary values, their contents is not meaningful.
     public static final String ARBITRARY = "arbitrary";
-    public static final Class<?> ARBITRARY_CLASS = org.mockito.Mockito.class;
+    public static final Class<?> CONN = Mockito.class;
+    public static final Class<?> CAST = mock(Cast.class).getClass();
+    public static final Class<?> FILTER = mock(Filter.class).getClass();
     public static final URL ARBITRARY_URL;
 
     static {
@@ -61,27 +84,51 @@ public class DelegatingClassLoaderTest {
 
     @BeforeEach
     @SuppressWarnings({"unchecked"})
-    public void setUp() {
+    public void setUp() throws InvalidVersionSpecificationException {
+        range1 = VersionRange.createFromVersionSpec("[" + version1 + "]");
+        range1And2 = VersionRange.createFromVersionSpec("[" + version1 + "," + 
version2 + "]");
+        range2 = VersionRange.createFromVersionSpec("[" + version2 + "]");
+        range123 = VersionRange.createFromVersionSpec("[123]");
         parent = mock(PluginClassLoader.class);
         pluginLoader = mock(PluginClassLoader.class);
+        pluginLoader2 = mock(PluginClassLoader.class);
+        pluginLoader3 = mock(PluginClassLoader.class);
+        pluginLoader4 = mock(PluginClassLoader.class);
         classLoader = new DelegatingClassLoader(parent);
         SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
         // Lie to the DCL that this arbitrary class is a connector, since all 
real connector classes we have access to
         // are forced to be non-isolated by PluginUtils.shouldLoadInIsolation.
         when(pluginLoader.location()).thenReturn("some-location");
-        pluginDesc = new PluginDesc<>((Class<? extends SinkConnector>) 
ARBITRARY_CLASS, null, PluginType.SINK, pluginLoader);
-        assertTrue(PluginUtils.shouldLoadInIsolation(pluginDesc.className()));
-        sinkConnectors.add(pluginDesc);
+        when(pluginLoader2.location()).thenReturn("some-location2");
+        when(pluginLoader3.location()).thenReturn("some-location3");
+        when(pluginLoader4.location()).thenReturn("some-location4");
+        connectorPluginDesc = new PluginDesc<>((Class<? extends 
SinkConnector>) CONN, null, PluginType.SINK, pluginLoader);
+        connectorPluginDesc2 = new PluginDesc<>((Class<? extends 
SinkConnector>) CONN, version1, PluginType.SINK, pluginLoader2);
+        
assertTrue(PluginUtils.shouldLoadInIsolation(connectorPluginDesc.className()));
+        
assertTrue(PluginUtils.shouldLoadInIsolation(connectorPluginDesc2.className()));
+        sinkConnectors.add(connectorPluginDesc);
+        sinkConnectors.add(connectorPluginDesc2);
+        SortedSet<PluginDesc<Transformation<?>>> transformations = new 
TreeSet<>();
+        cast = new PluginDesc<>((Class<? extends Transformation<?>>) CAST, 
null, PluginType.TRANSFORMATION, pluginLoader);
+        castV1Loader2 = new PluginDesc<>((Class<? extends Transformation<?>>) 
CAST, version1, PluginType.TRANSFORMATION, pluginLoader2);
+        castV1Loader3 = new PluginDesc<>((Class<? extends Transformation<?>>) 
CAST, version1, PluginType.TRANSFORMATION, pluginLoader3);
+        castV2 = new PluginDesc<>((Class<? extends Transformation<?>>) CAST, 
version2, PluginType.TRANSFORMATION, pluginLoader4);
+        filter = new PluginDesc<>((Class<? extends Transformation<?>>) FILTER, 
null, PluginType.TRANSFORMATION, pluginLoader4);
+        transformations.add(cast);
+        transformations.add(castV1Loader2);
+        transformations.add(castV1Loader3);
+        transformations.add(castV2);
+        transformations.add(filter);
         scanResult = new PluginScanResult(
-                sinkConnectors,
-                Collections.emptySortedSet(),
-                Collections.emptySortedSet(),
-                Collections.emptySortedSet(),
-                Collections.emptySortedSet(),
-                Collections.emptySortedSet(),
-                Collections.emptySortedSet(),
-                Collections.emptySortedSet(),
-                Collections.emptySortedSet()
+            sinkConnectors,
+            new TreeSet<>(),
+            new TreeSet<>(),
+            new TreeSet<>(),
+            transformations,
+            new TreeSet<>(),
+            new TreeSet<>(),
+            new TreeSet<>(),
+            new TreeSet<>()
         );
     }
 
@@ -93,8 +140,8 @@ public class DelegatingClassLoaderTest {
     @Test
     @SuppressWarnings({"unchecked", "rawtypes"})
     public void testEmptyLoadClass() throws ClassNotFoundException {
-        when(parent.loadClass(ARBITRARY, false)).thenReturn((Class) 
ARBITRARY_CLASS);
-        assertSame(ARBITRARY_CLASS, classLoader.loadClass(ARBITRARY, false));
+        when(parent.loadClass(ARBITRARY, false)).thenReturn((Class) CAST);
+        assertSame(CAST, classLoader.loadClass(ARBITRARY, false));
     }
 
     @Test
@@ -106,17 +153,103 @@ public class DelegatingClassLoaderTest {
     @Test
     public void testInitializedConnectorLoader() {
         classLoader.installDiscoveredPlugins(scanResult);
-        assertSame(pluginLoader, 
classLoader.connectorLoader(PluginUtils.prunedName(pluginDesc)));
-        assertSame(pluginLoader, 
classLoader.connectorLoader(PluginUtils.simpleName(pluginDesc)));
-        assertSame(pluginLoader, 
classLoader.connectorLoader(pluginDesc.className()));
+        ClassLoader expectedLoader = 
scanResult.sinkConnectors().last().loader();
+        assertSame(expectedLoader, 
classLoader.connectorLoader(connectorPluginDesc.className()));
+    }
+
+    @Test
+    public void testInitializedConnectorLoaderWithVersion() {
+        classLoader.installDiscoveredPlugins(scanResult);
+        // connector v1 is only in pluginLoader2
+        assertSame(pluginLoader2, 
classLoader.connectorLoader(connectorPluginDesc.className(), range1));
+
+        // connector v123 cannot be found
+        assertThrows(VersionedPluginLoadingException.class, () -> 
classLoader.connectorLoader(cast.className(), range123));
+    }
+
+    @Test
+    public void testInitializedPluginLoader() {
+        classLoader.installDiscoveredPlugins(scanResult);
+        // without a loader or version, the last loader that has the plugin is 
picked
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.prunedName(cast), null, null));
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.simpleName(cast), null, null));
+    }
+
+    @Test
+    public void testInitializedPluginLoaderWithClassLoader() {
+        classLoader.installDiscoveredPlugins(scanResult);
+        // when range is not provided return our classloader if it has the 
plugin
+        assertSame(pluginLoader, 
classLoader.pluginLoader(PluginUtils.prunedName(cast), null, pluginLoader));
+        assertSame(pluginLoader, 
classLoader.pluginLoader(PluginUtils.simpleName(cast), null, pluginLoader));
+        assertSame(pluginLoader3, 
classLoader.pluginLoader(PluginUtils.prunedName(cast), null, pluginLoader3));
+        assertSame(pluginLoader3, 
classLoader.pluginLoader(PluginUtils.simpleName(cast), null, pluginLoader3));
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.prunedName(filter), null, pluginLoader4));
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.simpleName(filter), null, pluginLoader4));
+
+        // when range is not provided return the classloader which has the 
plugin if it's not in our classloader
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.prunedName(filter), null, pluginLoader));
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.simpleName(filter), null, pluginLoader));
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.prunedName(filter), null, pluginLoader3));
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.simpleName(filter), null, pluginLoader3));
+
+        // when range is not provided return the last classloader which has 
the plugin if it's not in our classloader
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.prunedName(cast), null, pluginLoader4));
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.simpleName(cast), null, pluginLoader4));
+    }
+
+    @Test
+    public void testInitializedPluginLoaderWithVersion() {
+        classLoader.installDiscoveredPlugins(scanResult);
+        // cast v1 is in both pluginLoader2 and pluginLoader3, we prefer the 
specified loader
+        assertSame(pluginLoader2, 
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1, pluginLoader2));
+        assertSame(pluginLoader2, 
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1, pluginLoader2));
+        assertSame(pluginLoader3, 
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1, pluginLoader3));
+        assertSame(pluginLoader3, 
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1, pluginLoader3));
+
+        // cast v1 is in both pluginLoader2 and pluginLoader3, we prefer the 
last loader
+        assertSame(pluginLoader3, 
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1, pluginLoader));
+        assertSame(pluginLoader3, 
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1, pluginLoader));
+        assertSame(pluginLoader3, 
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1, pluginLoader4));
+        assertSame(pluginLoader3, 
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1, pluginLoader4));
+
+        // both cast v1 and v2 match the range, we prefer the specified loader
+        assertSame(pluginLoader2, 
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1And2, 
pluginLoader2));
+        assertSame(pluginLoader2, 
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1And2, 
pluginLoader2));
+        assertSame(pluginLoader3, 
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1And2, 
pluginLoader3));
+        assertSame(pluginLoader3, 
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1And2, 
pluginLoader3));
+
+        // both cast v1 and v2 match the range, we prefer the last loader
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1And2, 
pluginLoader));
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1And2, 
pluginLoader));
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1And2, 
pluginLoader4));
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1And2, 
pluginLoader4));
+
+        // cast v2 is only in pluginLoader4
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.prunedName(cast), range2, pluginLoader));
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.simpleName(cast), range2, pluginLoader));
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.prunedName(cast), range2, pluginLoader3));
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.simpleName(cast), range2, pluginLoader3));
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.prunedName(cast), range2, pluginLoader4));
+        assertSame(pluginLoader4, 
classLoader.pluginLoader(PluginUtils.simpleName(cast), range2, pluginLoader4));
+
+        // cast v123 cannot be found
+        assertThrows(VersionedPluginLoadingException.class, () -> 
classLoader.pluginLoader(PluginUtils.prunedName(cast), range123, pluginLoader));
+        assertThrows(VersionedPluginLoadingException.class, () -> 
classLoader.pluginLoader(PluginUtils.simpleName(cast), range123, pluginLoader));
+        assertThrows(VersionedPluginLoadingException.class, () -> 
classLoader.pluginLoader(PluginUtils.prunedName(cast), range123, 
pluginLoader2));
+        assertThrows(VersionedPluginLoadingException.class, () -> 
classLoader.pluginLoader(PluginUtils.simpleName(cast), range123, 
pluginLoader2));
+        assertThrows(VersionedPluginLoadingException.class, () -> 
classLoader.pluginLoader(PluginUtils.prunedName(cast), range123, 
pluginLoader3));
+        assertThrows(VersionedPluginLoadingException.class, () -> 
classLoader.pluginLoader(PluginUtils.simpleName(cast), range123, 
pluginLoader3));
+        assertThrows(VersionedPluginLoadingException.class, () -> 
classLoader.pluginLoader(PluginUtils.prunedName(cast), range123, 
pluginLoader4));
+        assertThrows(VersionedPluginLoadingException.class, () -> 
classLoader.pluginLoader(PluginUtils.simpleName(cast), range123, 
pluginLoader4));
     }
 
     @Test
     @SuppressWarnings({"unchecked", "rawtypes"})
     public void testInitializedLoadClass() throws ClassNotFoundException {
         classLoader.installDiscoveredPlugins(scanResult);
-        String className = pluginDesc.className();
-        when(pluginLoader.loadClass(className, false)).thenReturn((Class) 
ARBITRARY_CLASS);
-        assertSame(ARBITRARY_CLASS, classLoader.loadClass(className, false));
+        String className = connectorPluginDesc.className();
+        // Use the last loader that has CONN
+        when(pluginLoader2.loadClass(className, false)).thenReturn((Class) 
CONN);
+        assertSame(CONN, classLoader.loadClass(className, false));
     }
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
index b770de74b80..1894ec365a5 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
@@ -58,7 +58,7 @@ public class MultiVersionTest {
             String pluginLocation = entry.getKey().toAbsolutePath().toString();
 
             for (VersionedPluginBuilder.BuildInfo buildInfo : 
entry.getValue()) {
-                ClassLoader pluginLoader = 
plugins.pluginLoader(buildInfo.plugin().className(), 
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+                ClassLoader pluginLoader = 
plugins.pluginLoader(buildInfo.plugin().className(), 
PluginUtils.connectorVersionRequirement(buildInfo.version()), null);
                 Assertions.assertInstanceOf(PluginClassLoader.class, 
pluginLoader);
                 Assertions.assertTrue(((PluginClassLoader) 
pluginLoader).location().contains(pluginLocation));
                 Object p = plugins.newPlugin(buildInfo.plugin().className(), 
PluginUtils.connectorVersionRequirement(buildInfo.version()));
@@ -167,7 +167,8 @@ public class MultiVersionTest {
         // get the connector loader of the combined artifact which includes 
all plugin types
         ClassLoader connectorLoader = plugins.pluginLoader(
             
VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR.className(),
-            PluginUtils.connectorVersionRequirement("0.1.0")
+            PluginUtils.connectorVersionRequirement("0.1.0"),
+            null
         );
         Assertions.assertInstanceOf(PluginClassLoader.class, connectorLoader);
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index ca4c29931d0..87f4ce2c3a2 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -61,6 +61,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.stream.Collectors;
@@ -355,7 +356,7 @@ public class PluginsTest {
     @Test
     public void newConverterShouldConfigureWithPluginClassLoader() {
         props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
TestPlugin.SAMPLING_CONVERTER.className());
-        ClassLoader classLoader = 
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONVERTER.className());
+        ClassLoader classLoader = 
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONVERTER.className(),
 null, Optional.empty());
         try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
             createConfig();
         }
@@ -377,7 +378,7 @@ public class PluginsTest {
         String providerPrefix = "some.provider";
         props.put(providerPrefix + ".class", 
TestPlugin.SAMPLING_CONFIG_PROVIDER.className());
 
-        PluginClassLoader classLoader = 
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONFIG_PROVIDER.className());
+        PluginClassLoader classLoader = 
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONFIG_PROVIDER.className(),
 null, Optional.empty());
         assertNotNull(classLoader);
         try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
             createConfig();
@@ -398,7 +399,7 @@ public class PluginsTest {
     @Test
     public void newHeaderConverterShouldConfigureWithPluginClassLoader() {
         props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, 
TestPlugin.SAMPLING_HEADER_CONVERTER.className());
-        ClassLoader classLoader = 
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_HEADER_CONVERTER.className());
+        ClassLoader classLoader = 
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_HEADER_CONVERTER.className(),
 null, Optional.empty());
         try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
             createConfig();
         }
@@ -590,7 +591,7 @@ public class PluginsTest {
 
     @Test
     public void testAliasesInConverters() throws ClassNotFoundException {
-        ClassLoader connectorLoader = 
plugins.connectorLoader(TestPlugin.SAMPLING_CONNECTOR.className());
+        ClassLoader connectorLoader = 
plugins.connectorLoader(TestPlugin.SAMPLING_CONNECTOR.className(), null);
         try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
             String configKey = "config.key";
             String alias = "SamplingConverter";
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
index cf2e53f1846..198118d8677 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
@@ -46,6 +46,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -191,10 +192,10 @@ public class SynchronizationTest {
         }
 
         @Override
-        public PluginClassLoader pluginClassLoader(String name, VersionRange 
range) {
+        PluginClassLoader pluginClassLoader(String name, VersionRange range, 
Optional<ClassLoader> connectorLoader) {
             dclBreakpoint.await(name);
             dclBreakpoint.await(name);
-            return super.pluginClassLoader(name, range);
+            return super.pluginClassLoader(name, range, connectorLoader);
         }
     }
 
@@ -225,7 +226,7 @@ public class SynchronizationTest {
     public void testSimultaneousUpwardAndDownwardDelegating() throws Exception 
{
         String t1Class = TestPlugins.TestPlugin.SAMPLING_CONVERTER.className();
         // Grab a reference to the target PluginClassLoader before activating 
breakpoints
-        ClassLoader connectorLoader = plugins.connectorLoader(t1Class);
+        ClassLoader connectorLoader = plugins.connectorLoader(t1Class, null);
 
         // THREAD 1: loads a class by delegating downward starting from the 
DelegatingClassLoader
         // DelegatingClassLoader breakpoint will only trigger on this thread
@@ -305,7 +306,7 @@ public class SynchronizationTest {
     public void testPluginClassLoaderDoesntHoldMonitorLock()
         throws InterruptedException, TimeoutException, BrokenBarrierException {
         String t1Class = TestPlugins.TestPlugin.SAMPLING_CONVERTER.className();
-        ClassLoader connectorLoader = plugins.connectorLoader(t1Class);
+        ClassLoader connectorLoader = plugins.connectorLoader(t1Class, null);
 
         Object externalTestLock = new Object();
         Breakpoint<Object> testBreakpoint = new Breakpoint<>();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 76b25131cc5..8fa76d9dca9 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -197,8 +197,7 @@ public class StandaloneHerderTest {
         when(worker.config()).thenReturn(workerConfig);
         
when(workerConfig.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG)).thenReturn((Class)
 SimpleHeaderConverter.class);
         when(plugins.newConnector(anyString(), 
any())).thenReturn(connectorMock);
-        when(plugins.pluginLoader(anyString(), 
any())).thenReturn(pluginLoader);
-        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
+        when(plugins.withClassLoader(null)).thenReturn(loaderSwap);
 
         when(connectorMock.config()).thenReturn(new ConfigDef());
         ConfigValue validatedValue = new ConfigValue("foo.bar");
@@ -889,7 +888,8 @@ public class StandaloneHerderTest {
         when(worker.config()).thenReturn(workerConfig);
         when(worker.getPlugins()).thenReturn(plugins);
         
when(workerConfig.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG)).thenReturn((Class)
 SimpleHeaderConverter.class);
-        when(plugins.pluginLoader(anyString(), 
any())).thenReturn(pluginLoader);
+        //when(plugins.pluginLoader(anyString(), any(), 
any())).thenReturn(pluginLoader);
+        when(plugins.connectorLoader(any(), any())).thenReturn(pluginLoader);
         when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
         when(plugins.newConnector(anyString(), 
any())).thenReturn(connectorMock);
         when(connectorMock.config()).thenReturn(configDef);
@@ -1244,7 +1244,8 @@ public class StandaloneHerderTest {
         
when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> 
configCapture.getValue());
         when(worker.config()).thenReturn(workerConfig);
         
when(workerConfig.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG)).thenReturn((Class)
 SimpleHeaderConverter.class);
-        when(plugins.pluginLoader(anyString(), 
any())).thenReturn(pluginLoader);
+        //when(plugins.pluginLoader(anyString(), any(), 
any())).thenReturn(pluginLoader);
+        when(plugins.connectorLoader(any(), any())).thenReturn(pluginLoader);
         when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
         // Assume the connector should always be created
         when(worker.getPlugins()).thenReturn(plugins);

Reply via email to