This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9dc9973c1ca KAFKA-18863: Connect Multiversion Support (Versioned Connector Creation and related changes) (#17743)
9dc9973c1ca is described below

commit 9dc9973c1ca53218449f311e21478848f2d1fa92
Author: snehashisp <[email protected]>
AuthorDate: Thu Feb 27 04:42:34 2025 +0530

    KAFKA-18863: Connect Multiversion Support (Versioned Connector Creation and related changes) (#17743)
    
    Reviewers: Greg Harris <[email protected]>
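    
    For context: with this change a connector config can pin its plugin to a
    Maven-style version range, and the worker resolves the connector through
    the versioned Plugins API instead of a bare class name. A minimal sketch
    of that resolution path (the connector class name and the version range
    below are illustrative assumptions, not values from this commit):
    
        import org.apache.kafka.connect.connector.Connector;
        import org.apache.kafka.connect.runtime.isolation.PluginUtils;
        import org.apache.kafka.connect.runtime.isolation.Plugins;
    
        import org.apache.maven.artifact.versioning.VersionRange;
    
        public class VersionedConnectorSketch {
            // Mirrors Worker#instantiateConnector below: parse the requested
            // range, then ask Plugins for a connector instance within it.
            public static Connector create(Plugins plugins) throws Exception {
                String connectorClass = "org.example.MySourceConnector"; // hypothetical
                VersionRange range = PluginUtils.connectorVersionRequirement("[3.2,4.0)");
                return plugins.newConnector(connectorClass, range);
            }
        }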
---
 .../connect/runtime/AbstractWorkerSourceTask.java  |  28 ++-
 .../kafka/connect/runtime/ConnectorConfig.java     |  26 ++-
 .../runtime/ExactlyOnceWorkerSourceTask.java       |   7 +-
 .../kafka/connect/runtime/TransformationStage.java |  25 ++-
 .../org/apache/kafka/connect/runtime/Worker.java   | 214 +++++++++++---------
 .../kafka/connect/runtime/WorkerSinkTask.java      |  27 ++-
 .../kafka/connect/runtime/WorkerSourceTask.java    |   7 +-
 .../apache/kafka/connect/runtime/WorkerTask.java   |   7 +-
 .../kafka/connect/runtime/isolation/Plugins.java   |   4 +-
 .../runtime/AbstractWorkerSourceTaskTest.java      |   3 +-
 .../kafka/connect/runtime/ConnectorConfigTest.java |   6 +-
 .../connect/runtime/ErrorHandlingTaskTest.java     |   9 +-
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java   |   3 +-
 .../connect/runtime/TransformationStageTest.java   |   5 +-
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  |   3 +-
 .../runtime/WorkerSinkTaskThreadedTest.java        |   3 +-
 .../connect/runtime/WorkerSourceTaskTest.java      |   3 +-
 .../kafka/connect/runtime/WorkerTaskTest.java      |   3 +-
 .../apache/kafka/connect/runtime/WorkerTest.java   | 215 ++++++++++++++-------
 .../kafka/connect/runtime/WorkerTestUtils.java     |   4 +-
 .../connect/runtime/isolation/PluginsTest.java     |  10 +-
 .../connect/runtime/isolation/TestPlugins.java     |   7 +
 .../kafka/connect/util/TopicCreationTest.java      |   4 +-
 23 files changed, 407 insertions(+), 216 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index d1675d5455a..683eb3abed0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -43,6 +43,7 @@ import org.apache.kafka.connect.runtime.errors.ProcessingContext;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.runtime.errors.ToleranceType;
+import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.source.SourceTaskContext;
@@ -70,6 +71,7 @@ import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
@@ -233,11 +235,12 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask<SourceRecord,
                                        RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator,
                                        StatusBackingStore statusBackingStore,
                                        Executor closeExecutor,
-                                       Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier) {
+                                       Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
+                                       Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
 
         super(id, statusListener, initialState, loader, connectMetrics, errorMetrics,
                 retryWithToleranceOperator, transformationChain, errorReportersSupplier,
-                time, statusBackingStore);
+                time, statusBackingStore, pluginLoaderSwapper);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -491,11 +494,17 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask<SourceRecord,
 
         RecordHeaders headers = retryWithToleranceOperator.execute(context, () -> convertHeaderFor(record), Stage.HEADER_CONVERTER, headerConverterPlugin.get().getClass());
 
-        byte[] key = retryWithToleranceOperator.execute(context, () -> keyConverterPlugin.get().fromConnectData(record.topic(), headers, record.keySchema(), record.key()),
-                Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass());
+        byte[] key = retryWithToleranceOperator.execute(context, () -> {
+            try (LoaderSwap swap = pluginLoaderSwapper.apply(keyConverterPlugin.get().getClass().getClassLoader())) {
+                return keyConverterPlugin.get().fromConnectData(record.topic(), headers, record.keySchema(), record.key());
+            }
+        }, Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass());
 
-        byte[] value = retryWithToleranceOperator.execute(context, () -> valueConverterPlugin.get().fromConnectData(record.topic(), headers, record.valueSchema(), record.value()),
-                Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass());
+        byte[] value = retryWithToleranceOperator.execute(context, () -> {
+            try (LoaderSwap swap = pluginLoaderSwapper.apply(valueConverterPlugin.get().getClass().getClassLoader())) {
+                return valueConverterPlugin.get().fromConnectData(record.topic(), headers, record.valueSchema(), record.value());
+            }
+        }, Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass());
 
         if (context.failed()) {
             return null;
@@ -551,8 +560,11 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask<SourceRecord,
             String topic = record.topic();
             for (Header header : headers) {
                 String key = header.key();
-                byte[] rawHeader = headerConverterPlugin.get().fromConnectHeader(topic, key, header.schema(), header.value());
-                result.add(key, rawHeader);
+                try (LoaderSwap swap = pluginLoaderSwapper.apply(headerConverterPlugin.get().getClass().getClassLoader())) {
+                    byte[] rawHeader = headerConverterPlugin.get().fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+
             }
         }
         return result;
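
The conversions above all follow the same loader-swap pattern: swap the thread
context classloader to the plugin's own loader for the duration of the call,
and restore it on close. Sketched in isolation (the Converter instance, topic,
and null schema are stand-ins for brevity, not code from this commit):

    import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
    import org.apache.kafka.connect.runtime.isolation.Plugins;
    import org.apache.kafka.connect.storage.Converter;

    import java.util.function.Function;

    public class LoaderSwapSketch {
        // safeLoaderSwapper() yields the Function<ClassLoader, LoaderSwap>
        // that the worker tasks now receive as pluginLoaderSwapper.
        public static byte[] convert(Plugins plugins, Converter converter, Object value) {
            Function<ClassLoader, LoaderSwap> swapper = plugins.safeLoaderSwapper();
            try (LoaderSwap swap = swapper.apply(converter.getClass().getClassLoader())) {
                // runs with the converter's classloader as the context loader
                return converter.fromConnectData("topic", null, value);
            } // previous context classloader restored here
        }
    }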
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 9ba82bdf617..db485146811 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -362,7 +362,7 @@ public class ConnectorConfig extends AbstractConfig {
      * {@link Transformation transformations} and {@link Predicate predicates}
      * as they are specified in the {@link #TRANSFORMS_CONFIG} and {@link #PREDICATES_CONFIG}
      */
-    public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformationStages(ConnectorTaskId connectorTaskId, ConnectMetrics metrics) {
+    public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformationStages(Plugins plugins, ConnectorTaskId connectorTaskId, ConnectMetrics metrics) {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
 
         final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
@@ -370,8 +370,9 @@ public class ConnectorConfig extends AbstractConfig {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
             try {
-                @SuppressWarnings("unchecked")
-                final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
+                final String typeConfig = prefix + "type";
+                final String versionConfig = prefix + WorkerConfig.PLUGIN_VERSION_SUFFIX;
+                @SuppressWarnings("unchecked") final Transformation<R> transformation = getTransformationOrPredicate(plugins, typeConfig, versionConfig);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
                 Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
                 Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
@@ -379,13 +380,15 @@ public class ConnectorConfig extends AbstractConfig {
                 Plugin<Transformation<R>> transformationPlugin = metrics.wrap(transformation, connectorTaskId, alias);
                 if (predicateAlias != null) {
                     String predicatePrefix = PREDICATES_PREFIX + predicateAlias + ".";
+                    final String predicateTypeConfig = predicatePrefix + "type";
+                    final String predicateVersionConfig = predicatePrefix + WorkerConfig.PLUGIN_VERSION_SUFFIX;
                     @SuppressWarnings("unchecked")
-                    Predicate<R> predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
+                    Predicate<R> predicate = getTransformationOrPredicate(plugins, predicateTypeConfig, predicateVersionConfig);
                     predicate.configure(originalsWithPrefix(predicatePrefix));
                     Plugin<Predicate<R>> predicatePlugin = metrics.wrap(predicate, connectorTaskId, (String) predicateAlias);
-                    transformations.add(new TransformationStage<>(predicatePlugin, negate != null && Boolean.parseBoolean(negate.toString()), transformationPlugin));
+                    transformations.add(new TransformationStage<>(predicatePlugin, negate != null && Boolean.parseBoolean(negate.toString()), transformationPlugin, plugins.safeLoaderSwapper()));
                 } else {
-                    transformations.add(new TransformationStage<>(transformationPlugin));
+                    transformations.add(new TransformationStage<>(transformationPlugin, plugins.safeLoaderSwapper()));
                 }
             } catch (Exception e) {
                 throw new ConnectException(e);
@@ -395,6 +398,17 @@ public class ConnectorConfig extends AbstractConfig {
         return transformations;
     }
 
+    @SuppressWarnings("unchecked")
+    private <T> T getTransformationOrPredicate(Plugins plugins, String classConfig, String versionConfig) {
+        try {
+            VersionRange range = PluginUtils.connectorVersionRequirement(getString(versionConfig));
+            VersionRange connectorRange = PluginUtils.connectorVersionRequirement(getString(CONNECTOR_VERSION));
+            return (T) plugins.newPlugin(getClass(classConfig).getName(), range, plugins.pluginLoader(getString(CONNECTOR_CLASS_CONFIG), connectorRange));
+        } catch (Exception e) {
+            throw new ConnectException(e);
+        }
+    }
+
     /**
      * Returns an enriched {@link ConfigDef} building upon the {@code ConfigDef}, using the current configuration specified in {@code props} as an input.
      * <p>
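
The getTransformationOrPredicate helper above resolves each transformation or
predicate from the scope of the connector's own plugin loader, at the version
requested per alias. A roughly equivalent call sequence, assuming a
hypothetical SMT class and that a null version spec means "no constraint":

    import org.apache.kafka.connect.runtime.isolation.PluginUtils;
    import org.apache.kafka.connect.runtime.isolation.Plugins;

    import org.apache.maven.artifact.versioning.VersionRange;

    public class VersionedSmtSketch {
        public static Object load(Plugins plugins, String connectorClass) throws Exception {
            // Locate the classloader of the (unconstrained) connector plugin first
            ClassLoader connectorLoader = plugins.pluginLoader(connectorClass,
                    PluginUtils.connectorVersionRequirement(null));
            // Then resolve the SMT at its own requested version within that scope
            VersionRange smtRange = PluginUtils.connectorVersionRequirement("[1.0,2.0)"); // hypothetical
            return plugins.newPlugin("org.example.MaskField", smtRange, connectorLoader); // hypothetical class
        }
    }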
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 706a3e4b9d5..fafbdbbc3f4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -32,6 +32,7 @@ import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
 import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.runtime.errors.ProcessingContext;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.source.SourceTask.TransactionBoundary;
@@ -57,6 +58,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 
@@ -101,11 +103,12 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
                                        Executor closeExecutor,
                                        Runnable preProducerCheck,
                                        Runnable postProducerCheck,
-                                       Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier) {
+                                       Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
+                                       Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
         super(id, task, statusListener, initialState, configState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, transformationChain,
                 buildTransactionContext(sourceConfig),
                 producer, admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics,
-                loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier);
+                loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier, pluginLoaderSwapper);
 
         this.transactionOpen = false;
         this.committableRecords = new LinkedHashMap<>();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
index 90b3559866a..a86c4878ab3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
@@ -20,9 +20,12 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
+import java.util.function.Function;
+
 /**
  * Wrapper for a {@link Transformation} and corresponding optional {@link Predicate}
  * which applies the transformation when the {@link Predicate} is true (or false, according to {@code negate}).
@@ -36,15 +39,18 @@ public class TransformationStage<R extends ConnectRecord<R>> implements AutoClos
     private final Plugin<Predicate<R>> predicatePlugin;
     private final Plugin<Transformation<R>> transformationPlugin;
     private final boolean negate;
+    private final Function<ClassLoader, LoaderSwap> pluginLoaderSwapper;
+
 
-    TransformationStage(Plugin<Transformation<R>> transformationPlugin) {
-        this(null, false, transformationPlugin);
+    TransformationStage(Plugin<Transformation<R>> transformationPlugin, Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
+        this(null, false, transformationPlugin, pluginLoaderSwapper);
     }
 
-    TransformationStage(Plugin<Predicate<R>> predicatePlugin, boolean negate, Plugin<Transformation<R>> transformationPlugin) {
+    TransformationStage(Plugin<Predicate<R>> predicatePlugin, boolean negate, Plugin<Transformation<R>> transformationPlugin, Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
         this.predicatePlugin = predicatePlugin;
         this.negate = negate;
         this.transformationPlugin = transformationPlugin;
+        this.pluginLoaderSwapper = pluginLoaderSwapper;
     }
 
     public Class<? extends Transformation<R>> transformClass() {
@@ -54,8 +60,17 @@ public class TransformationStage<R extends ConnectRecord<R>> implements AutoClos
     }
 
     public R apply(R record) {
-        if (predicatePlugin == null || predicatePlugin.get() == null || negate ^ predicatePlugin.get().test(record)) {
-            return transformationPlugin.get().apply(record);
+        Predicate<R> predicate = predicatePlugin != null ? predicatePlugin.get() : null;
+        boolean shouldTransform = predicate == null;
+        if (predicate != null) {
+            try (LoaderSwap swap = pluginLoaderSwapper.apply(predicate.getClass().getClassLoader())) {
+                shouldTransform = negate ^ predicate.test(record);
+            }
+        }
+        if (shouldTransform) {
+            try (LoaderSwap swap = pluginLoaderSwapper.apply(transformationPlugin.get().getClass().getClassLoader())) {
+                record = transformationPlugin.get().apply(record);
+            }
+                record = transformationPlugin.get().apply(record);
+            }
         }
         return record;
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index e01a1adef45..a435d281a7c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -67,8 +67,10 @@ import org.apache.kafka.connect.runtime.errors.LogReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
 import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
+import org.apache.kafka.connect.runtime.isolation.PluginUtils;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
+import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
@@ -99,6 +101,7 @@ import org.apache.kafka.connect.util.SinkUtils;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.apache.kafka.connect.util.TopicCreationGroup;
 
+import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -314,32 +317,38 @@ public final class Worker {
 
             final WorkerConnector workerConnector;
             final String connClass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-            ClassLoader connectorLoader = plugins.connectorLoader(connClass);
-            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
-                log.info("Creating connector {} of type {}", connName, connClass);
-                final Connector connector = plugins.newConnector(connClass);
-                final ConnectorConfig connConfig;
-                final CloseableOffsetStorageReader offsetReader;
-                final ConnectorOffsetBackingStore offsetStore;
-                if (ConnectUtils.isSinkConnector(connector)) {
-                    connConfig = new SinkConnectorConfig(plugins, connProps);
-                    offsetReader = null;
-                    offsetStore = null;
-                } else {
-                    SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
-                    connConfig = sourceConfig;
+            final ClassLoader connectorLoader;
 
-                    // Set up the offset backing store for this connector instance
-                    offsetStore = config.exactlyOnceSourceEnabled()
+            try {
+                connectorLoader = connectorClassLoader(connProps);
+                try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+                    log.info("Creating connector {} of type {}", connName, connClass);
+                    final Connector connector = instantiateConnector(connProps);
+
+                    final ConnectorConfig connConfig;
+                    final CloseableOffsetStorageReader offsetReader;
+                    final ConnectorOffsetBackingStore offsetStore;
+
+                    if (ConnectUtils.isSinkConnector(connector)) {
+                        connConfig = new SinkConnectorConfig(plugins, connProps);
+                        offsetReader = null;
+                        offsetStore = null;
+                    } else {
+                        SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
+                        connConfig = sourceConfig;
+
+                        // Set up the offset backing store for this connector instance
+                        offsetStore = config.exactlyOnceSourceEnabled()
                             ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector, null)
                             : offsetStoreForRegularSourceConnector(sourceConfig, connName, connector, null);
-                    offsetStore.configure(config);
-                    offsetReader = new OffsetStorageReaderImpl(offsetStore, connName, internalKeyConverter, internalValueConverter);
-                }
-                workerConnector = new WorkerConnector(
+                        offsetStore.configure(config);
+                        offsetReader = new OffsetStorageReaderImpl(offsetStore, connName, internalKeyConverter, internalValueConverter);
+                    }
+                    workerConnector = new WorkerConnector(
                         connName, connector, connConfig, ctx, metrics, connectorStatusListener, offsetReader, offsetStore, connectorLoader);
-                log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
-                workerConnector.transitionTo(initialState, onConnectorStateChange);
+                    log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
+                    workerConnector.transitionTo(initialState, onConnectorStateChange);
+                }
             } catch (Throwable t) {
                 log.error("Failed to start connector {}", connName, t);
                 connectorStatusListener.onFailure(connName, t);
@@ -655,51 +664,51 @@ public final class Worker {
                 throw new ConnectException("Task already exists in this worker: " + id);
 
             connectorStatusMetricsGroup.recordTaskAdded(id);
-            String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-            ClassLoader connectorLoader = plugins.connectorLoader(connType);
-
-            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
-                final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
-
-                int maxTasks = connConfig.tasksMax();
-                int numTasks = configState.taskCount(id.connector());
-                checkTasksMax(id.connector(), numTasks, maxTasks, connConfig.enforceTasksMax());
-
-                final TaskConfig taskConfig = new TaskConfig(taskProps);
-                final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
-                final Task task = plugins.newTask(taskClass);
-                log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
-
-                // By maintaining connector's specific class loader for this thread here, we first
-                // search for converters within the connector dependencies.
-                // If any of these aren't found, that means the connector didn't configure specific converters,
-                // so we should instantiate based upon the worker configuration
-                Converter keyConverter = plugins.newConverter(connConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage
-                                                                                                           .CURRENT_CLASSLOADER);
-                Converter valueConverter = plugins.newConverter(connConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER);
-                HeaderConverter headerConverter = plugins.newHeaderConverter(connConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
-                                                                             ClassLoaderUsage.CURRENT_CLASSLOADER);
-                if (keyConverter == null) {
-                    keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
-                    log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
-                } else {
-                    log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
-                }
-                if (valueConverter == null) {
-                    valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
-                    log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
-                } else {
-                    log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
-                }
-                if (headerConverter == null) {
-                    headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage
-                                                                                                             .PLUGINS);
-                    log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id);
-                } else {
-                    log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
-                }
 
-                workerTask = taskBuilder
+            final ClassLoader connectorLoader;
+            try {
+                connectorLoader = connectorClassLoader(connProps);
+                try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+                    final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
+
+                    int maxTasks = connConfig.tasksMax();
+                    int numTasks = configState.taskCount(id.connector());
+                    checkTasksMax(id.connector(), numTasks, maxTasks, connConfig.enforceTasksMax());
+
+                    final TaskConfig taskConfig = new TaskConfig(taskProps);
+                    final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
+                    final Task task = plugins.newTask(taskClass);
+                    log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
+
+
+                    // By maintaining connector's specific class loader for this thread here, we first
+                    // search for converters within the connector dependencies.
+                    // If any of these aren't found, that means the connector didn't configure specific converters,
+                    // so we should instantiate based upon the worker configuration
+                    Converter keyConverter = plugins.newConverter(connConfig, ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
+                    Converter valueConverter = plugins.newConverter(connConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
+                    HeaderConverter headerConverter = plugins.newHeaderConverter(connConfig, ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG, ConnectorConfig.HEADER_CONVERTER_VERSION_CONFIG);
+
+                    if (keyConverter == null) {
+                        keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION);
+                        log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
+                    } else {
+                        log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
+                    }
+                    if (valueConverter == null) {
+                        valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION);
+                        log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
+                    } else {
+                        log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
+                    }
+                    if (headerConverter == null) {
+                        headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, WorkerConfig.HEADER_CONVERTER_VERSION);
+                        log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id);
+                    } else {
+                        log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
+                    }
+                    }
+
+                    workerTask = taskBuilder
                         .withTask(task)
                         .withConnectorConfig(connConfig)
                        .withKeyConverterPlugin(metrics.wrap(keyConverter, id, true))
@@ -708,7 +717,8 @@ public final class Worker {
                         .withClassloader(connectorLoader)
                         .build();
 
-                workerTask.initialize(taskConfig);
+                    workerTask.initialize(taskConfig);
+                }
             } catch (Throwable t) {
                 log.error("Failed to start task {}", id, t);
                 connectorStatusMetricsGroup.recordTaskRemoved(id);
@@ -739,19 +749,17 @@ public final class Worker {
     public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps) {
         log.debug("Fencing out {} task producers for source connector {}", numTasks, connName);
         try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
-            String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-            ClassLoader connectorLoader = plugins.connectorLoader(connType);
-            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            Class<? extends Connector> connectorClass = connectorClass(connProps);
+            ClassLoader classLoader = connectorClassLoader(connProps);
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
                 final SourceConnectorConfig connConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
-                final Class<? extends Connector> connClass = plugins.connectorClass(
-                        connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
 
                 Map<String, Object> adminConfig = adminConfigs(
                         connName,
                         "connector-worker-adminclient-" + connName,
                         config,
                         connConfig,
-                        connClass,
+                        connectorClass,
                         connectorClientConfigOverridePolicy,
                         kafkaClusterId,
                         ConnectorType.SOURCE);
@@ -1198,11 +1206,9 @@ public final class Worker {
      * @param cb callback to invoke upon completion of the request
      */
     public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
-        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
-
+        Connector connector = instantiateConnector(connectorConfig);
+        ClassLoader connectorLoader = connectorClassLoader(connectorConfig);
         try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
-            Connector connector = plugins.newConnector(connectorClassOrAlias);
             if (ConnectUtils.isSinkConnector(connector)) {
                 log.debug("Fetching offsets for sink connector: {}", connName);
                 sinkConnectorOffsets(connName, connector, connectorConfig, cb);
@@ -1213,6 +1219,43 @@ public final class Worker {
         }
     }
 
+    private Connector instantiateConnector(Map<String, String> connProps) throws ConnectException {
+
+        final String klass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        final String version = connProps.get(ConnectorConfig.CONNECTOR_VERSION);
+
+        try {
+            return plugins.newConnector(klass, PluginUtils.connectorVersionRequirement(version));
+        } catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) {
+            throw new ConnectException(
+                    String.format("Failed to instantiate class for connector %s, class %s", klass, connProps.get(ConnectorConfig.NAME_CONFIG)), e);
+        }
+    }
+
+    private ClassLoader connectorClassLoader(Map<String, String> connProps) throws ConnectException {
+        final String klass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        final String version = connProps.get(ConnectorConfig.CONNECTOR_VERSION);
+
+        try {
+            return plugins.pluginLoader(klass, PluginUtils.connectorVersionRequirement(version));
+        } catch (InvalidVersionSpecificationException  | VersionedPluginLoadingException e) {
+            throw new ConnectException(
+                    String.format("Failed to get class loader for connector %s, class %s", klass, connProps.get(ConnectorConfig.NAME_CONFIG)), e);
+        }
+    }
+
+    private Class<? extends Connector> connectorClass(Map<String, String> connProps) throws ConnectException {
+        final String klass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        final String version = connProps.get(ConnectorConfig.CONNECTOR_VERSION);
+
+        try {
+            return plugins.connectorClass(klass, PluginUtils.connectorVersionRequirement(version));
+        } catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) {
+            throw new ConnectException(
+                String.format("Failed to get class for connector %s, class %s", klass, connProps.get(ConnectorConfig.NAME_CONFIG)), e);
+        }
+    }
+
     /**
      * Get the current consumer group offsets for a sink connector.
      * <p>
@@ -1311,12 +1354,10 @@ public final class Worker {
      */
     public void modifyConnectorOffsets(String connName, Map<String, String> connectorConfig,
                                       Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
-        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
-        Connector connector;
 
+        final Connector connector = instantiateConnector(connectorConfig);
+        ClassLoader connectorLoader = connectorClassLoader(connectorConfig);
         try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
-            connector = plugins.newConnector(connectorClassOrAlias);
             if (ConnectUtils.isSinkConnector(connector)) {
                 log.debug("Modifying offsets for sink connector: {}", connName);
                 modifySinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
@@ -1791,13 +1832,12 @@ public final class Worker {
             Objects.requireNonNull(classLoader, "Classloader used by task cannot be null");
 
             ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
-            final Class<? extends Connector> connectorClass = plugins.connectorClass(
-                    connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+            final Class<? extends Connector> connectorClass = connectorClass(connectorConfig.originalsStrings());
 
             RetryWithToleranceOperator<T> retryWithToleranceOperator = new RetryWithToleranceOperator<>(connectorConfig.errorRetryTimeout(),
                     connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM, errorHandlingMetrics);
 
-            TransformationChain<T, R> transformationChain = new TransformationChain<>(connectorConfig.<R>transformationStages(id, metrics), retryWithToleranceOperator);
+            TransformationChain<T, R> transformationChain = new TransformationChain<>(connectorConfig.<R>transformationStages(plugins, id, metrics), retryWithToleranceOperator);
             log.info("Initializing: {}", transformationChain);
 
             return doBuild(task, id, configState, statusListener, initialState,
@@ -1862,7 +1902,7 @@ public final class Worker {
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverterPlugin,
                     valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin, transformationChain, consumer, classLoader, time,
                     retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore(),
-                    () -> sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+                    () -> sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass), plugins.safeLoaderSwapper());
         }
     }
 
@@ -1922,7 +1962,7 @@ public final class Worker {
             return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics,
                     headerConverterPlugin, transformationChain, producer, topicAdmin, topicCreationGroups,
                     offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time,
-                    retryWithToleranceOperator, herder.statusBackingStore(), executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
+                    retryWithToleranceOperator, herder.statusBackingStore(), executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics), plugins.safeLoaderSwapper());
         }
     }
 
@@ -1987,7 +2027,7 @@ public final class Worker {
                     headerConverterPlugin, transformationChain, producer, topicAdmin, topicCreationGroups,
                     offsetReader, offsetWriter, offsetStore, config, configState, metrics, errorHandlingMetrics, classLoader, time, retryWithToleranceOperator,
                     herder.statusBackingStore(), sourceConfig, executor, preProducerCheck, postProducerCheck,
-                    () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
+                    () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics), plugins.safeLoaderSwapper());
         }
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index c050c61fb5f..4b8256115ed 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -47,6 +47,7 @@ import org.apache.kafka.connect.runtime.errors.ProcessingContext;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
+import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.ClusterConfigState;
@@ -66,6 +67,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -122,9 +124,10 @@ class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
                           RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> retryWithToleranceOperator,
                           WorkerErrantRecordReporter workerErrantRecordReporter,
                           StatusBackingStore statusBackingStore,
-                          Supplier<List<ErrorReporter<ConsumerRecord<byte[], byte[]>>>> errorReportersSupplier) {
+                          Supplier<List<ErrorReporter<ConsumerRecord<byte[], byte[]>>>> errorReportersSupplier,
+                          Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
         super(id, statusListener, initialState, loader, connectMetrics, errorMetrics,
-                retryWithToleranceOperator, transformationChain, errorReportersSupplier, time, statusBackingStore);
+                retryWithToleranceOperator, transformationChain, errorReportersSupplier, time, statusBackingStore, pluginLoaderSwapper);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -539,11 +542,17 @@ class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
     }
 
     private SinkRecord convertAndTransformRecord(ProcessingContext<ConsumerRecord<byte[], byte[]>> context, final ConsumerRecord<byte[], byte[]> msg) {
-        SchemaAndValue keyAndSchema = retryWithToleranceOperator.execute(context, () -> keyConverterPlugin.get().toConnectData(msg.topic(), msg.headers(), msg.key()),
-                Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass());
+        SchemaAndValue keyAndSchema = retryWithToleranceOperator.execute(context, () -> {
+            try (LoaderSwap swap = pluginLoaderSwapper.apply(keyConverterPlugin.get().getClass().getClassLoader())) {
+                return keyConverterPlugin.get().toConnectData(msg.topic(), msg.headers(), msg.key());
+            }
+        }, Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass());
 
-        SchemaAndValue valueAndSchema = retryWithToleranceOperator.execute(context, () -> valueConverterPlugin.get().toConnectData(msg.topic(), msg.headers(), msg.value()),
-                Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass());
+        SchemaAndValue valueAndSchema = retryWithToleranceOperator.execute(context, () -> {
+            try (LoaderSwap swap = pluginLoaderSwapper.apply(valueConverterPlugin.get().getClass().getClassLoader())) {
+                return valueConverterPlugin.get().toConnectData(msg.topic(), msg.headers(), msg.value());
+            }
+        }, Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass());
 
         Headers headers = retryWithToleranceOperator.execute(context, () -> convertHeadersFor(msg), Stage.HEADER_CONVERTER, headerConverterPlugin.get().getClass());
 
@@ -580,8 +589,10 @@ class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
         if (recordHeaders != null) {
             String topic = record.topic();
             for (org.apache.kafka.common.header.Header recordHeader : recordHeaders) {
-                SchemaAndValue schemaAndValue = headerConverterPlugin.get().toConnectHeader(topic, recordHeader.key(), recordHeader.value());
-                result.add(recordHeader.key(), schemaAndValue);
+                try (LoaderSwap swap = pluginLoaderSwapper.apply(headerConverterPlugin.get().getClass().getClassLoader())) {
+                    SchemaAndValue schemaAndValue = headerConverterPlugin.get().toConnectHeader(topic, recordHeader.key(), recordHeader.value());
+                    result.add(recordHeader.key(), schemaAndValue);
+                }
             }
         }
         return result;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index a92ebe0fc39..0806e887735 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -28,6 +28,7 @@ import org.apache.kafka.connect.runtime.errors.ProcessingContext;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.runtime.errors.ToleranceType;
+import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
@@ -53,6 +54,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;
@@ -91,12 +93,13 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
                             RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator,
                             StatusBackingStore statusBackingStore,
                             Executor closeExecutor,
-                            Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier) {
+                            Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
+                            Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
 
         super(id, task, statusListener, initialState, configState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, transformationChain,
                 null, producer,
                 admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics, loader,
-                time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier);
+                time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier, pluginLoaderSwapper);
 
         this.committableOffsets = CommittableOffsets.EMPTY;
         this.submittedRecords = new SubmittedRecords();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index ff0ecfce276..fa28a4e7b0e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -33,6 +33,7 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
 import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.LoggingContext;
@@ -44,6 +45,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 /**
@@ -78,6 +80,7 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>> implements Runnable {
     protected final RetryWithToleranceOperator<T> retryWithToleranceOperator;
     protected final TransformationChain<T, R> transformationChain;
     private final Supplier<List<ErrorReporter<T>>> errorReportersSupplier;
+    protected final Function<ClassLoader, LoaderSwap> pluginLoaderSwapper;
     protected final PluginMetricsImpl pluginMetrics;
 
     public WorkerTask(ConnectorTaskId id,
@@ -90,7 +93,8 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>> implements Runnable {
                       TransformationChain<T, R> transformationChain,
                       Supplier<List<ErrorReporter<T>>> errorReportersSupplier,
                       Time time,
-                      StatusBackingStore statusBackingStore) {
+                      StatusBackingStore statusBackingStore,
+                      Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
         this.id = id;
         this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener);
         this.errorMetrics = errorMetrics;
@@ -106,6 +110,7 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>> implements Runnable {
         this.errorReportersSupplier = errorReportersSupplier;
         this.time = time;
         this.statusBackingStore = statusBackingStore;
+        this.pluginLoaderSwapper = pluginLoaderSwapper;
         this.pluginMetrics = connectMetrics.taskPluginMetrics(id);
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 8be45e773b3..98f33ea582b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -549,7 +549,7 @@ public class Plugins {
     }
 
     private HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) {
-        if (!config.originals().containsKey(classPropertyName) && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
+        if (config.getClass(classPropertyName) == null && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
             // This configuration does not define the Header Converter via the specified property name
             return null;
         }
@@ -602,7 +602,7 @@ public class Plugins {
                 // if the config specifies the class name, use it, otherwise use the default which we can get from config.getClass
                 String classOrAlias = config.originalsStrings().get(classPropertyName);
                 if (classOrAlias == null) {
-                    classOrAlias = config.getClass(classPropertyName).getName();
+                    classOrAlias = config.getClass(classPropertyName) == null ? null : config.getClass(classPropertyName).getName();
                 }
                 try {
                     klass = pluginClass(delegatingLoader, classOrAlias, basePluginClass, range);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
index 780cd19d093..704a1f1ff40 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.connect.runtime.errors.ProcessingContext;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -966,7 +967,7 @@ public class AbstractWorkerSourceTaskTest {
                 taskId, sourceTask, statusListener, TargetState.STARTED, configState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, transformationChain,
                 workerTransactionContext, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore,
                 config, metrics, errorHandlingMetrics,  plugins.delegatingLoader(), Time.SYSTEM, retryWithToleranceOperator,
-                statusBackingStore, Runnable::run, errorReportersSupplier) {
+                statusBackingStore, Runnable::run, errorReportersSupplier, TestPlugins.noOpLoaderSwap()) {
             @Override
             protected void prepareToInitializeTask() {
             }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index 2c01c4b4fbc..5253bcb47da 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -163,7 +163,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("transforms.a.type", SimpleTransformation.class.getName());
         props.put("transforms.a.magic.number", "42");
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
-        final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages(CONNECTOR_TASK_ID, METRICS);
+        final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS);
         assertEquals(1, transformationStages.size());
         final TransformationStage<SinkRecord> stage = transformationStages.get(0);
         assertEquals(SimpleTransformation.class, stage.transformClass());
@@ -192,7 +192,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         props.put("transforms.b.type", SimpleTransformation.class.getName());
         props.put("transforms.b.magic.number", "84");
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
-        final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages(CONNECTOR_TASK_ID, METRICS);
+        final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS);
         assertEquals(2, transformationStages.size());
         assertEquals(42, transformationStages.get(0).apply(DUMMY_RECORD).kafkaPartition().intValue());
         assertEquals(84, transformationStages.get(1).apply(DUMMY_RECORD).kafkaPartition().intValue());
@@ -293,7 +293,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
 
     private void assertTransformationStageWithPredicate(Map<String, String> props, boolean expectedNegated) {
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
-        final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages(CONNECTOR_TASK_ID, METRICS);
+        final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS);
         assertEquals(1, transformationStages.size());
         TransformationStage<SinkRecord> stage = transformationStages.get(0);
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 035b30216da..70edfb0f598 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.connect.runtime.errors.ToleranceType;
 import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -427,7 +428,7 @@ public class ErrorHandlingTaskTest {
 
         Plugin<Transformation<SinkRecord>> transformationPlugin = metrics.wrap(new FaultyPassthrough<SinkRecord>(), taskId, "");
         TransformationChain<ConsumerRecord<byte[], byte[]>, SinkRecord> sinkTransforms =
-                new TransformationChain<>(singletonList(new TransformationStage<>(transformationPlugin)), retryWithToleranceOperator);
+                new TransformationChain<>(singletonList(new TransformationStage<>(transformationPlugin, TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator);
 
         Plugin<Converter> keyConverterPlugin = metrics.wrap(converter, taskId,  true);
         Plugin<Converter> valueConverterPlugin = metrics.wrap(converter, taskId,  false);
@@ -437,7 +438,7 @@ public class ErrorHandlingTaskTest {
             ClusterConfigState.EMPTY, metrics, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics,
                 headerConverterPlugin, sinkTransforms, consumer, pluginLoader, time,
             retryWithToleranceOperator, workerErrantRecordReporter,
-                statusBackingStore, () -> errorReporters);
+                statusBackingStore, () -> errorReporters, TestPlugins.noOpLoaderSwap());
     }
 
     private void createSourceTask(TargetState initialState, RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator, List<ErrorReporter<SourceRecord>> errorReporters) {
@@ -463,7 +464,7 @@
                                   List<ErrorReporter<SourceRecord>> errorReporters, Converter converter) {
         Plugin<Transformation<SourceRecord>> transformationPlugin = metrics.wrap(new FaultyPassthrough<SourceRecord>(), taskId, "");
         TransformationChain<SourceRecord, SourceRecord> sourceTransforms = new TransformationChain<>(singletonList(
-                new TransformationStage<>(transformationPlugin)), retryWithToleranceOperator);
+                new TransformationStage<>(transformationPlugin, TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator);
 
         Plugin<Converter> keyConverterPlugin = metrics.wrap(converter, taskId,  true);
         Plugin<Converter> valueConverterPlugin = metrics.wrap(converter, taskId,  false);
@@ -476,7 +477,7 @@ public class ErrorHandlingTaskTest {
                 offsetReader, offsetWriter, offsetStore, workerConfig,
                 ClusterConfigState.EMPTY, metrics, pluginLoader, time,
                 retryWithToleranceOperator,
-                statusBackingStore, Runnable::run, () -> errorReporters));
+                statusBackingStore, Runnable::run, () -> errorReporters, TestPlugins.noOpLoaderSwap()));
 
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 24103aefd97..a6375398d29 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -283,7 +284,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
         workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin,
                 transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore,
                 config, clusterConfigState, metrics, errorHandlingMetrics, plugins.delegatingLoader(), time, RetryWithToleranceOperatorTest.noneOperator(), statusBackingStore,
-                sourceConfig, Runnable::run, preProducerCheck, postProducerCheck, Collections::emptyList);
+                sourceConfig, Runnable::run, preProducerCheck, postProducerCheck, Collections::emptyList, TestPlugins.noOpLoaderSwap());
     }
 
     @ParameterizedTest
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
index 88ed326457a..e2791a63f7b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
@@ -61,7 +62,9 @@ public class TransformationStageTest {
         TransformationStage<SourceRecord> stage = new TransformationStage<>(
                 predicatePlugin,
                 negate,
-                transformationPlugin);
+                transformationPlugin,
+                TestPlugins.noOpLoaderSwap()
+        );
 
         assertEquals(expectedResult, stage.apply(initial));
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 0750b0855cb..2607ee8b03b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -48,6 +48,7 @@ import org.apache.kafka.connect.runtime.errors.ProcessingContext;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -228,7 +229,7 @@ public class WorkerSinkTaskTest {
                 taskId, task, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, connectMetrics,
                 keyConverterPlugin, valueConverterPlugin, errorMetrics, headerConverterPlugin,
                 transformationChain, consumer, loader, time,
-                retryWithToleranceOperator, null, statusBackingStore, errorReportersSupplier);
+                retryWithToleranceOperator, null, statusBackingStore, errorReportersSupplier, TestPlugins.noOpLoaderSwap());
     }
 
     @AfterEach
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 15ea638ef02..74021118098 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
 import org.apache.kafka.connect.runtime.errors.ProcessingContext;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -182,7 +183,7 @@ public class WorkerSinkTaskThreadedTest {
                 taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverterPlugin,
                 valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin, transformationChain,
                 consumer, pluginLoader, time, RetryWithToleranceOperatorTest.noneOperator(), null, statusBackingStore,
-                Collections::emptyList);
+                Collections::emptyList, TestPlugins.noOpLoaderSwap());
         recordsReturned = 0;
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 6d6ab60c4a7..23fb3618f81 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -254,7 +255,7 @@ public class WorkerSourceTaskTest {
         workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin,
                 transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
                 offsetReader, offsetWriter, offsetStore, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
-                retryWithToleranceOperator, statusBackingStore, Runnable::run, Collections::emptyList);
+                retryWithToleranceOperator, statusBackingStore, Runnable::run, Collections::emptyList, TestPlugins.noOpLoaderSwap());
     }
 
     @ParameterizedTest
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index c8c8cc49d05..eae9c96998b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
 import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.storage.StatusBackingStore;
@@ -299,7 +300,7 @@ public class WorkerTaskTest {
                               Supplier<List<ErrorReporter<Object>>> errorReporterSupplier,
                               Time time, StatusBackingStore statusBackingStore) {
             super(id, statusListener, initialState, loader, connectMetrics, errorHandlingMetrics,
-                    retryWithToleranceOperator, transformationChain, errorReporterSupplier, time, statusBackingStore);
+                    retryWithToleranceOperator, transformationChain, errorReporterSupplier, time, statusBackingStore, TestPlugins.noOpLoaderSwap());
         }
 
         @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index b4ad23b37a9..2f7af629f06 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -89,6 +89,7 @@ import org.apache.kafka.connect.util.FutureCallback;
 import org.apache.kafka.connect.util.SinkUtils;
 import org.apache.kafka.connect.util.TopicAdmin;
 
+import org.apache.maven.artifact.versioning.VersionRange;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -322,7 +323,7 @@ public class WorkerTest {
 
         // Create
         mockKafkaClusterId();
-        mockConnectorIsolation(connectorClass, sourceConnector);
+        mockVersionedConnectorIsolation(connectorClass, null, sourceConnector);
         mockExecutorRealSubmit(WorkerConnector.class);
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
@@ -358,7 +359,7 @@ public class WorkerTest {
 
 
         verifyKafkaClusterId();
-        verifyConnectorIsolation(sourceConnector);
+        verifyVersionedConnectorIsolation(connectorClass, null, sourceConnector);
         verifyExecutorSubmit();
         verify(sourceConnector).initialize(any(ConnectorContext.class));
         verify(sourceConnector).start(connectorProps);
@@ -389,7 +390,8 @@ public class WorkerTest {
         mockKafkaClusterId();
         mockGenericIsolation();
 
-        when(plugins.newConnector(anyString())).thenThrow(exception);
+        when(plugins.pluginLoader(nonConnectorClass, null)).thenReturn(pluginLoader);
+        when(plugins.newConnector(nonConnectorClass, null)).thenThrow(exception);
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
         worker.herder = herder;
@@ -414,7 +416,7 @@ public class WorkerTest {
         assertStatistics(worker, 0, 0);
         assertStartupStatistics(worker, 1, 1, 0, 0);
 
-        verify(plugins).newConnector(anyString());
+        verify(plugins).newConnector(nonConnectorClass, null);
         verifyKafkaClusterId();
         verifyGenericIsolation();
         verify(connectorStatusListener).onFailure(eq(CONNECTOR_ID), any(ConnectException.class));
@@ -426,7 +428,7 @@ public class WorkerTest {
         setup(enableTopicCreation);
         final String connectorAlias = "SampleSourceConnector";
         mockKafkaClusterId();
-        mockConnectorIsolation(connectorAlias, sinkConnector);
+        mockVersionedConnectorIsolation(connectorAlias, null, sinkConnector);
         mockExecutorRealSubmit(WorkerConnector.class);
 
         connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorAlias);
@@ -456,7 +458,7 @@ public class WorkerTest {
         assertStartupStatistics(worker, 1, 0, 0, 0);
 
         verifyKafkaClusterId();
-        verifyConnectorIsolation(sinkConnector);
+        verifyVersionedConnectorIsolation(connectorAlias, null, sinkConnector);
         verifyExecutorSubmit();
         verify(sinkConnector).initialize(any(ConnectorContext.class));
         verify(sinkConnector).start(connectorProps);
@@ -472,7 +474,7 @@ public class WorkerTest {
         final String shortConnectorAlias = "WorkerTest";
 
         mockKafkaClusterId();
-        mockConnectorIsolation(shortConnectorAlias, sinkConnector);
+        mockVersionedConnectorIsolation(shortConnectorAlias, null, sinkConnector);
         mockExecutorRealSubmit(WorkerConnector.class);
         connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, shortConnectorAlias);
 
@@ -499,7 +501,7 @@ public class WorkerTest {
         assertStatistics(worker, 0, 0);
 
         verifyKafkaClusterId();
-        verifyConnectorIsolation(sinkConnector);
+        verifyVersionedConnectorIsolation(shortConnectorAlias, null, sinkConnector);
         verify(sinkConnector).initialize(any(ConnectorContext.class));
         verify(sinkConnector).start(connectorProps);
         verify(connectorStatusListener).onStartup(CONNECTOR_ID);
@@ -531,7 +533,7 @@ public class WorkerTest {
         final String connectorClass = SampleSourceConnector.class.getName();
 
         mockKafkaClusterId();
-        mockConnectorIsolation(connectorClass, sinkConnector);
+        mockVersionedConnectorIsolation(connectorClass, null, sinkConnector);
         mockExecutorRealSubmit(WorkerConnector.class);
 
         Map<String, String> taskProps = Collections.singletonMap("foo", "bar");
@@ -584,7 +586,7 @@ public class WorkerTest {
         assertStatistics(worker, 0, 0);
 
         verifyKafkaClusterId();
-        verifyConnectorIsolation(sinkConnector);
+        verifyVersionedConnectorIsolation(connectorClass, null, sinkConnector);
         verifyExecutorSubmit();
         verify(sinkConnector).initialize(any(ConnectorContext.class));
         verify(sinkConnector).start(connectorProps);
@@ -601,10 +603,10 @@ public class WorkerTest {
     public void testAddRemoveSourceTask(boolean enableTopicCreation) {
         setup(enableTopicCreation);
         mockKafkaClusterId();
-        mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task);
-        mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
-        mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
-        mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
+        mockVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, sourceConnector, task);
+        mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter);
+        mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
+        mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
         mockExecutorFakeSubmit(WorkerTask.class);
 
         Map<String, String> origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
@@ -642,10 +644,10 @@ public class WorkerTest {
         assertStatistics(worker, 0, 0);
 
         verifyKafkaClusterId();
-        verifyTaskIsolation(task);
-        verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG);
-        verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG);
-        verifyTaskHeaderConverter();
+        verifyVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, task);
+        verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
+        verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
+        verifyVersionedTaskHeaderConverterFromConnector();
 
         verifyExecutorSubmit();
     }
@@ -657,10 +659,10 @@ public class WorkerTest {
         // Most of the other cases use source tasks; we make sure to get code coverage for sink tasks here as well
         SinkTask task = mock(TestSinkTask.class);
         mockKafkaClusterId();
-        mockTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, task);
-        mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
-        mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
-        mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
+        mockVersionedTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, null, sinkConnector, task);
+        mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter);
+        mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
+        mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
         mockExecutorFakeSubmit(WorkerTask.class);
 
         Map<String, String> origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
@@ -700,10 +702,10 @@ public class WorkerTest {
         assertStatistics(worker, 0, 0);
 
         verifyKafkaClusterId();
-        verifyTaskIsolation(task);
-        verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG);
-        verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG);
-        verifyTaskHeaderConverter();
+        verifyVersionedTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, null, task);
+        verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
+        verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
+        verifyVersionedTaskHeaderConverterFromConnector();
         verifyExecutorSubmit();
     }
 
@@ -729,10 +731,10 @@ public class WorkerTest {
         config = new DistributedConfig(workerProps);
 
         mockKafkaClusterId();
-        mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task);
-        mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
-        mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
-        mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
+        mockVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, sourceConnector, task);
+        mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter);
+        mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
+        mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
         mockExecutorFakeSubmit(WorkerTask.class);
 
         Runnable preProducer = mock(Runnable.class);
@@ -774,10 +776,10 @@ public class WorkerTest {
         assertStatistics(worker, 0, 0);
 
         verifyKafkaClusterId();
-        verifyTaskIsolation(task);
-        verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG);
-        verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG);
-        verifyTaskHeaderConverter();
+        verifyVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, task);
+        verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
+        verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
+        verifyVersionedTaskHeaderConverterFromConnector();
         verifyExecutorSubmit();
     }
 
@@ -794,11 +796,10 @@ public class WorkerTest {
         TaskConfig taskConfig = new TaskConfig(origProps);
 
         mockKafkaClusterId();
-        mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task);
-        // Expect that the worker will create converters and will find them using the current classloader ...
-        mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
-        mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
-        mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
+        mockVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, sourceConnector, task);
+        mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter);
+        mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
+        mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
         mockExecutorFakeSubmit(WorkerTask.class);
 
 
@@ -851,7 +852,7 @@ public class WorkerTest {
         verify(instantiatedTask).initialize(taskConfig);
         verify(herder, times(5)).taskStatus(TASK_ID);
         verifyKafkaClusterId();
-        verifyTaskIsolation(task);
+        verifyVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, task);
         verifyExecutorSubmit();
         verify(instantiatedTask, atLeastOnce()).id();
         verify(instantiatedTask).awaitStop(anyLong());
@@ -860,9 +861,9 @@ public class WorkerTest {
         // Called when we stop the worker
         verify(instantiatedTask).loader();
         verify(instantiatedTask).stop();
-        verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG);
-        verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG);
-        verifyTaskHeaderConverter();
+        verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
+        verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
+        verifyVersionedTaskHeaderConverterFromConnector();
     }
 
     @ParameterizedTest
@@ -907,6 +908,7 @@ public class WorkerTest {
 
         mockKafkaClusterId();
         mockGenericIsolation();
+        when(plugins.pluginLoader(SampleSourceConnector.class.getName(), null)).thenReturn(pluginLoader);
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
         worker.herder = herder;
@@ -935,14 +937,14 @@ public class WorkerTest {
         mockFileConfigProvider();
 
         mockKafkaClusterId();
-        mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task);
+        mockVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, sourceConnector, task);
         // Expect that the worker will create converters and will not initially find them using the current classloader ...
-        mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, null);
-        mockTaskConverter(ClassLoaderUsage.PLUGINS, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
-        mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, null);
-        mockTaskConverter(ClassLoaderUsage.PLUGINS, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
-        mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
-        mockTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter);
+        mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, null);
+        mockVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, taskKeyConverter);
+        mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, null);
+        mockVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, taskValueConverter);
+        mockVersionedTaskHeaderConverterFromConnector(null);
+        mockVersionedTaskHeaderConverterFromWorker(taskHeaderConverter);
         mockExecutorFakeSubmit(WorkerTask.class);
 
         Map<String, String> origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
@@ -968,7 +970,13 @@ public class WorkerTest {
         verify(constructedMockTask).awaitStop(anyLong());
         verify(constructedMockTask).removeMetrics();
         verifyKafkaClusterId();
-        verifyTaskIsolation(task);
+        verifyVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, task);
+        verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
+        verifyVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION);
+        verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
+        verifyVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION);
+        verifyVersionedTaskHeaderConverterFromConnector();
+        verifyVersionedTaskHeaderConverterFromWorker();
         verifyConverters();
         verifyExecutorSubmit();
     }
@@ -985,14 +993,14 @@ public class WorkerTest {
         TaskConfig taskConfig = new TaskConfig(origProps);
 
         mockKafkaClusterId();
-        mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task);
+        mockVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, sourceConnector, task);
         // Expect that the worker will create converters and will not initially find them using the current classloader ...
-        mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, null);
-        mockTaskConverter(ClassLoaderUsage.PLUGINS, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
-        mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, null);
-        mockTaskConverter(ClassLoaderUsage.PLUGINS, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
-        mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
-        mockTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter);
+        mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, null);
+        mockVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, taskKeyConverter);
+        mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, null);
+        mockVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, taskValueConverter);
+        mockVersionedTaskHeaderConverterFromConnector(null);
+        mockVersionedTaskHeaderConverterFromWorker(taskHeaderConverter);
         mockExecutorFakeSubmit(WorkerTask.class);
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
@@ -1024,7 +1032,13 @@ public class WorkerTest {
         verify(instantiatedTask).removeMetrics();
 
         verifyKafkaClusterId();
-        verifyTaskIsolation(task);
+        verifyVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, task);
+        verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
+        verifyVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION);
+        verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
+        verifyVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION);
+        verifyVersionedTaskHeaderConverterFromConnector();
+        verifyVersionedTaskHeaderConverterFromWorker();
         verifyExecutorSubmit();
         verifyStorage();
     }
@@ -1860,7 +1874,7 @@ public class WorkerTest {
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "rawtypes"})
     public void testZombieFencing(boolean enableTopicCreation) {
         setup(enableTopicCreation);
         Admin admin = mock(Admin.class);
@@ -1878,6 +1892,8 @@ public class WorkerTest {
 
         mockKafkaClusterId();
         mockGenericIsolation();
+        when(plugins.connectorClass(anyString(), any())).thenReturn((Class) sourceConnector.getClass());
+        when(plugins.pluginLoader(SampleSourceConnector.class.getName(), null)).thenReturn(pluginLoader);
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
                 allConnectorClientConfigOverridePolicy, mockAdminConstructor);
@@ -2087,7 +2103,8 @@ public class WorkerTest {
         worker.start();
 
         mockGenericIsolation();
-        when(plugins.newConnector(anyString())).thenReturn(sourceConnector);
+        when(plugins.newConnector(anyString(), any())).thenReturn(sourceConnector);
+        when(plugins.pluginLoader(SampleSourceConnector.class.getName(), null)).thenReturn(pluginLoader);
         when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
         when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenThrow(new UnsupportedOperationException("This connector doesn't " +
                 "support altering of offsets"));
@@ -2679,7 +2696,7 @@ public class WorkerTest {
         String connectorClass = SampleSourceConnector.class.getName();
         connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass);
         connectorProps.put(TASKS_MAX_ENFORCE_CONFIG, Boolean.toString(enforced));
-        mockConnectorIsolation(connectorClass, sourceConnector);
+        mockVersionedConnectorIsolation(connectorClass, null, sourceConnector);
 
         mockExecutorRealSubmit(WorkerConnector.class);
 
@@ -2851,10 +2868,10 @@ public class WorkerTest {
 
                 tasksMaxExceededMessage = failureCaptor.getValue().getMessage();
             } else {
-                mockTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, task);
-                mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
-                mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
-                mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
+                mockVersionedTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, null, sinkConnector, task);
+                mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter);
+                mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
+                mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
                 mockExecutorFakeSubmit(WorkerTask.class);
 
                 assertTrue(worker.startSinkTask(
@@ -2964,8 +2981,20 @@ public class WorkerTest {
                        .thenReturn(returning);
     }
 
-    private void verifyTaskConverter(String converterClassConfig) {
-        verify(plugins).newConverter(any(AbstractConfig.class), eq(converterClassConfig), eq(ClassLoaderUsage.CURRENT_CLASSLOADER));
+    private void mockVersionedTaskConverterFromConnector(String converterClassConfig, String converterVersionConfig, Converter returning) {
+        when(plugins.newConverter(any(ConnectorConfig.class), eq(converterClassConfig), eq(converterVersionConfig))).thenReturn(returning);
+    }
+
+    private void verifyVersionedTaskConverterFromConnector(String converterClassConfig, String converterVersionConfig) {
+        verify(plugins).newConverter(any(ConnectorConfig.class), eq(converterClassConfig), eq(converterVersionConfig));
+    }
+
+    private void mockVersionedTaskConverterFromWorker(String converterClassConfig, String converterVersionConfig, Converter returning) {
+        when(plugins.newConverter(any(WorkerConfig.class), eq(converterClassConfig), eq(converterVersionConfig))).thenReturn(returning);
+    }
+
+    private void verifyVersionedTaskConverterFromWorker(String converterClassConfig, String converterVersionConfig) {
+        verify(plugins).newConverter(any(WorkerConfig.class), eq(converterClassConfig), eq(converterVersionConfig));
     }
 
     private void mockTaskHeaderConverter(ClassLoaderUsage classLoaderUsage, HeaderConverter returning) {
@@ -2977,8 +3006,25 @@
         verify(plugins).newHeaderConverter(any(AbstractConfig.class), eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(ClassLoaderUsage.CURRENT_CLASSLOADER));
     }
 
+    private void mockVersionedTaskHeaderConverterFromConnector(HeaderConverter returning) {
+        when(plugins.newHeaderConverter(any(ConnectorConfig.class), eq(ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(ConnectorConfig.HEADER_CONVERTER_VERSION_CONFIG)))
+               .thenReturn(returning);
+    }
+
+    private void verifyVersionedTaskHeaderConverterFromConnector() {
+        verify(plugins).newHeaderConverter(any(ConnectorConfig.class), eq(ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(ConnectorConfig.HEADER_CONVERTER_VERSION_CONFIG));
+    }
+
+    private void mockVersionedTaskHeaderConverterFromWorker(HeaderConverter returning) {
+        when(plugins.newHeaderConverter(any(WorkerConfig.class), eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(WorkerConfig.HEADER_CONVERTER_VERSION)))
+            .thenReturn(returning);
+    }
+
+    private void verifyVersionedTaskHeaderConverterFromWorker() {
+        verify(plugins).newHeaderConverter(any(WorkerConfig.class), eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(WorkerConfig.HEADER_CONVERTER_VERSION));
+    }
+
     private void mockGenericIsolation() {
-        when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
         when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
     }
 
@@ -2993,12 +3039,26 @@ public class WorkerTest {
         when(connector.version()).thenReturn("1.0");
     }
 
+    private void mockVersionedConnectorIsolation(String connectorClass, VersionRange range, Connector connector) {
+        mockGenericIsolation();
+        when(plugins.pluginLoader(connectorClass, range)).thenReturn(pluginLoader);
+        when(plugins.newConnector(connectorClass, range)).thenReturn(connector);
+        when(connector.version()).thenReturn(range == null ? "unknown" : range.toString());
+    }
+
     private void verifyConnectorIsolation(Connector connector) {
         verifyGenericIsolation();
         verify(plugins).newConnector(anyString());
         verify(connector, atLeastOnce()).version();
     }
 
+    private void verifyVersionedConnectorIsolation(String connectorClass, VersionRange range, Connector connector) {
+        verifyGenericIsolation();
+        verify(plugins).pluginLoader(connectorClass, range);
+        verify(plugins).newConnector(connectorClass, range);
+        verify(connector, atLeastOnce()).version();
+    }
+
     private void mockTaskIsolation(Class<? extends Connector> connector, Class<? extends Task> taskClass, Task task) {
         mockGenericIsolation();
         doReturn(connector).when(plugins).connectorClass(connector.getName());
@@ -3006,6 +3066,15 @@ public class WorkerTest {
         when(task.version()).thenReturn("1.0");
     }
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private void mockVersionedTaskIsolation(Class<? extends Connector> connectorClass, Class<? extends Task> taskClass, VersionRange range, Connector connector, Task task) {
+        mockGenericIsolation();
+        when(plugins.pluginLoader(connectorClass.getName(), range)).thenReturn(pluginLoader);
+        when(plugins.connectorClass(connectorClass.getName(), range)).thenReturn((Class) connectorClass);
+        when(plugins.newTask(taskClass)).thenReturn(task);
+        when(task.version()).thenReturn(range == null ? "unknown" : range.toString());
+    }
+
     private void verifyTaskIsolation(Task task) {
         verifyGenericIsolation();
         verify(plugins).connectorClass(anyString());
@@ -3013,6 +3082,14 @@ public class WorkerTest {
         verify(task).version();
     }
 
+    private void verifyVersionedTaskIsolation(Class<? extends Connector> connectorClass, Class<? extends Task> taskClass, VersionRange range, Task task) {
+        verifyGenericIsolation();
+        verify(plugins).pluginLoader(connectorClass.getName(), range);
+        verify(plugins).connectorClass(connectorClass.getName(), range);
+        verify(plugins).newTask(taskClass);
+        verify(task).version();
+    }
+
     private void mockExecutorRealSubmit(Class<? extends Runnable> runnableClass) {
         // This test expects the runnable to be executed, so have the isolated runnable pass-through.
         // Requires using the Worker constructor without the mocked executorService
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
index d4cac9484e6..c91fa1017ce 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
 import org.apache.kafka.connect.storage.AppliedConnectorConfig;
 import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.transforms.Transformation;
@@ -195,7 +196,8 @@ public class WorkerTestUtils {
         TransformationStage<R> stage = new TransformationStage<>(
                 predicatePlugin,
                 false,
-                transformationPlugin);
+                transformationPlugin,
+                TestPlugins.noOpLoaderSwap());
         TransformationChain<T, R> realTransformationChainRetriableException = new TransformationChain(List.of(stage), toleranceOperator);
         TransformationChain<T, R> transformationChainRetriableException = Mockito.spy(realTransformationChainRetriableException);
         return transformationChainRetriableException;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 55a3445a331..ca4c29931d0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -69,7 +69,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -197,16 +196,9 @@ public class PluginsTest {
         props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
         createConfig();
 
-        // Because it's not explicitly set on the supplied configuration, the logic to use the current classloader for the connector
-        // will exit immediately, and so this method always returns null
         HeaderConverter headerConverter = plugins.newHeaderConverter(config,
-                                                                     WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
-                                                                     ClassLoaderUsage.CURRENT_CLASSLOADER);
-        assertNull(headerConverter);
-        // But we should always find it (or the worker's default) when using the plugins classloader ...
-        headerConverter = plugins.newHeaderConverter(config,
                                                      WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
-                                                     ClassLoaderUsage.PLUGINS);
+                                                     ClassLoaderUsage.CURRENT_CLASSLOADER);
         assertNotNull(headerConverter);
         assertInstanceOf(SimpleHeaderConverter.class, headerConverter);
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
index adb2c2418d5..0e472a3ebe0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
@@ -39,6 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.jar.Attributes;
 import java.util.jar.JarEntry;
@@ -374,6 +375,12 @@ public class TestPlugins {
                 .collect(Collectors.toList());
     }
 
+    public static Function<ClassLoader, LoaderSwap> noOpLoaderSwap() {
+        return classLoader -> {
+            return new LoaderSwap(Thread.currentThread().getContextClassLoader());
+        };
+    }
+
     private static TestPlugin[] defaultPlugins() {
         return Arrays.stream(TestPlugin.values())
                 .filter(TestPlugin::includeByDefault)
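
For orientation: noOpLoaderSwap() above is what the updated tests pass wherever the runtime now
expects a Function<ClassLoader, LoaderSwap>. A minimal sketch of the pattern, assuming only the
test-classpath types shown in this diff (the wrapper class and main method below are invented
for illustration):

    import java.util.function.Function;

    import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
    import org.apache.kafka.connect.runtime.isolation.TestPlugins;

    public class NoOpLoaderSwapExample {
        public static void main(String[] args) {
            // The function ignores the classloader it is handed; the LoaderSwap it
            // returns restores the already-current context classloader when closed,
            // so no loader switch ever takes place.
            Function<ClassLoader, LoaderSwap> swapper = TestPlugins.noOpLoaderSwap();
            try (LoaderSwap swap = swapper.apply(NoOpLoaderSwapExample.class.getClassLoader())) {
                // code that would normally run under the plugin's classloader
            }
        }
    }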
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
index 29a9426e337..ca358f18f43 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
@@ -519,7 +519,7 @@ public class TopicCreationTest {
         topicCreation.addTopic(FOO_TOPIC);
         assertFalse(topicCreation.isTopicCreationRequired(FOO_TOPIC));
 
-        List<TransformationStage<SourceRecord>> transformationStages = sourceConfig.transformationStages(CONNECTOR_TASK_ID, METRICS);
+        List<TransformationStage<SourceRecord>> transformationStages = sourceConfig.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS);
         assertEquals(1, transformationStages.size());
         TransformationStage<SourceRecord> xform = transformationStages.get(0);
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, null, Schema.INT8_SCHEMA, 42));
@@ -626,7 +626,7 @@ public class TopicCreationTest {
         assertEquals(barPartitions, barTopicSpec.numPartitions());
         assertEquals(barTopicProps, barTopicSpec.configs());
 
-        List<TransformationStage<SourceRecord>> transformationStages = sourceConfig.transformationStages(CONNECTOR_TASK_ID, METRICS);
+        List<TransformationStage<SourceRecord>> transformationStages = sourceConfig.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS);
         assertEquals(2, transformationStages.size());
 
         TransformationStage<SourceRecord> castXForm = transformationStages.get(0);
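
Taken together, the versioned mock/verify helpers in WorkerTest trace the sequence the worker
now follows for versioned connector creation: resolve a plugin classloader for a connector class
plus version range, swap it in, and instantiate the connector under that loader. A rough sketch
of that sequence under the same assumptions (the helper class and method below are invented;
error handling and worker wiring are omitted):

    import org.apache.kafka.connect.connector.Connector;
    import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
    import org.apache.kafka.connect.runtime.isolation.Plugins;
    import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
    import org.apache.maven.artifact.versioning.VersionRange;

    public class VersionedConnectorSketch {
        // A null range means "any available version", matching the null arguments
        // passed throughout the WorkerTest changes above.
        static Connector createConnector(Plugins plugins, String connectorClass, String versionSpec)
                throws InvalidVersionSpecificationException {
            VersionRange range = versionSpec == null ? null : VersionRange.createFromVersionSpec(versionSpec);
            ClassLoader loader = plugins.pluginLoader(connectorClass, range);
            try (LoaderSwap swap = plugins.withClassLoader(loader)) {
                return plugins.newConnector(connectorClass, range);
            }
        }
    }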
