This is an automated email from the ASF dual-hosted git repository.
gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9dc9973c1ca KAFKA-18863: Connect Multiversion Support (Versioned
Connector Creation and related changes) (#17743)
9dc9973c1ca is described below
commit 9dc9973c1ca53218449f311e21478848f2d1fa92
Author: snehashisp <[email protected]>
AuthorDate: Thu Feb 27 04:42:34 2025 +0530
KAFKA-18863: Connect Multiversion Support (Versioned Connector Creation and
related changes) (#17743)
Reviewers: Greg Harris <[email protected]>
---
.../connect/runtime/AbstractWorkerSourceTask.java | 28 ++-
.../kafka/connect/runtime/ConnectorConfig.java | 26 ++-
.../runtime/ExactlyOnceWorkerSourceTask.java | 7 +-
.../kafka/connect/runtime/TransformationStage.java | 25 ++-
.../org/apache/kafka/connect/runtime/Worker.java | 214 +++++++++++---------
.../kafka/connect/runtime/WorkerSinkTask.java | 27 ++-
.../kafka/connect/runtime/WorkerSourceTask.java | 7 +-
.../apache/kafka/connect/runtime/WorkerTask.java | 7 +-
.../kafka/connect/runtime/isolation/Plugins.java | 4 +-
.../runtime/AbstractWorkerSourceTaskTest.java | 3 +-
.../kafka/connect/runtime/ConnectorConfigTest.java | 6 +-
.../connect/runtime/ErrorHandlingTaskTest.java | 9 +-
.../runtime/ExactlyOnceWorkerSourceTaskTest.java | 3 +-
.../connect/runtime/TransformationStageTest.java | 5 +-
.../kafka/connect/runtime/WorkerSinkTaskTest.java | 3 +-
.../runtime/WorkerSinkTaskThreadedTest.java | 3 +-
.../connect/runtime/WorkerSourceTaskTest.java | 3 +-
.../kafka/connect/runtime/WorkerTaskTest.java | 3 +-
.../apache/kafka/connect/runtime/WorkerTest.java | 215 ++++++++++++++-------
.../kafka/connect/runtime/WorkerTestUtils.java | 4 +-
.../connect/runtime/isolation/PluginsTest.java | 10 +-
.../connect/runtime/isolation/TestPlugins.java | 7 +
.../kafka/connect/util/TopicCreationTest.java | 4 +-
23 files changed, 407 insertions(+), 216 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index d1675d5455a..683eb3abed0 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -43,6 +43,7 @@ import
org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
+import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
@@ -70,6 +71,7 @@ import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.function.Supplier;
import static
org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
@@ -233,11 +235,12 @@ public abstract class AbstractWorkerSourceTask extends
WorkerTask<SourceRecord,
RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator,
StatusBackingStore statusBackingStore,
Executor closeExecutor,
-
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier) {
+
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
+ Function<ClassLoader, LoaderSwap>
pluginLoaderSwapper) {
super(id, statusListener, initialState, loader, connectMetrics,
errorMetrics,
retryWithToleranceOperator, transformationChain,
errorReportersSupplier,
- time, statusBackingStore);
+ time, statusBackingStore, pluginLoaderSwapper);
this.workerConfig = workerConfig;
this.task = task;
@@ -491,11 +494,17 @@ public abstract class AbstractWorkerSourceTask extends
WorkerTask<SourceRecord,
RecordHeaders headers = retryWithToleranceOperator.execute(context, ()
-> convertHeaderFor(record), Stage.HEADER_CONVERTER,
headerConverterPlugin.get().getClass());
- byte[] key = retryWithToleranceOperator.execute(context, () ->
keyConverterPlugin.get().fromConnectData(record.topic(), headers,
record.keySchema(), record.key()),
- Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass());
+ byte[] key = retryWithToleranceOperator.execute(context, () -> {
+ try (LoaderSwap swap =
pluginLoaderSwapper.apply(keyConverterPlugin.get().getClass().getClassLoader()))
{
+ return
keyConverterPlugin.get().fromConnectData(record.topic(), headers,
record.keySchema(), record.key());
+ }
+ }, Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass());
- byte[] value = retryWithToleranceOperator.execute(context, () ->
valueConverterPlugin.get().fromConnectData(record.topic(), headers,
record.valueSchema(), record.value()),
- Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass());
+ byte[] value = retryWithToleranceOperator.execute(context, () -> {
+ try (LoaderSwap swap =
pluginLoaderSwapper.apply(valueConverterPlugin.get().getClass().getClassLoader()))
{
+ return
valueConverterPlugin.get().fromConnectData(record.topic(), headers,
record.valueSchema(), record.value());
+ }
+ }, Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass());
if (context.failed()) {
return null;
@@ -551,8 +560,11 @@ public abstract class AbstractWorkerSourceTask extends
WorkerTask<SourceRecord,
String topic = record.topic();
for (Header header : headers) {
String key = header.key();
- byte[] rawHeader =
headerConverterPlugin.get().fromConnectHeader(topic, key, header.schema(),
header.value());
- result.add(key, rawHeader);
+ try (LoaderSwap swap =
pluginLoaderSwapper.apply(headerConverterPlugin.get().getClass().getClassLoader()))
{
+ byte[] rawHeader =
headerConverterPlugin.get().fromConnectHeader(topic, key, header.schema(),
header.value());
+ result.add(key, rawHeader);
+ }
+
}
}
return result;
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 9ba82bdf617..db485146811 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -362,7 +362,7 @@ public class ConnectorConfig extends AbstractConfig {
* {@link Transformation transformations} and {@link Predicate predicates}
* as they are specified in the {@link #TRANSFORMS_CONFIG} and {@link
#PREDICATES_CONFIG}
*/
- public <R extends ConnectRecord<R>> List<TransformationStage<R>>
transformationStages(ConnectorTaskId connectorTaskId, ConnectMetrics metrics) {
+ public <R extends ConnectRecord<R>> List<TransformationStage<R>>
transformationStages(Plugins plugins, ConnectorTaskId connectorTaskId,
ConnectMetrics metrics) {
final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
final List<TransformationStage<R>> transformations = new
ArrayList<>(transformAliases.size());
@@ -370,8 +370,9 @@ public class ConnectorConfig extends AbstractConfig {
final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
try {
- @SuppressWarnings("unchecked")
- final Transformation<R> transformation =
Utils.newInstance(getClass(prefix + "type"), Transformation.class);
+ final String typeConfig = prefix + "type";
+ final String versionConfig = prefix +
WorkerConfig.PLUGIN_VERSION_SUFFIX;
+ @SuppressWarnings("unchecked") final Transformation<R>
transformation = getTransformationOrPredicate(plugins, typeConfig,
versionConfig);
Map<String, Object> configs = originalsWithPrefix(prefix);
Object predicateAlias =
configs.remove(TransformationStage.PREDICATE_CONFIG);
Object negate =
configs.remove(TransformationStage.NEGATE_CONFIG);
@@ -379,13 +380,15 @@ public class ConnectorConfig extends AbstractConfig {
Plugin<Transformation<R>> transformationPlugin =
metrics.wrap(transformation, connectorTaskId, alias);
if (predicateAlias != null) {
String predicatePrefix = PREDICATES_PREFIX +
predicateAlias + ".";
+ final String predicateTypeConfig = predicatePrefix +
"type";
+ final String predicateVersionConfig = predicatePrefix +
WorkerConfig.PLUGIN_VERSION_SUFFIX;
@SuppressWarnings("unchecked")
- Predicate<R> predicate =
Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
+ Predicate<R> predicate =
getTransformationOrPredicate(plugins, predicateTypeConfig,
predicateVersionConfig);
predicate.configure(originalsWithPrefix(predicatePrefix));
Plugin<Predicate<R>> predicatePlugin =
metrics.wrap(predicate, connectorTaskId, (String) predicateAlias);
- transformations.add(new
TransformationStage<>(predicatePlugin, negate != null &&
Boolean.parseBoolean(negate.toString()), transformationPlugin));
+ transformations.add(new
TransformationStage<>(predicatePlugin, negate != null &&
Boolean.parseBoolean(negate.toString()), transformationPlugin,
plugins.safeLoaderSwapper()));
} else {
- transformations.add(new
TransformationStage<>(transformationPlugin));
+ transformations.add(new
TransformationStage<>(transformationPlugin, plugins.safeLoaderSwapper()));
}
} catch (Exception e) {
throw new ConnectException(e);
@@ -395,6 +398,17 @@ public class ConnectorConfig extends AbstractConfig {
return transformations;
}
+ @SuppressWarnings("unchecked")
+ private <T> T getTransformationOrPredicate(Plugins plugins, String
classConfig, String versionConfig) {
+ try {
+ VersionRange range =
PluginUtils.connectorVersionRequirement(getString(versionConfig));
+ VersionRange connectorRange =
PluginUtils.connectorVersionRequirement(getString(CONNECTOR_VERSION));
+ return (T) plugins.newPlugin(getClass(classConfig).getName(),
range, plugins.pluginLoader(getString(CONNECTOR_CLASS_CONFIG), connectorRange));
+ } catch (Exception e) {
+ throw new ConnectException(e);
+ }
+ }
+
/**
* Returns an enriched {@link ConfigDef} building upon the {@code
ConfigDef}, using the current configuration specified in {@code props} as an
input.
* <p>
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 706a3e4b9d5..fafbdbbc3f4 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -32,6 +32,7 @@ import
org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTask.TransactionBoundary;
@@ -57,6 +58,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.function.Supplier;
@@ -101,11 +103,12 @@ class ExactlyOnceWorkerSourceTask extends
AbstractWorkerSourceTask {
Executor closeExecutor,
Runnable preProducerCheck,
Runnable postProducerCheck,
-
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier) {
+
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
+ Function<ClassLoader, LoaderSwap>
pluginLoaderSwapper) {
super(id, task, statusListener, initialState, configState,
keyConverterPlugin, valueConverterPlugin, headerConverterPlugin,
transformationChain,
buildTransactionContext(sourceConfig),
producer, admin, topicGroups, offsetReader, offsetWriter,
offsetStore, workerConfig, connectMetrics, errorMetrics,
- loader, time, retryWithToleranceOperator, statusBackingStore,
closeExecutor, errorReportersSupplier);
+ loader, time, retryWithToleranceOperator, statusBackingStore,
closeExecutor, errorReportersSupplier, pluginLoaderSwapper);
this.transactionOpen = false;
this.committableRecords = new LinkedHashMap<>();
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
index 90b3559866a..a86c4878ab3 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
@@ -20,9 +20,12 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
+import java.util.function.Function;
+
/**
* Wrapper for a {@link Transformation} and corresponding optional {@link
Predicate}
* which applies the transformation when the {@link Predicate} is true (or
false, according to {@code negate}).
@@ -36,15 +39,18 @@ public class TransformationStage<R extends
ConnectRecord<R>> implements AutoClos
private final Plugin<Predicate<R>> predicatePlugin;
private final Plugin<Transformation<R>> transformationPlugin;
private final boolean negate;
+ private final Function<ClassLoader, LoaderSwap> pluginLoaderSwapper;
+
- TransformationStage(Plugin<Transformation<R>> transformationPlugin) {
- this(null, false, transformationPlugin);
+ TransformationStage(Plugin<Transformation<R>> transformationPlugin,
Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
+ this(null, false, transformationPlugin, pluginLoaderSwapper);
}
- TransformationStage(Plugin<Predicate<R>> predicatePlugin, boolean negate,
Plugin<Transformation<R>> transformationPlugin) {
+ TransformationStage(Plugin<Predicate<R>> predicatePlugin, boolean negate,
Plugin<Transformation<R>> transformationPlugin, Function<ClassLoader,
LoaderSwap> pluginLoaderSwapper) {
this.predicatePlugin = predicatePlugin;
this.negate = negate;
this.transformationPlugin = transformationPlugin;
+ this.pluginLoaderSwapper = pluginLoaderSwapper;
}
public Class<? extends Transformation<R>> transformClass() {
@@ -54,8 +60,17 @@ public class TransformationStage<R extends ConnectRecord<R>>
implements AutoClos
}
public R apply(R record) {
- if (predicatePlugin == null || predicatePlugin.get() == null || negate
^ predicatePlugin.get().test(record)) {
- return transformationPlugin.get().apply(record);
+ Predicate<R> predicate = predicatePlugin != null ?
predicatePlugin.get() : null;
+ boolean shouldTransform = predicate == null;
+ if (predicate != null) {
+ try (LoaderSwap swap =
pluginLoaderSwapper.apply(predicate.getClass().getClassLoader())) {
+ shouldTransform = negate ^ predicate.test(record);
+ }
+ }
+ if (shouldTransform) {
+ try (LoaderSwap swap =
pluginLoaderSwapper.apply(transformationPlugin.get().getClass().getClassLoader()))
{
+ record = transformationPlugin.get().apply(record);
+ }
}
return record;
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index e01a1adef45..a435d281a7c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -67,8 +67,10 @@ import org.apache.kafka.connect.runtime.errors.LogReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
+import org.apache.kafka.connect.runtime.isolation.PluginUtils;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
+import
org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
@@ -99,6 +101,7 @@ import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
+import
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -314,32 +317,38 @@ public final class Worker {
final WorkerConnector workerConnector;
final String connClass =
connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
- ClassLoader connectorLoader = plugins.connectorLoader(connClass);
- try (LoaderSwap loaderSwap =
plugins.withClassLoader(connectorLoader)) {
- log.info("Creating connector {} of type {}", connName,
connClass);
- final Connector connector = plugins.newConnector(connClass);
- final ConnectorConfig connConfig;
- final CloseableOffsetStorageReader offsetReader;
- final ConnectorOffsetBackingStore offsetStore;
- if (ConnectUtils.isSinkConnector(connector)) {
- connConfig = new SinkConnectorConfig(plugins, connProps);
- offsetReader = null;
- offsetStore = null;
- } else {
- SourceConnectorConfig sourceConfig = new
SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
- connConfig = sourceConfig;
+ final ClassLoader connectorLoader;
- // Set up the offset backing store for this connector
instance
- offsetStore = config.exactlyOnceSourceEnabled()
+ try {
+ connectorLoader = connectorClassLoader(connProps);
+ try (LoaderSwap loaderSwap =
plugins.withClassLoader(connectorLoader)) {
+ log.info("Creating connector {} of type {}", connName,
connClass);
+ final Connector connector =
instantiateConnector(connProps);
+
+ final ConnectorConfig connConfig;
+ final CloseableOffsetStorageReader offsetReader;
+ final ConnectorOffsetBackingStore offsetStore;
+
+ if (ConnectUtils.isSinkConnector(connector)) {
+ connConfig = new SinkConnectorConfig(plugins,
connProps);
+ offsetReader = null;
+ offsetStore = null;
+ } else {
+ SourceConnectorConfig sourceConfig = new
SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
+ connConfig = sourceConfig;
+
+ // Set up the offset backing store for this connector
instance
+ offsetStore = config.exactlyOnceSourceEnabled()
?
offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector,
null)
:
offsetStoreForRegularSourceConnector(sourceConfig, connName, connector, null);
- offsetStore.configure(config);
- offsetReader = new OffsetStorageReaderImpl(offsetStore,
connName, internalKeyConverter, internalValueConverter);
- }
- workerConnector = new WorkerConnector(
+ offsetStore.configure(config);
+ offsetReader = new
OffsetStorageReaderImpl(offsetStore, connName, internalKeyConverter,
internalValueConverter);
+ }
+ workerConnector = new WorkerConnector(
connName, connector, connConfig, ctx, metrics,
connectorStatusListener, offsetReader, offsetStore, connectorLoader);
- log.info("Instantiated connector {} with version {} of type
{}", connName, connector.version(), connector.getClass());
- workerConnector.transitionTo(initialState,
onConnectorStateChange);
+ log.info("Instantiated connector {} with version {} of
type {}", connName, connector.version(), connector.getClass());
+ workerConnector.transitionTo(initialState,
onConnectorStateChange);
+ }
} catch (Throwable t) {
log.error("Failed to start connector {}", connName, t);
connectorStatusListener.onFailure(connName, t);
@@ -655,51 +664,51 @@ public final class Worker {
throw new ConnectException("Task already exists in this
worker: " + id);
connectorStatusMetricsGroup.recordTaskAdded(id);
- String connType =
connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
- ClassLoader connectorLoader = plugins.connectorLoader(connType);
-
- try (LoaderSwap loaderSwap =
plugins.withClassLoader(connectorLoader)) {
- final ConnectorConfig connConfig = new
ConnectorConfig(plugins, connProps);
-
- int maxTasks = connConfig.tasksMax();
- int numTasks = configState.taskCount(id.connector());
- checkTasksMax(id.connector(), numTasks, maxTasks,
connConfig.enforceTasksMax());
-
- final TaskConfig taskConfig = new TaskConfig(taskProps);
- final Class<? extends Task> taskClass =
taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
- final Task task = plugins.newTask(taskClass);
- log.info("Instantiated task {} with version {} of type {}",
id, task.version(), taskClass.getName());
-
- // By maintaining connector's specific class loader for this
thread here, we first
- // search for converters within the connector dependencies.
- // If any of these aren't found, that means the connector
didn't configure specific converters,
- // so we should instantiate based upon the worker configuration
- Converter keyConverter = plugins.newConverter(connConfig,
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage
-
.CURRENT_CLASSLOADER);
- Converter valueConverter = plugins.newConverter(connConfig,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
ClassLoaderUsage.CURRENT_CLASSLOADER);
- HeaderConverter headerConverter =
plugins.newHeaderConverter(connConfig,
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
-
ClassLoaderUsage.CURRENT_CLASSLOADER);
- if (keyConverter == null) {
- keyConverter = plugins.newConverter(config,
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
- log.info("Set up the key converter {} for task {} using
the worker config", keyConverter.getClass(), id);
- } else {
- log.info("Set up the key converter {} for task {} using
the connector config", keyConverter.getClass(), id);
- }
- if (valueConverter == null) {
- valueConverter = plugins.newConverter(config,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
- log.info("Set up the value converter {} for task {} using
the worker config", valueConverter.getClass(), id);
- } else {
- log.info("Set up the value converter {} for task {} using
the connector config", valueConverter.getClass(), id);
- }
- if (headerConverter == null) {
- headerConverter = plugins.newHeaderConverter(config,
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage
-
.PLUGINS);
- log.info("Set up the header converter {} for task {} using
the worker config", headerConverter.getClass(), id);
- } else {
- log.info("Set up the header converter {} for task {} using
the connector config", headerConverter.getClass(), id);
- }
- workerTask = taskBuilder
+ final ClassLoader connectorLoader;
+ try {
+ connectorLoader = connectorClassLoader(connProps);
+ try (LoaderSwap loaderSwap =
plugins.withClassLoader(connectorLoader)) {
+ final ConnectorConfig connConfig = new
ConnectorConfig(plugins, connProps);
+
+ int maxTasks = connConfig.tasksMax();
+ int numTasks = configState.taskCount(id.connector());
+ checkTasksMax(id.connector(), numTasks, maxTasks,
connConfig.enforceTasksMax());
+
+ final TaskConfig taskConfig = new TaskConfig(taskProps);
+ final Class<? extends Task> taskClass =
taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
+ final Task task = plugins.newTask(taskClass);
+ log.info("Instantiated task {} with version {} of type
{}", id, task.version(), taskClass.getName());
+
+
+ // By maintaining connector's specific class loader for
this thread here, we first
+ // search for converters within the connector dependencies.
+ // If any of these aren't found, that means the connector
didn't configure specific converters,
+ // so we should instantiate based upon the worker
configuration
+ Converter keyConverter = plugins.newConverter(connConfig,
ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
+ Converter valueConverter =
plugins.newConverter(connConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
+ HeaderConverter headerConverter =
plugins.newHeaderConverter(connConfig,
ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG,
ConnectorConfig.HEADER_CONVERTER_VERSION_CONFIG);
+
+ if (keyConverter == null) {
+ keyConverter = plugins.newConverter(config,
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION);
+ log.info("Set up the key converter {} for task {}
using the worker config", keyConverter.getClass(), id);
+ } else {
+ log.info("Set up the key converter {} for task {}
using the connector config", keyConverter.getClass(), id);
+ }
+ if (valueConverter == null) {
+ valueConverter = plugins.newConverter(config,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
WorkerConfig.VALUE_CONVERTER_VERSION);
+ log.info("Set up the value converter {} for task {}
using the worker config", valueConverter.getClass(), id);
+ } else {
+ log.info("Set up the value converter {} for task {}
using the connector config", valueConverter.getClass(), id);
+ }
+ if (headerConverter == null) {
+ headerConverter = plugins.newHeaderConverter(config,
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
WorkerConfig.HEADER_CONVERTER_VERSION);
+ log.info("Set up the header converter {} for task {}
using the worker config", headerConverter.getClass(), id);
+ } else {
+ log.info("Set up the header converter {} for task {}
using the connector config", headerConverter.getClass(), id);
+ }
+
+ workerTask = taskBuilder
.withTask(task)
.withConnectorConfig(connConfig)
.withKeyConverterPlugin(metrics.wrap(keyConverter, id,
true))
@@ -708,7 +717,8 @@ public final class Worker {
.withClassloader(connectorLoader)
.build();
- workerTask.initialize(taskConfig);
+ workerTask.initialize(taskConfig);
+ }
} catch (Throwable t) {
log.error("Failed to start task {}", id, t);
connectorStatusMetricsGroup.recordTaskRemoved(id);
@@ -739,19 +749,17 @@ public final class Worker {
public KafkaFuture<Void> fenceZombies(String connName, int numTasks,
Map<String, String> connProps) {
log.debug("Fencing out {} task producers for source connector {}",
numTasks, connName);
try (LoggingContext loggingContext =
LoggingContext.forConnector(connName)) {
- String connType =
connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
- ClassLoader connectorLoader = plugins.connectorLoader(connType);
- try (LoaderSwap loaderSwap =
plugins.withClassLoader(connectorLoader)) {
+ Class<? extends Connector> connectorClass =
connectorClass(connProps);
+ ClassLoader classLoader = connectorClassLoader(connProps);
+ try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader))
{
final SourceConnectorConfig connConfig = new
SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
- final Class<? extends Connector> connClass =
plugins.connectorClass(
-
connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
Map<String, Object> adminConfig = adminConfigs(
connName,
"connector-worker-adminclient-" + connName,
config,
connConfig,
- connClass,
+ connectorClass,
connectorClientConfigOverridePolicy,
kafkaClusterId,
ConnectorType.SOURCE);
@@ -1198,11 +1206,9 @@ public final class Worker {
* @param cb callback to invoke upon completion of the request
*/
public void connectorOffsets(String connName, Map<String, String>
connectorConfig, Callback<ConnectorOffsets> cb) {
- String connectorClassOrAlias =
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
- ClassLoader connectorLoader =
plugins.connectorLoader(connectorClassOrAlias);
-
+ Connector connector = instantiateConnector(connectorConfig);
+ ClassLoader connectorLoader = connectorClassLoader(connectorConfig);
try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader))
{
- Connector connector = plugins.newConnector(connectorClassOrAlias);
if (ConnectUtils.isSinkConnector(connector)) {
log.debug("Fetching offsets for sink connector: {}", connName);
sinkConnectorOffsets(connName, connector, connectorConfig, cb);
@@ -1213,6 +1219,43 @@ public final class Worker {
}
}
+    private Connector instantiateConnector(Map<String, String> connProps) throws ConnectException {
+        // Creates the connector named by ConnectorConfig.CONNECTOR_CLASS_CONFIG, constrained by the version spec in connProps.
+        final String klass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        final String version = connProps.get(ConnectorConfig.CONNECTOR_VERSION);
+        // Version-spec parsing and versioned-loading failures are rethrown as ConnectException with the cause preserved.
+        try {
+            return plugins.newConnector(klass, PluginUtils.connectorVersionRequirement(version));
+        } catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) {
+            throw new ConnectException(
+                    String.format("Failed to instantiate class for connector %s, class %s", connProps.get(ConnectorConfig.NAME_CONFIG), klass), e);
+        }
+    }
+
+    private ClassLoader connectorClassLoader(Map<String, String> connProps) throws ConnectException {
+        final String klass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        final String version = connProps.get(ConnectorConfig.CONNECTOR_VERSION);
+        // Looks up the plugin class loader for the configured connector class and version spec; failures become ConnectException.
+        try {
+            return plugins.pluginLoader(klass, PluginUtils.connectorVersionRequirement(version));
+        } catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) {
+            throw new ConnectException(
+                    String.format("Failed to get class loader for connector %s, class %s", connProps.get(ConnectorConfig.NAME_CONFIG), klass), e);
+        }
+    }
+
+    private Class<? extends Connector> connectorClass(Map<String, String> connProps) throws ConnectException {
+        final String klass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        final String version = connProps.get(ConnectorConfig.CONNECTOR_VERSION);
+        // Resolves the Connector class for the configured class name and version spec; failures become ConnectException.
+        try {
+            return plugins.connectorClass(klass, PluginUtils.connectorVersionRequirement(version));
+        } catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) {
+            throw new ConnectException(
+                    String.format("Failed to get class for connector %s, class %s", connProps.get(ConnectorConfig.NAME_CONFIG), klass), e);
+        }
+    }
+
/**
* Get the current consumer group offsets for a sink connector.
* <p>
@@ -1311,12 +1354,10 @@ public final class Worker {
*/
public void modifyConnectorOffsets(String connName, Map<String, String>
connectorConfig,
Map<Map<String, ?>, Map<String, ?>>
offsets, Callback<Message> cb) {
- String connectorClassOrAlias =
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
- ClassLoader connectorLoader =
plugins.connectorLoader(connectorClassOrAlias);
- Connector connector;
+ final Connector connector = instantiateConnector(connectorConfig);
+ ClassLoader connectorLoader = connectorClassLoader(connectorConfig);
try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader))
{
- connector = plugins.newConnector(connectorClassOrAlias);
if (ConnectUtils.isSinkConnector(connector)) {
log.debug("Modifying offsets for sink connector: {}",
connName);
modifySinkConnectorOffsets(connName, connector,
connectorConfig, offsets, connectorLoader, cb);
@@ -1791,13 +1832,12 @@ public final class Worker {
Objects.requireNonNull(classLoader, "Classloader used by task
cannot be null");
ErrorHandlingMetrics errorHandlingMetrics =
errorHandlingMetrics(id);
- final Class<? extends Connector> connectorClass =
plugins.connectorClass(
-
connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+ final Class<? extends Connector> connectorClass =
connectorClass(connectorConfig.originalsStrings());
RetryWithToleranceOperator<T> retryWithToleranceOperator = new
RetryWithToleranceOperator<>(connectorConfig.errorRetryTimeout(),
connectorConfig.errorMaxDelayInMillis(),
connectorConfig.errorToleranceType(), Time.SYSTEM, errorHandlingMetrics);
- TransformationChain<T, R> transformationChain = new
TransformationChain<>(connectorConfig.<R>transformationStages(id, metrics),
retryWithToleranceOperator);
+ TransformationChain<T, R> transformationChain = new
TransformationChain<>(connectorConfig.<R>transformationStages(plugins, id,
metrics), retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
return doBuild(task, id, configState, statusListener, initialState,
@@ -1862,7 +1902,7 @@ public final class Worker {
return new WorkerSinkTask(id, (SinkTask) task, statusListener,
initialState, config, configState, metrics, keyConverterPlugin,
valueConverterPlugin, errorHandlingMetrics,
headerConverterPlugin, transformationChain, consumer, classLoader, time,
retryWithToleranceOperator, workerErrantRecordReporter,
herder.statusBackingStore(),
- () -> sinkTaskReporters(id, sinkConfig,
errorHandlingMetrics, connectorClass));
+ () -> sinkTaskReporters(id, sinkConfig,
errorHandlingMetrics, connectorClass), plugins.safeLoaderSwapper());
}
}
@@ -1922,7 +1962,7 @@ public final class Worker {
return new WorkerSourceTask(id, (SourceTask) task, statusListener,
initialState, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics,
headerConverterPlugin, transformationChain, producer,
topicAdmin, topicCreationGroups,
offsetReader, offsetWriter, offsetStore, config,
configState, metrics, classLoader, time,
- retryWithToleranceOperator, herder.statusBackingStore(),
executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
+ retryWithToleranceOperator, herder.statusBackingStore(),
executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics),
plugins.safeLoaderSwapper());
}
}
@@ -1987,7 +2027,7 @@ public final class Worker {
headerConverterPlugin, transformationChain, producer,
topicAdmin, topicCreationGroups,
offsetReader, offsetWriter, offsetStore, config,
configState, metrics, errorHandlingMetrics, classLoader, time,
retryWithToleranceOperator,
herder.statusBackingStore(), sourceConfig, executor,
preProducerCheck, postProducerCheck,
- () -> sourceTaskReporters(id, sourceConfig,
errorHandlingMetrics));
+ () -> sourceTaskReporters(id, sourceConfig,
errorHandlingMetrics), plugins.safeLoaderSwapper());
}
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index c050c61fb5f..4b8256115ed 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -47,6 +47,7 @@ import
org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
+import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.ClusterConfigState;
@@ -66,6 +67,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -122,9 +124,10 @@ class WorkerSinkTask extends
WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
RetryWithToleranceOperator<ConsumerRecord<byte[],
byte[]>> retryWithToleranceOperator,
WorkerErrantRecordReporter
workerErrantRecordReporter,
StatusBackingStore statusBackingStore,
- Supplier<List<ErrorReporter<ConsumerRecord<byte[],
byte[]>>>> errorReportersSupplier) {
+ Supplier<List<ErrorReporter<ConsumerRecord<byte[],
byte[]>>>> errorReportersSupplier,
+ Function<ClassLoader, LoaderSwap>
pluginLoaderSwapper) {
super(id, statusListener, initialState, loader, connectMetrics,
errorMetrics,
- retryWithToleranceOperator, transformationChain,
errorReportersSupplier, time, statusBackingStore);
+ retryWithToleranceOperator, transformationChain,
errorReportersSupplier, time, statusBackingStore, pluginLoaderSwapper);
this.workerConfig = workerConfig;
this.task = task;
@@ -539,11 +542,17 @@ class WorkerSinkTask extends
WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
}
private SinkRecord
convertAndTransformRecord(ProcessingContext<ConsumerRecord<byte[], byte[]>>
context, final ConsumerRecord<byte[], byte[]> msg) {
- SchemaAndValue keyAndSchema =
retryWithToleranceOperator.execute(context, () ->
keyConverterPlugin.get().toConnectData(msg.topic(), msg.headers(), msg.key()),
- Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass());
+ SchemaAndValue keyAndSchema =
retryWithToleranceOperator.execute(context, () -> {
+ try (LoaderSwap swap =
pluginLoaderSwapper.apply(keyConverterPlugin.get().getClass().getClassLoader()))
{
+ return keyConverterPlugin.get().toConnectData(msg.topic(),
msg.headers(), msg.key());
+ }
+ }, Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass());
- SchemaAndValue valueAndSchema =
retryWithToleranceOperator.execute(context, () ->
valueConverterPlugin.get().toConnectData(msg.topic(), msg.headers(),
msg.value()),
- Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass());
+ SchemaAndValue valueAndSchema =
retryWithToleranceOperator.execute(context, () -> {
+ try (LoaderSwap swap =
pluginLoaderSwapper.apply(valueConverterPlugin.get().getClass().getClassLoader()))
{
+ return valueConverterPlugin.get().toConnectData(msg.topic(),
msg.headers(), msg.value());
+ }
+ }, Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass());
Headers headers = retryWithToleranceOperator.execute(context, () ->
convertHeadersFor(msg), Stage.HEADER_CONVERTER,
headerConverterPlugin.get().getClass());
@@ -580,8 +589,10 @@ class WorkerSinkTask extends
WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
if (recordHeaders != null) {
String topic = record.topic();
for (org.apache.kafka.common.header.Header recordHeader :
recordHeaders) {
- SchemaAndValue schemaAndValue =
headerConverterPlugin.get().toConnectHeader(topic, recordHeader.key(),
recordHeader.value());
- result.add(recordHeader.key(), schemaAndValue);
+ try (LoaderSwap swap =
pluginLoaderSwapper.apply(headerConverterPlugin.get().getClass().getClassLoader()))
{
+ SchemaAndValue schemaAndValue =
headerConverterPlugin.get().toConnectHeader(topic, recordHeader.key(),
recordHeader.value());
+ result.add(recordHeader.key(), schemaAndValue);
+ }
}
}
return result;
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index a92ebe0fc39..0806e887735 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -28,6 +28,7 @@ import
org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
+import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
@@ -53,6 +54,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.function.Supplier;
import static
org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;
@@ -91,12 +93,13 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
RetryWithToleranceOperator<SourceRecord>
retryWithToleranceOperator,
StatusBackingStore statusBackingStore,
Executor closeExecutor,
- Supplier<List<ErrorReporter<SourceRecord>>>
errorReportersSupplier) {
+ Supplier<List<ErrorReporter<SourceRecord>>>
errorReportersSupplier,
+ Function<ClassLoader, LoaderSwap>
pluginLoaderSwapper) {
super(id, task, statusListener, initialState, configState,
keyConverterPlugin, valueConverterPlugin, headerConverterPlugin,
transformationChain,
null, producer,
admin, topicGroups, offsetReader, offsetWriter, offsetStore,
workerConfig, connectMetrics, errorMetrics, loader,
- time, retryWithToleranceOperator, statusBackingStore,
closeExecutor, errorReportersSupplier);
+ time, retryWithToleranceOperator, statusBackingStore,
closeExecutor, errorReportersSupplier, pluginLoaderSwapper);
this.committableOffsets = CommittableOffsets.EMPTY;
this.submittedRecords = new SubmittedRecords();
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index ff0ecfce276..fa28a4e7b0e 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -33,6 +33,7 @@ import
org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
@@ -44,6 +45,7 @@ import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.function.Supplier;
/**
@@ -78,6 +80,7 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>>
implements Runnable {
protected final RetryWithToleranceOperator<T> retryWithToleranceOperator;
protected final TransformationChain<T, R> transformationChain;
private final Supplier<List<ErrorReporter<T>>> errorReportersSupplier;
+ protected final Function<ClassLoader, LoaderSwap> pluginLoaderSwapper;
protected final PluginMetricsImpl pluginMetrics;
public WorkerTask(ConnectorTaskId id,
@@ -90,7 +93,8 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>>
implements Runnable {
TransformationChain<T, R> transformationChain,
Supplier<List<ErrorReporter<T>>> errorReportersSupplier,
Time time,
- StatusBackingStore statusBackingStore) {
+ StatusBackingStore statusBackingStore,
+ Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
this.id = id;
this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics,
statusListener);
this.errorMetrics = errorMetrics;
@@ -106,6 +110,7 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>>
implements Runnable {
this.errorReportersSupplier = errorReportersSupplier;
this.time = time;
this.statusBackingStore = statusBackingStore;
+ this.pluginLoaderSwapper = pluginLoaderSwapper;
this.pluginMetrics = connectMetrics.taskPluginMetrics(id);
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 8be45e773b3..98f33ea582b 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -549,7 +549,7 @@ public class Plugins {
}
private HeaderConverter newHeaderConverter(AbstractConfig config, String
classPropertyName, String versionPropertyName, ClassLoaderUsage
classLoaderUsage) {
- if (!config.originals().containsKey(classPropertyName) &&
classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
+ if (config.getClass(classPropertyName) == null && classLoaderUsage ==
ClassLoaderUsage.CURRENT_CLASSLOADER) {
// This configuration does not define the Header Converter via the
specified property name
return null;
}
@@ -602,7 +602,7 @@ public class Plugins {
// if the config specifies the class name, use it, otherwise
use the default which we can get from config.getClass
String classOrAlias =
config.originalsStrings().get(classPropertyName);
if (classOrAlias == null) {
- classOrAlias =
config.getClass(classPropertyName).getName();
+ classOrAlias = config.getClass(classPropertyName) == null
? null : config.getClass(classPropertyName).getName();
}
try {
klass = pluginClass(delegatingLoader, classOrAlias,
basePluginClass, range);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
index 780cd19d093..704a1f1ff40 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -43,6 +43,7 @@ import
org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -966,7 +967,7 @@ public class AbstractWorkerSourceTaskTest {
taskId, sourceTask, statusListener, TargetState.STARTED,
configState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin,
transformationChain,
workerTransactionContext, producer, admin,
TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter,
offsetStore,
config, metrics, errorHandlingMetrics,
plugins.delegatingLoader(), Time.SYSTEM, retryWithToleranceOperator,
- statusBackingStore, Runnable::run, errorReportersSupplier) {
+ statusBackingStore, Runnable::run, errorReportersSupplier,
TestPlugins.noOpLoaderSwap()) {
@Override
protected void prepareToInitializeTask() {
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index 2c01c4b4fbc..5253bcb47da 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -163,7 +163,7 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
props.put("transforms.a.type", SimpleTransformation.class.getName());
props.put("transforms.a.magic.number", "42");
final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS,
props);
- final List<TransformationStage<SinkRecord>> transformationStages =
config.transformationStages(CONNECTOR_TASK_ID, METRICS);
+ final List<TransformationStage<SinkRecord>> transformationStages =
config.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS);
assertEquals(1, transformationStages.size());
final TransformationStage<SinkRecord> stage =
transformationStages.get(0);
assertEquals(SimpleTransformation.class, stage.transformClass());
@@ -192,7 +192,7 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
props.put("transforms.b.type", SimpleTransformation.class.getName());
props.put("transforms.b.magic.number", "84");
final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS,
props);
- final List<TransformationStage<SinkRecord>> transformationStages =
config.transformationStages(CONNECTOR_TASK_ID, METRICS);
+ final List<TransformationStage<SinkRecord>> transformationStages =
config.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS);
assertEquals(2, transformationStages.size());
assertEquals(42,
transformationStages.get(0).apply(DUMMY_RECORD).kafkaPartition().intValue());
assertEquals(84,
transformationStages.get(1).apply(DUMMY_RECORD).kafkaPartition().intValue());
@@ -293,7 +293,7 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
private void assertTransformationStageWithPredicate(Map<String, String>
props, boolean expectedNegated) {
final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS,
props);
- final List<TransformationStage<SinkRecord>> transformationStages =
config.transformationStages(CONNECTOR_TASK_ID, METRICS);
+ final List<TransformationStage<SinkRecord>> transformationStages =
config.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS);
assertEquals(1, transformationStages.size());
TransformationStage<SinkRecord> stage = transformationStages.get(0);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 035b30216da..70edfb0f598 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -427,7 +428,7 @@ public class ErrorHandlingTaskTest {
Plugin<Transformation<SinkRecord>> transformationPlugin =
metrics.wrap(new FaultyPassthrough<SinkRecord>(), taskId, "");
TransformationChain<ConsumerRecord<byte[], byte[]>, SinkRecord>
sinkTransforms =
- new TransformationChain<>(singletonList(new
TransformationStage<>(transformationPlugin)), retryWithToleranceOperator);
+ new TransformationChain<>(singletonList(new
TransformationStage<>(transformationPlugin, TestPlugins.noOpLoaderSwap())),
retryWithToleranceOperator);
Plugin<Converter> keyConverterPlugin = metrics.wrap(converter, taskId,
true);
Plugin<Converter> valueConverterPlugin = metrics.wrap(converter,
taskId, false);
@@ -437,7 +438,7 @@ public class ErrorHandlingTaskTest {
ClusterConfigState.EMPTY, metrics, keyConverterPlugin,
valueConverterPlugin, errorHandlingMetrics,
headerConverterPlugin, sinkTransforms, consumer, pluginLoader,
time,
retryWithToleranceOperator, workerErrantRecordReporter,
- statusBackingStore, () -> errorReporters);
+ statusBackingStore, () -> errorReporters,
TestPlugins.noOpLoaderSwap());
}
private void createSourceTask(TargetState initialState,
RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator,
List<ErrorReporter<SourceRecord>> errorReporters) {
@@ -463,7 +464,7 @@ public class ErrorHandlingTaskTest {
List<ErrorReporter<SourceRecord>>
errorReporters, Converter converter) {
Plugin<Transformation<SourceRecord>> transformationPlugin =
metrics.wrap(new FaultyPassthrough<SourceRecord>(), taskId, "");
TransformationChain<SourceRecord, SourceRecord> sourceTransforms = new
TransformationChain<>(singletonList(
- new TransformationStage<>(transformationPlugin)),
retryWithToleranceOperator);
+ new TransformationStage<>(transformationPlugin,
TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator);
Plugin<Converter> keyConverterPlugin = metrics.wrap(converter, taskId,
true);
Plugin<Converter> valueConverterPlugin = metrics.wrap(converter,
taskId, false);
@@ -476,7 +477,7 @@ public class ErrorHandlingTaskTest {
offsetReader, offsetWriter, offsetStore, workerConfig,
ClusterConfigState.EMPTY, metrics, pluginLoader, time,
retryWithToleranceOperator,
- statusBackingStore, Runnable::run, () -> errorReporters));
+ statusBackingStore, Runnable::run, () -> errorReporters,
TestPlugins.noOpLoaderSwap()));
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 24103aefd97..a6375398d29 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -36,6 +36,7 @@ import
org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -283,7 +284,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask,
statusListener, initialState, keyConverterPlugin, valueConverterPlugin,
headerConverterPlugin,
transformationChain, producer, admin,
TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter,
offsetStore,
config, clusterConfigState, metrics, errorHandlingMetrics,
plugins.delegatingLoader(), time,
RetryWithToleranceOperatorTest.noneOperator(), statusBackingStore,
- sourceConfig, Runnable::run, preProducerCheck,
postProducerCheck, Collections::emptyList);
+ sourceConfig, Runnable::run, preProducerCheck,
postProducerCheck, Collections::emptyList, TestPlugins.noOpLoaderSwap());
}
@ParameterizedTest
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
index 88ed326457a..e2791a63f7b 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
@@ -61,7 +62,9 @@ public class TransformationStageTest {
TransformationStage<SourceRecord> stage = new TransformationStage<>(
predicatePlugin,
negate,
- transformationPlugin);
+ transformationPlugin,
+ TestPlugins.noOpLoaderSwap()
+ );
assertEquals(expectedResult, stage.apply(initial));
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 0750b0855cb..2607ee8b03b 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -48,6 +48,7 @@ import
org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -228,7 +229,7 @@ public class WorkerSinkTaskTest {
taskId, task, statusListener, initialState, workerConfig,
ClusterConfigState.EMPTY, connectMetrics,
keyConverterPlugin, valueConverterPlugin, errorMetrics,
headerConverterPlugin,
transformationChain, consumer, loader, time,
- retryWithToleranceOperator, null, statusBackingStore,
errorReportersSupplier);
+ retryWithToleranceOperator, null, statusBackingStore,
errorReportersSupplier, TestPlugins.noOpLoaderSwap());
}
@AfterEach
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 15ea638ef02..74021118098 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -35,6 +35,7 @@ import
org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -182,7 +183,7 @@ public class WorkerSinkTaskThreadedTest {
taskId, sinkTask, statusListener, initialState, workerConfig,
ClusterConfigState.EMPTY, metrics, keyConverterPlugin,
valueConverterPlugin, errorHandlingMetrics,
headerConverterPlugin, transformationChain,
consumer, pluginLoader, time,
RetryWithToleranceOperatorTest.noneOperator(), null, statusBackingStore,
- Collections::emptyList);
+ Collections::emptyList, TestPlugins.noOpLoaderSwap());
recordsReturned = 0;
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 6d6ab60c4a7..23fb3618f81 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -38,6 +38,7 @@ import
org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -254,7 +255,7 @@ public class WorkerSourceTaskTest {
workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener,
initialState, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics,
headerConverterPlugin,
transformationChain, producer, admin,
TopicCreationGroup.configuredGroups(sourceConfig),
offsetReader, offsetWriter, offsetStore, config,
clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
- retryWithToleranceOperator, statusBackingStore, Runnable::run,
Collections::emptyList);
+ retryWithToleranceOperator, statusBackingStore, Runnable::run,
Collections::emptyList, TestPlugins.noOpLoaderSwap());
}
@ParameterizedTest
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index c8c8cc49d05..eae9c96998b 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -24,6 +24,7 @@ import
org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.StatusBackingStore;
@@ -299,7 +300,7 @@ public class WorkerTaskTest {
Supplier<List<ErrorReporter<Object>>>
errorReporterSupplier,
Time time, StatusBackingStore
statusBackingStore) {
super(id, statusListener, initialState, loader, connectMetrics,
errorHandlingMetrics,
- retryWithToleranceOperator, transformationChain,
errorReporterSupplier, time, statusBackingStore);
+ retryWithToleranceOperator, transformationChain,
errorReporterSupplier, time, statusBackingStore, TestPlugins.noOpLoaderSwap());
}
@Override
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index b4ad23b37a9..2f7af629f06 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -89,6 +89,7 @@ import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.TopicAdmin;
+import org.apache.maven.artifact.versioning.VersionRange;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -322,7 +323,7 @@ public class WorkerTest {
// Create
mockKafkaClusterId();
- mockConnectorIsolation(connectorClass, sourceConnector);
+ mockVersionedConnectorIsolation(connectorClass, null, sourceConnector);
mockExecutorRealSubmit(WorkerConnector.class);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
offsetBackingStore, noneConnectorClientConfigOverridePolicy);
@@ -358,7 +359,7 @@ public class WorkerTest {
verifyKafkaClusterId();
- verifyConnectorIsolation(sourceConnector);
+ verifyVersionedConnectorIsolation(connectorClass, null,
sourceConnector);
verifyExecutorSubmit();
verify(sourceConnector).initialize(any(ConnectorContext.class));
verify(sourceConnector).start(connectorProps);
@@ -389,7 +390,8 @@ public class WorkerTest {
mockKafkaClusterId();
mockGenericIsolation();
- when(plugins.newConnector(anyString())).thenThrow(exception);
+ when(plugins.pluginLoader(nonConnectorClass,
null)).thenReturn(pluginLoader);
+ when(plugins.newConnector(nonConnectorClass,
null)).thenThrow(exception);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
offsetBackingStore, noneConnectorClientConfigOverridePolicy);
worker.herder = herder;
@@ -414,7 +416,7 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
assertStartupStatistics(worker, 1, 1, 0, 0);
- verify(plugins).newConnector(anyString());
+ verify(plugins).newConnector(nonConnectorClass, null);
verifyKafkaClusterId();
verifyGenericIsolation();
verify(connectorStatusListener).onFailure(eq(CONNECTOR_ID),
any(ConnectException.class));
@@ -426,7 +428,7 @@ public class WorkerTest {
setup(enableTopicCreation);
final String connectorAlias = "SampleSourceConnector";
mockKafkaClusterId();
- mockConnectorIsolation(connectorAlias, sinkConnector);
+ mockVersionedConnectorIsolation(connectorAlias, null, sinkConnector);
mockExecutorRealSubmit(WorkerConnector.class);
connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorAlias);
@@ -456,7 +458,7 @@ public class WorkerTest {
assertStartupStatistics(worker, 1, 0, 0, 0);
verifyKafkaClusterId();
- verifyConnectorIsolation(sinkConnector);
+ verifyVersionedConnectorIsolation(connectorAlias, null, sinkConnector);
verifyExecutorSubmit();
verify(sinkConnector).initialize(any(ConnectorContext.class));
verify(sinkConnector).start(connectorProps);
@@ -472,7 +474,7 @@ public class WorkerTest {
final String shortConnectorAlias = "WorkerTest";
mockKafkaClusterId();
- mockConnectorIsolation(shortConnectorAlias, sinkConnector);
+ mockVersionedConnectorIsolation(shortConnectorAlias, null,
sinkConnector);
mockExecutorRealSubmit(WorkerConnector.class);
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
shortConnectorAlias);
@@ -499,7 +501,7 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
verifyKafkaClusterId();
- verifyConnectorIsolation(sinkConnector);
+ verifyVersionedConnectorIsolation(shortConnectorAlias, null,
sinkConnector);
verify(sinkConnector).initialize(any(ConnectorContext.class));
verify(sinkConnector).start(connectorProps);
verify(connectorStatusListener).onStartup(CONNECTOR_ID);
@@ -531,7 +533,7 @@ public class WorkerTest {
final String connectorClass = SampleSourceConnector.class.getName();
mockKafkaClusterId();
- mockConnectorIsolation(connectorClass, sinkConnector);
+ mockVersionedConnectorIsolation(connectorClass, null, sinkConnector);
mockExecutorRealSubmit(WorkerConnector.class);
Map<String, String> taskProps = Collections.singletonMap("foo", "bar");
@@ -584,7 +586,7 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
verifyKafkaClusterId();
- verifyConnectorIsolation(sinkConnector);
+ verifyVersionedConnectorIsolation(connectorClass, null, sinkConnector);
verifyExecutorSubmit();
verify(sinkConnector).initialize(any(ConnectorContext.class));
verify(sinkConnector).start(connectorProps);
@@ -601,10 +603,10 @@ public class WorkerTest {
public void testAddRemoveSourceTask(boolean enableTopicCreation) {
setup(enableTopicCreation);
mockKafkaClusterId();
- mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class,
task);
- mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
- mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
- mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
taskHeaderConverter);
+ mockVersionedTaskIsolation(SampleSourceConnector.class,
TestSourceTask.class, null, sourceConnector, task);
+
mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter);
+
mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
+ mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
Map<String, String> origProps =
Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG,
TestSourceTask.class.getName());
@@ -642,10 +644,10 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
verifyKafkaClusterId();
- verifyTaskIsolation(task);
- verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG);
- verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG);
- verifyTaskHeaderConverter();
+ verifyVersionedTaskIsolation(SampleSourceConnector.class,
TestSourceTask.class, null, task);
+
verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
+
verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
+ verifyVersionedTaskHeaderConverterFromConnector();
verifyExecutorSubmit();
}
@@ -657,10 +659,10 @@ public class WorkerTest {
// Most of the other cases use source tasks; we make sure to get code coverage for sink tasks here as well
SinkTask task = mock(TestSinkTask.class);
mockKafkaClusterId();
- mockTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, task);
- mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
- mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
- mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
taskHeaderConverter);
+ mockVersionedTaskIsolation(SampleSinkConnector.class,
TestSinkTask.class, null, sinkConnector, task);
+
mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter);
+
mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
+ mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
Map<String, String> origProps =
Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG,
TestSinkTask.class.getName());
@@ -700,10 +702,10 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
verifyKafkaClusterId();
- verifyTaskIsolation(task);
- verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG);
- verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG);
- verifyTaskHeaderConverter();
+ verifyVersionedTaskIsolation(SampleSinkConnector.class,
TestSinkTask.class, null, task);
+
verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
+
verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
+ verifyVersionedTaskHeaderConverterFromConnector();
verifyExecutorSubmit();
}
@@ -729,10 +731,10 @@ public class WorkerTest {
config = new DistributedConfig(workerProps);
mockKafkaClusterId();
- mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class,
task);
- mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
- mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
- mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
taskHeaderConverter);
+ mockVersionedTaskIsolation(SampleSourceConnector.class,
TestSourceTask.class, null, sourceConnector, task);
+
mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter);
+
mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
+ mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
Runnable preProducer = mock(Runnable.class);
@@ -774,10 +776,10 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
verifyKafkaClusterId();
- verifyTaskIsolation(task);
- verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG);
- verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG);
- verifyTaskHeaderConverter();
+ verifyVersionedTaskIsolation(SampleSourceConnector.class,
TestSourceTask.class, null, task);
+
verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
+
verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
+ verifyVersionedTaskHeaderConverterFromConnector();
verifyExecutorSubmit();
}
@@ -794,11 +796,10 @@ public class WorkerTest {
TaskConfig taskConfig = new TaskConfig(origProps);
mockKafkaClusterId();
- mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class,
task);
- // Expect that the worker will create converters and will find them using the current classloader ...
- mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
- mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
- mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
taskHeaderConverter);
+ mockVersionedTaskIsolation(SampleSourceConnector.class,
TestSourceTask.class, null, sourceConnector, task);
+
mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter);
+
mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
+ mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
@@ -851,7 +852,7 @@ public class WorkerTest {
verify(instantiatedTask).initialize(taskConfig);
verify(herder, times(5)).taskStatus(TASK_ID);
verifyKafkaClusterId();
- verifyTaskIsolation(task);
+ verifyVersionedTaskIsolation(SampleSourceConnector.class,
TestSourceTask.class, null, task);
verifyExecutorSubmit();
verify(instantiatedTask, atLeastOnce()).id();
verify(instantiatedTask).awaitStop(anyLong());
@@ -860,9 +861,9 @@ public class WorkerTest {
// Called when we stop the worker
verify(instantiatedTask).loader();
verify(instantiatedTask).stop();
- verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG);
- verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG);
- verifyTaskHeaderConverter();
+
verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
+
verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
+ verifyVersionedTaskHeaderConverterFromConnector();
}
@ParameterizedTest
@@ -907,6 +908,7 @@ public class WorkerTest {
mockKafkaClusterId();
mockGenericIsolation();
+ when(plugins.pluginLoader(SampleSourceConnector.class.getName(),
null)).thenReturn(pluginLoader);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
offsetBackingStore, noneConnectorClientConfigOverridePolicy);
worker.herder = herder;
@@ -935,14 +937,14 @@ public class WorkerTest {
mockFileConfigProvider();
mockKafkaClusterId();
- mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class,
task);
+ mockVersionedTaskIsolation(SampleSourceConnector.class,
TestSourceTask.class, null, sourceConnector, task);
// Expect that the worker will create converters and will not initially find them using the current classloader ...
- mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, null);
- mockTaskConverter(ClassLoaderUsage.PLUGINS,
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
- mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, null);
- mockTaskConverter(ClassLoaderUsage.PLUGINS,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
- mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
- mockTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter);
+
mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, null);
+
mockVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
WorkerConfig.KEY_CONVERTER_VERSION, taskKeyConverter);
+
mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, null);
+
mockVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
WorkerConfig.VALUE_CONVERTER_VERSION, taskValueConverter);
+ mockVersionedTaskHeaderConverterFromConnector(null);
+ mockVersionedTaskHeaderConverterFromWorker(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
Map<String, String> origProps =
Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG,
TestSourceTask.class.getName());
@@ -968,7 +970,13 @@ public class WorkerTest {
verify(constructedMockTask).awaitStop(anyLong());
verify(constructedMockTask).removeMetrics();
verifyKafkaClusterId();
- verifyTaskIsolation(task);
+ verifyVersionedTaskIsolation(SampleSourceConnector.class,
TestSourceTask.class, null, task);
+
verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
+
verifyVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
WorkerConfig.KEY_CONVERTER_VERSION);
+
verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
+
verifyVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
WorkerConfig.VALUE_CONVERTER_VERSION);
+ verifyVersionedTaskHeaderConverterFromConnector();
+ verifyVersionedTaskHeaderConverterFromWorker();
verifyConverters();
verifyExecutorSubmit();
}
@@ -985,14 +993,14 @@ public class WorkerTest {
TaskConfig taskConfig = new TaskConfig(origProps);
mockKafkaClusterId();
- mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class,
task);
+ mockVersionedTaskIsolation(SampleSourceConnector.class,
TestSourceTask.class, null, sourceConnector, task);
// Expect that the worker will create converters and will not initially find them using the current classloader ...
- mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, null);
- mockTaskConverter(ClassLoaderUsage.PLUGINS,
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
- mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, null);
- mockTaskConverter(ClassLoaderUsage.PLUGINS,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
- mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
- mockTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter);
+
mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, null);
+
mockVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
WorkerConfig.KEY_CONVERTER_VERSION, taskKeyConverter);
+
mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, null);
+
mockVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
WorkerConfig.VALUE_CONVERTER_VERSION, taskValueConverter);
+ mockVersionedTaskHeaderConverterFromConnector(null);
+ mockVersionedTaskHeaderConverterFromWorker(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
offsetBackingStore, executorService,
@@ -1024,7 +1032,13 @@ public class WorkerTest {
verify(instantiatedTask).removeMetrics();
verifyKafkaClusterId();
- verifyTaskIsolation(task);
+ verifyVersionedTaskIsolation(SampleSourceConnector.class,
TestSourceTask.class, null, task);
+
verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
+
verifyVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
WorkerConfig.KEY_CONVERTER_VERSION);
+
verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
+
verifyVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
WorkerConfig.VALUE_CONVERTER_VERSION);
+ verifyVersionedTaskHeaderConverterFromConnector();
+ verifyVersionedTaskHeaderConverterFromWorker();
verifyExecutorSubmit();
verifyStorage();
}
@@ -1860,7 +1874,7 @@ public class WorkerTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "rawtypes"})
public void testZombieFencing(boolean enableTopicCreation) {
setup(enableTopicCreation);
Admin admin = mock(Admin.class);
@@ -1878,6 +1892,8 @@ public class WorkerTest {
mockKafkaClusterId();
mockGenericIsolation();
+ when(plugins.connectorClass(anyString(), any())).thenReturn((Class)
sourceConnector.getClass());
+ when(plugins.pluginLoader(SampleSourceConnector.class.getName(),
null)).thenReturn(pluginLoader);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
offsetBackingStore, executorService,
allConnectorClientConfigOverridePolicy, mockAdminConstructor);
@@ -2087,7 +2103,8 @@ public class WorkerTest {
worker.start();
mockGenericIsolation();
- when(plugins.newConnector(anyString())).thenReturn(sourceConnector);
+ when(plugins.newConnector(anyString(),
any())).thenReturn(sourceConnector);
+ when(plugins.pluginLoader(SampleSourceConnector.class.getName(),
null)).thenReturn(pluginLoader);
when(plugins.withClassLoader(any(ClassLoader.class),
any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenThrow(new UnsupportedOperationException("This connector doesn't " +
"support altering of offsets"));
@@ -2679,7 +2696,7 @@ public class WorkerTest {
String connectorClass = SampleSourceConnector.class.getName();
connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass);
connectorProps.put(TASKS_MAX_ENFORCE_CONFIG,
Boolean.toString(enforced));
- mockConnectorIsolation(connectorClass, sourceConnector);
+ mockVersionedConnectorIsolation(connectorClass, null, sourceConnector);
mockExecutorRealSubmit(WorkerConnector.class);
@@ -2851,10 +2868,10 @@ public class WorkerTest {
tasksMaxExceededMessage =
failureCaptor.getValue().getMessage();
} else {
- mockTaskIsolation(SampleSinkConnector.class,
TestSinkTask.class, task);
- mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
- mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
- mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER,
taskHeaderConverter);
+ mockVersionedTaskIsolation(SampleSinkConnector.class,
TestSinkTask.class, null, sinkConnector, task);
+
mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter);
+
mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
+
mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
assertTrue(worker.startSinkTask(
@@ -2964,8 +2981,20 @@ public class WorkerTest {
.thenReturn(returning);
}
- private void verifyTaskConverter(String converterClassConfig) {
- verify(plugins).newConverter(any(AbstractConfig.class),
eq(converterClassConfig), eq(ClassLoaderUsage.CURRENT_CLASSLOADER));
+ private void mockVersionedTaskConverterFromConnector(String
converterClassConfig, String converterVersionConfig, Converter returning) {
+ when(plugins.newConverter(any(ConnectorConfig.class),
eq(converterClassConfig), eq(converterVersionConfig))).thenReturn(returning);
+ }
+
+ private void verifyVersionedTaskConverterFromConnector(String
converterClassConfig, String converterVersionConfig) {
+ verify(plugins).newConverter(any(ConnectorConfig.class),
eq(converterClassConfig), eq(converterVersionConfig));
+ }
+
+ private void mockVersionedTaskConverterFromWorker(String
converterClassConfig, String converterVersionConfig, Converter returning) {
+ when(plugins.newConverter(any(WorkerConfig.class),
eq(converterClassConfig), eq(converterVersionConfig))).thenReturn(returning);
+ }
+
+ private void verifyVersionedTaskConverterFromWorker(String
converterClassConfig, String converterVersionConfig) {
+ verify(plugins).newConverter(any(WorkerConfig.class),
eq(converterClassConfig), eq(converterVersionConfig));
}
private void mockTaskHeaderConverter(ClassLoaderUsage classLoaderUsage,
HeaderConverter returning) {
@@ -2977,8 +3006,25 @@ public class WorkerTest {
verify(plugins).newHeaderConverter(any(AbstractConfig.class),
eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG),
eq(ClassLoaderUsage.CURRENT_CLASSLOADER));
}
+ private void mockVersionedTaskHeaderConverterFromConnector(HeaderConverter
returning) {
+ when(plugins.newHeaderConverter(any(ConnectorConfig.class),
eq(ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG),
eq(ConnectorConfig.HEADER_CONVERTER_VERSION_CONFIG)))
+ .thenReturn(returning);
+ }
+
+ private void verifyVersionedTaskHeaderConverterFromConnector() {
+ verify(plugins).newHeaderConverter(any(ConnectorConfig.class),
eq(ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG),
eq(ConnectorConfig.HEADER_CONVERTER_VERSION_CONFIG));
+ }
+
+ private void mockVersionedTaskHeaderConverterFromWorker(HeaderConverter
returning) {
+ when(plugins.newHeaderConverter(any(WorkerConfig.class),
eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG),
eq(WorkerConfig.HEADER_CONVERTER_VERSION)))
+ .thenReturn(returning);
+ }
+
+ private void verifyVersionedTaskHeaderConverterFromWorker() {
+ verify(plugins).newHeaderConverter(any(WorkerConfig.class),
eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG),
eq(WorkerConfig.HEADER_CONVERTER_VERSION));
+ }
+
private void mockGenericIsolation() {
- when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
}
@@ -2993,12 +3039,26 @@ public class WorkerTest {
when(connector.version()).thenReturn("1.0");
}
+ private void mockVersionedConnectorIsolation(String connectorClass,
VersionRange range, Connector connector) {
+ mockGenericIsolation();
+ when(plugins.pluginLoader(connectorClass,
range)).thenReturn(pluginLoader);
+ when(plugins.newConnector(connectorClass,
range)).thenReturn(connector);
+ when(connector.version()).thenReturn(range == null ? "unknown" :
range.toString());
+ }
+
private void verifyConnectorIsolation(Connector connector) {
verifyGenericIsolation();
verify(plugins).newConnector(anyString());
verify(connector, atLeastOnce()).version();
}
+ private void verifyVersionedConnectorIsolation(String connectorClass,
VersionRange range, Connector connector) {
+ verifyGenericIsolation();
+ verify(plugins).pluginLoader(connectorClass, range);
+ verify(plugins).newConnector(connectorClass, range);
+ verify(connector, atLeastOnce()).version();
+ }
+
private void mockTaskIsolation(Class<? extends Connector> connector,
Class<? extends Task> taskClass, Task task) {
mockGenericIsolation();
doReturn(connector).when(plugins).connectorClass(connector.getName());
@@ -3006,6 +3066,15 @@ public class WorkerTest {
when(task.version()).thenReturn("1.0");
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private void mockVersionedTaskIsolation(Class<? extends Connector>
connectorClass, Class<? extends Task> taskClass, VersionRange range, Connector
connector, Task task) {
+ mockGenericIsolation();
+ when(plugins.pluginLoader(connectorClass.getName(),
range)).thenReturn(pluginLoader);
+ when(plugins.connectorClass(connectorClass.getName(),
range)).thenReturn((Class) connectorClass);
+ when(plugins.newTask(taskClass)).thenReturn(task);
+ when(task.version()).thenReturn(range == null ? "unknown" :
range.toString());
+ }
+
private void verifyTaskIsolation(Task task) {
verifyGenericIsolation();
verify(plugins).connectorClass(anyString());
@@ -3013,6 +3082,14 @@ public class WorkerTest {
verify(task).version();
}
+ private void verifyVersionedTaskIsolation(Class<? extends Connector>
connectorClass, Class<? extends Task> taskClass, VersionRange range, Task task)
{
+ verifyGenericIsolation();
+ verify(plugins).pluginLoader(connectorClass.getName(), range);
+ verify(plugins).connectorClass(connectorClass.getName(), range);
+ verify(plugins).newTask(taskClass);
+ verify(task).version();
+ }
+
private void mockExecutorRealSubmit(Class<? extends Runnable>
runnableClass) {
// This test expects the runnable to be executed, so have the isolated runnable pass-through.
// Requires using the Worker constructor without the mocked executorService
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
index d4cac9484e6..c91fa1017ce 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.storage.AppliedConnectorConfig;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.transforms.Transformation;
@@ -195,7 +196,8 @@ public class WorkerTestUtils {
TransformationStage<R> stage = new TransformationStage<>(
predicatePlugin,
false,
- transformationPlugin);
+ transformationPlugin,
+ TestPlugins.noOpLoaderSwap());
TransformationChain<T, R> realTransformationChainRetriableException =
new TransformationChain(List.of(stage), toleranceOperator);
TransformationChain<T, R> transformationChainRetriableException =
Mockito.spy(realTransformationChainRetriableException);
return transformationChainRetriableException;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 55a3445a331..ca4c29931d0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -69,7 +69,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -197,16 +196,9 @@ public class PluginsTest {
props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
createConfig();
- // Because it's not explicitly set on the supplied configuration, the logic to use the current classloader for the connector
- // will exit immediately, and so this method always returns null
HeaderConverter headerConverter = plugins.newHeaderConverter(config,
-
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
-
ClassLoaderUsage.CURRENT_CLASSLOADER);
- assertNull(headerConverter);
- // But we should always find it (or the worker's default) when using the plugins classloader ...
- headerConverter = plugins.newHeaderConverter(config,
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
- ClassLoaderUsage.PLUGINS);
+
ClassLoaderUsage.CURRENT_CLASSLOADER);
assertNotNull(headerConverter);
assertInstanceOf(SimpleHeaderConverter.class, headerConverter);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
index adb2c2418d5..0e472a3ebe0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
@@ -39,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
@@ -374,6 +375,12 @@ public class TestPlugins {
.collect(Collectors.toList());
}
+ public static Function<ClassLoader, LoaderSwap> noOpLoaderSwap() {
+ return classLoader -> {
+ return new LoaderSwap(Thread.currentThread().getContextClassLoader());
+ };
+ }
+
private static TestPlugin[] defaultPlugins() {
return Arrays.stream(TestPlugin.values())
.filter(TestPlugin::includeByDefault)
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
index 29a9426e337..ca358f18f43 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
@@ -519,7 +519,7 @@ public class TopicCreationTest {
topicCreation.addTopic(FOO_TOPIC);
assertFalse(topicCreation.isTopicCreationRequired(FOO_TOPIC));
- List<TransformationStage<SourceRecord>> transformationStages =
sourceConfig.transformationStages(CONNECTOR_TASK_ID, METRICS);
+ List<TransformationStage<SourceRecord>> transformationStages =
sourceConfig.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS);
assertEquals(1, transformationStages.size());
TransformationStage<SourceRecord> xform = transformationStages.get(0);
SourceRecord transformed = xform.apply(new SourceRecord(null, null,
"topic", 0, null, null, Schema.INT8_SCHEMA, 42));
@@ -626,7 +626,7 @@ public class TopicCreationTest {
assertEquals(barPartitions, barTopicSpec.numPartitions());
assertEquals(barTopicProps, barTopicSpec.configs());
- List<TransformationStage<SourceRecord>> transformationStages =
sourceConfig.transformationStages(CONNECTOR_TASK_ID, METRICS);
+ List<TransformationStage<SourceRecord>> transformationStages =
sourceConfig.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS);
assertEquals(2, transformationStages.size());
TransformationStage<SourceRecord> castXForm =
transformationStages.get(0);