This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 92d9dc19b22 KAFKA-19758: Preferably use the connector classloader when
loading pl… (#20675)
92d9dc19b22 is described below
commit 92d9dc19b22529895c430bf2551b551cbfbcaad1
Author: Mickael Maison <[email protected]>
AuthorDate: Mon Oct 20 19:29:49 2025 +0200
KAFKA-19758: Preferably use the connector classloader when loading pl…
(#20675)
…ugins if it has the correct version
Reviewers: Greg Harris <[email protected]>, Fiore Mario Vitale
<[email protected]>, Snehashis Pal <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../kafka/connect/runtime/AbstractHerder.java | 2 +-
.../kafka/connect/runtime/ConnectorConfig.java | 8 +-
.../org/apache/kafka/connect/runtime/Worker.java | 2 +-
.../runtime/isolation/DelegatingClassLoader.java | 97 ++++++------
.../kafka/connect/runtime/isolation/Plugins.java | 23 +--
.../kafka/connect/runtime/AbstractHerderTest.java | 5 +-
.../apache/kafka/connect/runtime/WorkerTest.java | 14 +-
.../isolation/DelegatingClassLoaderTest.java | 164 +++++++++++++++++++--
.../runtime/isolation/MultiVersionTest.java | 5 +-
.../connect/runtime/isolation/PluginsTest.java | 9 +-
.../runtime/isolation/SynchronizationTest.java | 9 +-
.../runtime/standalone/StandaloneHerderTest.java | 9 +-
13 files changed, 237 insertions(+), 112 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index d363b9ed9c0..85b16fd65d4 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -175,7 +175,7 @@
<suppress checks="ClassFanOutComplexity"
files="(WorkerSink|WorkerSource|ErrorHandling)Task(|WithTopicCreation|Mockito)Test.java"/>
<suppress checks="ClassFanOutComplexity"
- files="DistributedHerderTest.java"/>
+ files="(AbstractHerderTest|DistributedHerderTest).java"/>
<suppress checks="MethodLength"
files="(RequestResponse|WorkerSinkTask|WorkerSinkTaskMockito)Test.java"/>
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 2e1e1d7318e..d5d6edf703e 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -843,7 +843,7 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
try {
connVersion =
PluginUtils.connectorVersionRequirement(connectorProps.get(CONNECTOR_VERSION));
connector = cachedConnectors.getConnector(connType, connVersion);
- connectorLoader = plugins().pluginLoader(connType, connVersion);
+ connectorLoader = plugins().connectorLoader(connType, connVersion);
log.info("Validating connector {}, version {}", connType,
connector.version());
} catch (VersionedPluginLoadingException e) {
log.warn("Failed to load connector {} with version {}, skipping
additional validations (connector, converters, transformations, client
overrides) ",
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index ff4d399db1a..7a32884ee8c 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -415,7 +415,9 @@ public class ConnectorConfig extends AbstractConfig {
try {
VersionRange range =
PluginUtils.connectorVersionRequirement(getString(versionConfig));
VersionRange connectorRange =
PluginUtils.connectorVersionRequirement(getString(CONNECTOR_VERSION));
- return (T) plugins.newPlugin(getClass(classConfig).getName(),
range, plugins.pluginLoader(getString(CONNECTOR_CLASS_CONFIG), connectorRange));
+ return (T) plugins.newPlugin(getClass(classConfig).getName(),
+ range,
+
plugins.connectorLoader(getString(CONNECTOR_CLASS_CONFIG), connectorRange));
} catch (Exception e) {
throw new ConnectException(e);
}
@@ -569,7 +571,7 @@ public class ConnectorConfig extends AbstractConfig {
}
try {
VersionRange range =
PluginUtils.connectorVersionRequirement(connectorVersion);
- return plugins.pluginVersion(pluginName,
plugins.pluginLoader(connectorClass, range), pluginType);
+ return plugins.pluginVersion(pluginName,
plugins.connectorLoader(connectorClass, range), pluginType);
} catch (InvalidVersionSpecificationException |
VersionedPluginLoadingException e) {
// these errors should be captured in other places, so we can
ignore them here
log.warn("Failed to determine default plugin version for {}",
connectorClass, e);
@@ -739,7 +741,7 @@ public class ConnectorConfig extends AbstractConfig {
T plugin;
try {
- plugin = (T) plugins.newPlugin(pluginClass, pluginVersion,
plugins.pluginLoader(connectorClass, connectorVersionRange));
+ plugin = (T) plugins.newPlugin(pluginClass, pluginVersion,
plugins.connectorLoader(connectorClass, connectorVersionRange));
} catch (VersionedPluginLoadingException e) {
throw e;
} catch (Exception e) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 53cc40d7fd8..076953268d6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -1248,7 +1248,7 @@ public final class Worker {
final String version =
connProps.get(ConnectorConfig.CONNECTOR_VERSION);
try {
- return plugins.pluginLoader(klass,
PluginUtils.connectorVersionRequirement(version));
+ return plugins.connectorLoader(klass,
PluginUtils.connectorVersionRequirement(version));
} catch (InvalidVersionSpecificationException |
VersionedPluginLoadingException e) {
throw new ConnectException(
String.format("Failed to get class loader for connector
%s, class %s", klass, connProps.get(ConnectorConfig.NAME_CONFIG)), e);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index f89b1f03a75..efb25f9062e 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -23,9 +23,9 @@ import org.slf4j.LoggerFactory;
import java.net.URL;
import java.net.URLClassLoader;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -76,10 +76,11 @@ public class DelegatingClassLoader extends URLClassLoader {
* Retrieve the PluginClassLoader associated with a plugin class
*
* @param name The fully qualified class name of the plugin
+ * @param range The version range of the plugin
+ * @param connectorLoader The ClassLoader of the connector loading this
plugin
* @return the PluginClassLoader that should be used to load this, or null
if the plugin is not isolated.
*/
- // VisibleForTesting
- PluginClassLoader pluginClassLoader(String name, VersionRange range) {
+ PluginClassLoader pluginClassLoader(String name, VersionRange range,
Optional<ClassLoader> connectorLoader) {
if (!PluginUtils.shouldLoadInIsolation(name)) {
return null;
}
@@ -89,20 +90,15 @@ public class DelegatingClassLoader extends URLClassLoader {
return null;
}
-
- ClassLoader pluginLoader = findPluginLoader(inner, name, range);
+ ClassLoader pluginLoader = findPluginLoader(inner, name, range,
connectorLoader);
return pluginLoader instanceof PluginClassLoader
? (PluginClassLoader) pluginLoader
: null;
}
- PluginClassLoader pluginClassLoader(String name) {
- return pluginClassLoader(name, null);
- }
-
- ClassLoader loader(String classOrAlias, VersionRange range) {
+ ClassLoader connectorLoader(String classOrAlias, VersionRange range) {
String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
- ClassLoader classLoader = pluginClassLoader(fullName, range);
+ ClassLoader classLoader = pluginClassLoader(fullName, range,
Optional.empty());
if (classLoader == null) {
classLoader = this;
}
@@ -114,12 +110,22 @@ public class DelegatingClassLoader extends URLClassLoader
{
return classLoader;
}
- ClassLoader loader(String classOrAlias) {
- return loader(classOrAlias, null);
+ ClassLoader pluginLoader(String classOrAlias, VersionRange range,
ClassLoader connectorLoader) {
+ String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+ ClassLoader classLoader = pluginClassLoader(fullName, range,
Optional.ofNullable(connectorLoader));
+ if (classLoader == null) {
+ classLoader = this;
+ }
+ log.debug(
+ "Got plugin class loader: '{}' for plugin: {}",
+ classLoader,
+ classOrAlias
+ );
+ return classLoader;
}
ClassLoader connectorLoader(String connectorClassOrAlias) {
- return loader(connectorClassOrAlias);
+ return connectorLoader(connectorClassOrAlias, null);
}
String resolveFullClassName(String classOrAlias) {
@@ -151,42 +157,44 @@ public class DelegatingClassLoader extends URLClassLoader
{
private ClassLoader findPluginLoader(
SortedMap<PluginDesc<?>, ClassLoader> loaders,
String pluginName,
- VersionRange range
+ VersionRange range,
+ Optional<ClassLoader> connectorLoader
) {
- if (range != null) {
-
- if (null != range.getRecommendedVersion()) {
- throw new VersionedPluginLoadingException(String.format("A
soft version range is not supported for plugin loading, "
- + "this is an internal error as connect should
automatically convert soft ranges to hard ranges. "
- + "Provided soft version: %s ", range));
- }
+ if (range != null && range.getRecommendedVersion() != null) {
+ throw new VersionedPluginLoadingException(String.format("A soft
version range is not supported for plugin loading, "
+ + "this is an internal error as connect should
automatically convert soft ranges to hard ranges. "
+ + "Provided soft version: %s ", range));
+ }
- ClassLoader loader = null;
- for (Map.Entry<PluginDesc<?>, ClassLoader> entry :
loaders.entrySet()) {
- // the entries should be in sorted order of versions so this
should end up picking the latest version which matches the range
- if (range.containsVersion(entry.getKey().encodedVersion())) {
- loader = entry.getValue();
- }
+ ClassLoader loader = null;
+ // the entries should be in sorted order of versions so this should
end up picking the latest version which matches the range
+ for (Map.Entry<PluginDesc<?>, ClassLoader> entry : loaders.entrySet())
{
+ if (range == null ||
range.containsVersion(entry.getKey().encodedVersion())) {
+ loader = entry.getValue();
}
-
- if (loader == null) {
- List<String> availableVersions =
loaders.keySet().stream().map(PluginDesc::version).collect(Collectors.toList());
- throw new VersionedPluginLoadingException(String.format(
- "Plugin %s not found that matches the version range
%s, available versions: %s",
- pluginName,
- range,
- availableVersions
- ), availableVersions);
+ // if we find a plugin with the same loader as the connector, we
can end our search
+ if (connectorLoader.isPresent() &&
connectorLoader.get().equals(loader)) {
+ break;
}
- return loader;
}
- return loaders.get(loaders.lastKey());
+ if (range != null && loader == null) {
+ List<String> availableVersions =
loaders.keySet().stream().map(PluginDesc::version).collect(Collectors.toList());
+ throw new VersionedPluginLoadingException(String.format(
+ "Plugin %s not found that matches the version range %s,
available versions: %s",
+ pluginName,
+ range,
+ availableVersions
+ ), availableVersions);
+ }
+ return loader;
}
public void installDiscoveredPlugins(PluginScanResult scanResult) {
- pluginLoaders.putAll(computePluginLoaders(scanResult));
+ scanResult.forEach(pluginDesc ->
+ pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new
TreeMap<>())
+ .put(pluginDesc, pluginDesc.loader()));
for (String pluginClassName : pluginLoaders.keySet()) {
log.info("Added plugin '{}'", pluginClassName);
}
@@ -208,7 +216,7 @@ public class DelegatingClassLoader extends URLClassLoader {
) throws VersionedPluginLoadingException, ClassNotFoundException {
String fullName = aliases.getOrDefault(name, name);
- PluginClassLoader pluginLoader = pluginClassLoader(fullName, range);
+ PluginClassLoader pluginLoader = pluginClassLoader(fullName, range,
Optional.empty());
Class<?> plugin;
if (pluginLoader != null) {
log.trace("Retrieving loaded class '{}' from '{}'", name,
pluginLoader);
@@ -263,11 +271,4 @@ public class DelegatingClassLoader extends URLClassLoader {
}
}
- private static Map<String, SortedMap<PluginDesc<?>, ClassLoader>>
computePluginLoaders(PluginScanResult plugins) {
- Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders = new
HashMap<>();
- plugins.forEach(pluginDesc ->
- pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new
TreeMap<>())
- .put(pluginDesc, pluginDesc.loader()));
- return pluginLoaders;
- }
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index daf9f219992..cd0be18f71a 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -279,20 +279,14 @@ public class Plugins {
return delegatingLoader;
}
- // kept for compatibility
- public ClassLoader connectorLoader(String connectorClassOrAlias) {
- return delegatingLoader.loader(connectorClassOrAlias);
+ public ClassLoader connectorLoader(String connectorClassOrAlias,
VersionRange range) {
+ return delegatingLoader.connectorLoader(connectorClassOrAlias, range);
}
- public ClassLoader pluginLoader(String classOrAlias, VersionRange range) {
- return delegatingLoader.loader(classOrAlias, range);
+ public ClassLoader pluginLoader(String classOrAlias, VersionRange range,
ClassLoader connectorLoader) {
+ return delegatingLoader.pluginLoader(classOrAlias, range,
connectorLoader);
}
- public ClassLoader pluginLoader(String classOrAlias) {
- return delegatingLoader.loader(classOrAlias);
- }
-
-
@SuppressWarnings({"unchecked", "rawtypes"})
public Set<PluginDesc<Connector>> connectors() {
Set<PluginDesc<Connector>> connectors = new TreeSet<>((Set)
sinkConnectors());
@@ -363,19 +357,14 @@ public class Plugins {
return plugins;
}
- public Object newPlugin(String classOrAlias) throws ClassNotFoundException
{
- Class<?> klass = pluginClass(delegatingLoader, classOrAlias,
Object.class);
- return newPlugin(klass);
- }
-
public Object newPlugin(String classOrAlias, VersionRange range) throws
VersionedPluginLoadingException, ClassNotFoundException {
Class<?> klass = pluginClass(delegatingLoader, classOrAlias,
Object.class, range);
return newPlugin(klass);
}
public Object newPlugin(String classOrAlias, VersionRange range,
ClassLoader sourceLoader) throws ClassNotFoundException {
- if (range == null && sourceLoader instanceof PluginClassLoader) {
- return newPlugin(sourceLoader.loadClass(classOrAlias));
+ if (sourceLoader instanceof PluginClassLoader) {
+ return newPlugin(pluginLoader(classOrAlias, range,
sourceLoader).loadClass(classOrAlias));
}
return newPlugin(classOrAlias, range);
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 8c32f2d33be..956153db5c6 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -571,6 +571,7 @@ public class AbstractHerderTest {
// 2 transform aliases defined -> 2 plugin lookups
Mockito.lenient().when(plugins.transformations()).thenReturn(Set.of(transformationPluginDesc()));
+ Mockito.lenient().when(plugins.connectorLoader(any(),
any())).thenReturn(classLoader);
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(),
null, classLoader)).thenReturn(new SampleTransformation());
// Define 2 transformations. One has a class defined and so can get
embedded configs, the other is missing
@@ -626,6 +627,8 @@ public class AbstractHerderTest {
Mockito.lenient().when(plugins.transformations()).thenReturn(Set.of(transformationPluginDesc()));
Mockito.lenient().when(plugins.predicates()).thenReturn(Set.of(predicatePluginDesc()));
+ Mockito.lenient().when(plugins.connectorLoader(any(),
any())).thenReturn(classLoader);
+
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(),
null, classLoader)).thenReturn(new SampleTransformation());
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(),
null, classLoader)).thenReturn(new SampleTransformation());
Mockito.lenient().when(plugins.newPlugin(SamplePredicate.class.getName(), null,
classLoader)).thenReturn(new SamplePredicate());
@@ -1341,7 +1344,7 @@ public class AbstractHerderTest {
when(workerConfig.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG)).thenReturn((Class)
SimpleHeaderConverter.class);
when(worker.config()).thenReturn(workerConfig);
when(plugins.newConnector(anyString(), any())).thenReturn(connector);
- when(plugins.pluginLoader(connectorClass,
null)).thenReturn(classLoader);
+ when(plugins.connectorLoader(any(), any())).thenReturn(classLoader);
when(plugins.withClassLoader(classLoader)).thenReturn(loaderSwap);
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index e29eeebe88d..b4e83b76822 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -391,7 +391,6 @@ public class WorkerTest {
mockKafkaClusterId();
mockGenericIsolation();
- when(plugins.pluginLoader(nonConnectorClass,
null)).thenReturn(pluginLoader);
when(plugins.newConnector(nonConnectorClass,
null)).thenThrow(exception);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
offsetBackingStore, noneConnectorClientConfigOverridePolicy);
@@ -609,7 +608,6 @@ public class WorkerTest {
mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
-
Map<String, String> origProps = Map.of(TaskConfig.TASK_CLASS_CONFIG,
TestSourceTask.class.getName());
worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
offsetBackingStore, executorService,
@@ -909,7 +907,6 @@ public class WorkerTest {
mockKafkaClusterId();
mockGenericIsolation();
- when(plugins.pluginLoader(SampleSourceConnector.class.getName(),
null)).thenReturn(pluginLoader);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
offsetBackingStore, noneConnectorClientConfigOverridePolicy);
worker.herder = herder;
@@ -1894,7 +1891,6 @@ public class WorkerTest {
mockKafkaClusterId();
mockGenericIsolation();
when(plugins.connectorClass(anyString(), any())).thenReturn((Class)
sourceConnector.getClass());
- when(plugins.pluginLoader(SampleSourceConnector.class.getName(),
null)).thenReturn(pluginLoader);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
offsetBackingStore, executorService,
allConnectorClientConfigOverridePolicy, mockAdminConstructor);
@@ -2105,8 +2101,7 @@ public class WorkerTest {
mockGenericIsolation();
when(plugins.newConnector(anyString(),
any())).thenReturn(sourceConnector);
- when(plugins.pluginLoader(SampleSourceConnector.class.getName(),
null)).thenReturn(pluginLoader);
- when(plugins.withClassLoader(any(ClassLoader.class),
any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+ when(plugins.withClassLoader(any(),
any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
when(sourceConnector.alterOffsets(eq(connectorProps),
anyMap())).thenThrow(new UnsupportedOperationException("This connector doesn't
" +
"support altering of offsets"));
@@ -2815,6 +2810,7 @@ public class WorkerTest {
private void testStartTaskWithTooManyTaskConfigs(boolean enforced) {
SinkTask task = mock(TestSinkTask.class);
mockKafkaClusterId();
+ when(plugins.connectorLoader(any(), any())).thenReturn(pluginLoader);
Map<String, String> origProps = Map.of(TaskConfig.TASK_CLASS_CONFIG,
TestSinkTask.class.getName());
@@ -3026,11 +3022,11 @@ public class WorkerTest {
}
private void mockGenericIsolation() {
+ when(plugins.connectorLoader(any(), any())).thenReturn(pluginLoader);
when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
}
private void verifyGenericIsolation() {
- verify(plugins, atLeastOnce()).withClassLoader(pluginLoader);
verify(loaderSwap, atLeastOnce()).close();
}
@@ -3042,7 +3038,6 @@ public class WorkerTest {
private void mockVersionedConnectorIsolation(String connectorClass,
VersionRange range, Connector connector) {
mockGenericIsolation();
- when(plugins.pluginLoader(connectorClass,
range)).thenReturn(pluginLoader);
when(plugins.newConnector(connectorClass,
range)).thenReturn(connector);
when(connector.version()).thenReturn(range == null ? "unknown" :
range.toString());
}
@@ -3055,7 +3050,6 @@ public class WorkerTest {
private void verifyVersionedConnectorIsolation(String connectorClass,
VersionRange range, Connector connector) {
verifyGenericIsolation();
- verify(plugins).pluginLoader(connectorClass, range);
verify(plugins).newConnector(connectorClass, range);
verify(connector, atLeastOnce()).version();
}
@@ -3070,7 +3064,6 @@ public class WorkerTest {
@SuppressWarnings({"unchecked", "rawtypes"})
private void mockVersionedTaskIsolation(Class<? extends Connector>
connectorClass, Class<? extends Task> taskClass, VersionRange range, Connector
connector, Task task) {
mockGenericIsolation();
- when(plugins.pluginLoader(connectorClass.getName(),
range)).thenReturn(pluginLoader);
when(plugins.connectorClass(connectorClass.getName(),
range)).thenReturn((Class) connectorClass);
when(plugins.newTask(taskClass)).thenReturn(task);
when(plugins.safeLoaderSwapper()).thenReturn(TestPlugins.noOpLoaderSwap());
@@ -3086,7 +3079,6 @@ public class WorkerTest {
private void verifyVersionedTaskIsolation(Class<? extends Connector>
connectorClass, Class<? extends Task> taskClass, VersionRange range, Task task)
{
verifyGenericIsolation();
- verify(plugins).pluginLoader(connectorClass.getName(), range);
verify(plugins).connectorClass(connectorClass.getName(), range);
verify(plugins).newTask(taskClass);
verify(task, times(2)).version();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
index fd97935933a..76bad03fb01 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
@@ -17,10 +17,16 @@
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.transforms.Cast;
+import org.apache.kafka.connect.transforms.Filter;
+import org.apache.kafka.connect.transforms.Transformation;
+import
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
@@ -31,6 +37,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -41,13 +48,30 @@ public class DelegatingClassLoaderTest {
public PluginClassLoader parent;
public PluginClassLoader pluginLoader;
+ public PluginClassLoader pluginLoader2;
+ public PluginClassLoader pluginLoader3;
+ public PluginClassLoader pluginLoader4;
public DelegatingClassLoader classLoader;
- public PluginDesc<SinkConnector> pluginDesc;
+ public PluginDesc<SinkConnector> connectorPluginDesc;
+ public PluginDesc<SinkConnector> connectorPluginDesc2;
+ public PluginDesc<Transformation<?>> cast;
+ public PluginDesc<Transformation<?>> castV1Loader2;
+ public PluginDesc<Transformation<?>> castV1Loader3;
+ public PluginDesc<Transformation<?>> castV2;
+ public PluginDesc<Transformation<?>> filter;
public PluginScanResult scanResult;
+ public String version1 = "1.0";
+ public String version2 = "2.0";
+ public VersionRange range1;
+ public VersionRange range1And2;
+ public VersionRange range2;
+ public VersionRange range123;
// Arbitrary values, their contents is not meaningful.
public static final String ARBITRARY = "arbitrary";
- public static final Class<?> ARBITRARY_CLASS = org.mockito.Mockito.class;
+ public static final Class<?> CONN = Mockito.class;
+ public static final Class<?> CAST = mock(Cast.class).getClass();
+ public static final Class<?> FILTER = mock(Filter.class).getClass();
public static final URL ARBITRARY_URL;
static {
@@ -60,23 +84,47 @@ public class DelegatingClassLoaderTest {
@BeforeEach
@SuppressWarnings({"unchecked"})
- public void setUp() {
+ public void setUp() throws InvalidVersionSpecificationException {
+ range1 = VersionRange.createFromVersionSpec("[" + version1 + "]");
+ range1And2 = VersionRange.createFromVersionSpec("[" + version1 + "," +
version2 + "]");
+ range2 = VersionRange.createFromVersionSpec("[" + version2 + "]");
+ range123 = VersionRange.createFromVersionSpec("[123]");
parent = mock(PluginClassLoader.class);
pluginLoader = mock(PluginClassLoader.class);
+ pluginLoader2 = mock(PluginClassLoader.class);
+ pluginLoader3 = mock(PluginClassLoader.class);
+ pluginLoader4 = mock(PluginClassLoader.class);
classLoader = new DelegatingClassLoader(parent);
SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
// Lie to the DCL that this arbitrary class is a connector, since all
real connector classes we have access to
// are forced to be non-isolated by PluginUtils.shouldLoadInIsolation.
when(pluginLoader.location()).thenReturn("some-location");
- pluginDesc = new PluginDesc<>((Class<? extends SinkConnector>)
ARBITRARY_CLASS, null, PluginType.SINK, pluginLoader);
- assertTrue(PluginUtils.shouldLoadInIsolation(pluginDesc.className()));
- sinkConnectors.add(pluginDesc);
+ when(pluginLoader2.location()).thenReturn("some-location2");
+ when(pluginLoader3.location()).thenReturn("some-location3");
+ when(pluginLoader4.location()).thenReturn("some-location4");
+ connectorPluginDesc = new PluginDesc<>((Class<? extends
SinkConnector>) CONN, null, PluginType.SINK, pluginLoader);
+ connectorPluginDesc2 = new PluginDesc<>((Class<? extends
SinkConnector>) CONN, version1, PluginType.SINK, pluginLoader2);
+
assertTrue(PluginUtils.shouldLoadInIsolation(connectorPluginDesc.className()));
+
assertTrue(PluginUtils.shouldLoadInIsolation(connectorPluginDesc2.className()));
+ sinkConnectors.add(connectorPluginDesc);
+ sinkConnectors.add(connectorPluginDesc2);
+ SortedSet<PluginDesc<Transformation<?>>> transformations = new
TreeSet<>();
+ cast = new PluginDesc<>((Class<? extends Transformation<?>>) CAST,
null, PluginType.TRANSFORMATION, pluginLoader);
+ castV1Loader2 = new PluginDesc<>((Class<? extends Transformation<?>>)
CAST, version1, PluginType.TRANSFORMATION, pluginLoader2);
+ castV1Loader3 = new PluginDesc<>((Class<? extends Transformation<?>>)
CAST, version1, PluginType.TRANSFORMATION, pluginLoader3);
+ castV2 = new PluginDesc<>((Class<? extends Transformation<?>>) CAST,
version2, PluginType.TRANSFORMATION, pluginLoader4);
+ filter = new PluginDesc<>((Class<? extends Transformation<?>>) FILTER,
null, PluginType.TRANSFORMATION, pluginLoader4);
+ transformations.add(cast);
+ transformations.add(castV1Loader2);
+ transformations.add(castV1Loader3);
+ transformations.add(castV2);
+ transformations.add(filter);
scanResult = new PluginScanResult(
sinkConnectors,
new TreeSet<>(),
new TreeSet<>(),
new TreeSet<>(),
- new TreeSet<>(),
+ transformations,
new TreeSet<>(),
new TreeSet<>(),
new TreeSet<>(),
@@ -92,8 +140,8 @@ public class DelegatingClassLoaderTest {
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void testEmptyLoadClass() throws ClassNotFoundException {
- when(parent.loadClass(ARBITRARY, false)).thenReturn((Class)
ARBITRARY_CLASS);
- assertSame(ARBITRARY_CLASS, classLoader.loadClass(ARBITRARY, false));
+ when(parent.loadClass(ARBITRARY, false)).thenReturn((Class) CAST);
+ assertSame(CAST, classLoader.loadClass(ARBITRARY, false));
}
@Test
@@ -105,17 +153,103 @@ public class DelegatingClassLoaderTest {
@Test
public void testInitializedConnectorLoader() {
classLoader.installDiscoveredPlugins(scanResult);
- assertSame(pluginLoader,
classLoader.connectorLoader(PluginUtils.prunedName(pluginDesc)));
- assertSame(pluginLoader,
classLoader.connectorLoader(PluginUtils.simpleName(pluginDesc)));
- assertSame(pluginLoader,
classLoader.connectorLoader(pluginDesc.className()));
+ ClassLoader expectedLoader =
scanResult.sinkConnectors().last().loader();
+ assertSame(expectedLoader,
classLoader.connectorLoader(connectorPluginDesc.className()));
+ }
+
+ @Test
+ public void testInitializedConnectorLoaderWithVersion() {
+ classLoader.installDiscoveredPlugins(scanResult);
+ // connector v1 is only in pluginLoader2
+ assertSame(pluginLoader2,
classLoader.connectorLoader(connectorPluginDesc.className(), range1));
+
+ // connector v123 cannot be found
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.connectorLoader(cast.className(), range123));
+ }
+
+ @Test
+ public void testInitializedPluginLoader() {
+ classLoader.installDiscoveredPlugins(scanResult);
+ // without a loader or version, the last loader that has the plugin is
picked
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(cast), null, null));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(cast), null, null));
+ }
+
+ @Test
+ public void testInitializedPluginLoaderWithClassLoader() {
+ classLoader.installDiscoveredPlugins(scanResult);
+ // when range is not provided return our classloader if it has the
plugin
+ assertSame(pluginLoader,
classLoader.pluginLoader(PluginUtils.prunedName(cast), null, pluginLoader));
+ assertSame(pluginLoader,
classLoader.pluginLoader(PluginUtils.simpleName(cast), null, pluginLoader));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.prunedName(cast), null, pluginLoader3));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.simpleName(cast), null, pluginLoader3));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(filter), null, pluginLoader4));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(filter), null, pluginLoader4));
+
+ // when range is not provided return the classloader which has the
plugin if it's no in our classloader
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(filter), null, pluginLoader));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(filter), null, pluginLoader));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(filter), null, pluginLoader3));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(filter), null, pluginLoader3));
+
+ // when range is not provided return the last classloader which has
the plugin if it's no in our classloader
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(cast), null, pluginLoader4));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(cast), null, pluginLoader4));
+ }
+
+ @Test
+ public void testInitializedPluginLoaderWithVersion() {
+ classLoader.installDiscoveredPlugins(scanResult);
+ // cast v1 is in both pluginLoader2 and pluginLoader3, we prefer the
specified loader
+ assertSame(pluginLoader2,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1, pluginLoader2));
+ assertSame(pluginLoader2,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1, pluginLoader2));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1, pluginLoader3));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1, pluginLoader3));
+
+ // cast v1 is in both pluginLoader2 and pluginLoader3, we prefer the
last loader
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1, pluginLoader));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1, pluginLoader));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1, pluginLoader4));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1, pluginLoader4));
+
+ // both cast v1 and v2 match the range, we prefer the specified loader
+ assertSame(pluginLoader2,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1And2,
pluginLoader2));
+ assertSame(pluginLoader2,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1And2,
pluginLoader2));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1And2,
pluginLoader3));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1And2,
pluginLoader3));
+
+ // both cast v1 and v2 match the range, we prefer the last loader
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1And2,
pluginLoader));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1And2,
pluginLoader));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1And2,
pluginLoader4));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1And2,
pluginLoader4));
+
+ // cast v2 is only in pluginLoader4
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range2, pluginLoader));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range2, pluginLoader));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range2, pluginLoader3));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range2, pluginLoader3));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range2, pluginLoader4));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range2, pluginLoader4));
+
+ // cast v123 cannot be found
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.prunedName(cast), range123, pluginLoader));
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.simpleName(cast), range123, pluginLoader));
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.prunedName(cast), range123,
pluginLoader2));
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.simpleName(cast), range123,
pluginLoader2));
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.prunedName(cast), range123,
pluginLoader3));
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.simpleName(cast), range123,
pluginLoader3));
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.prunedName(cast), range123,
pluginLoader4));
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.simpleName(cast), range123,
pluginLoader4));
}
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void testInitializedLoadClass() throws ClassNotFoundException {
classLoader.installDiscoveredPlugins(scanResult);
- String className = pluginDesc.className();
- when(pluginLoader.loadClass(className, false)).thenReturn((Class)
ARBITRARY_CLASS);
- assertSame(ARBITRARY_CLASS, classLoader.loadClass(className, false));
+ String className = connectorPluginDesc.className();
+ // Use the last loader that has CONN
+ when(pluginLoader2.loadClass(className, false)).thenReturn((Class)
CONN);
+ assertSame(CONN, classLoader.loadClass(className, false));
}
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
index c4a60df57e1..f2606c5846e 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
@@ -57,7 +57,7 @@ public class MultiVersionTest {
String pluginLocation = entry.getKey().toAbsolutePath().toString();
for (VersionedPluginBuilder.BuildInfo buildInfo :
entry.getValue()) {
- ClassLoader pluginLoader =
plugins.pluginLoader(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+ ClassLoader pluginLoader =
plugins.pluginLoader(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()), null);
Assertions.assertInstanceOf(PluginClassLoader.class,
pluginLoader);
Assertions.assertTrue(((PluginClassLoader)
pluginLoader).location().contains(pluginLocation));
Object p = plugins.newPlugin(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()));
@@ -166,7 +166,8 @@ public class MultiVersionTest {
// get the connector loader of the combined artifact which includes
all plugin types
ClassLoader connectorLoader = plugins.pluginLoader(
VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR.className(),
- PluginUtils.connectorVersionRequirement("0.1.0")
+ PluginUtils.connectorVersionRequirement("0.1.0"),
+ null
);
Assertions.assertInstanceOf(PluginClassLoader.class, connectorLoader);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 9492f9f7ea2..4b144288a54 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -59,6 +59,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -355,7 +356,7 @@ public class PluginsTest {
@Test
public void newConverterShouldConfigureWithPluginClassLoader() {
props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
TestPlugin.SAMPLING_CONVERTER.className());
- ClassLoader classLoader =
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONVERTER.className());
+ ClassLoader classLoader =
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONVERTER.className(),
null, Optional.empty());
try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
createConfig();
}
@@ -377,7 +378,7 @@ public class PluginsTest {
String providerPrefix = "some.provider";
props.put(providerPrefix + ".class",
TestPlugin.SAMPLING_CONFIG_PROVIDER.className());
- PluginClassLoader classLoader =
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONFIG_PROVIDER.className());
+ PluginClassLoader classLoader =
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONFIG_PROVIDER.className(),
null, Optional.empty());
assertNotNull(classLoader);
try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
createConfig();
@@ -398,7 +399,7 @@ public class PluginsTest {
@Test
public void newHeaderConverterShouldConfigureWithPluginClassLoader() {
props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
TestPlugin.SAMPLING_HEADER_CONVERTER.className());
- ClassLoader classLoader =
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_HEADER_CONVERTER.className());
+ ClassLoader classLoader =
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_HEADER_CONVERTER.className(),
null, Optional.empty());
try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
createConfig();
}
@@ -590,7 +591,7 @@ public class PluginsTest {
@Test
public void testAliasesInConverters() throws ClassNotFoundException {
- ClassLoader connectorLoader =
plugins.connectorLoader(TestPlugin.SAMPLING_CONNECTOR.className());
+ ClassLoader connectorLoader =
plugins.connectorLoader(TestPlugin.SAMPLING_CONNECTOR.className(), null);
try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader))
{
String configKey = "config.key";
String alias = "SamplingConverter";
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
index bc326c02ee3..5748ef128f5 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
@@ -45,6 +45,7 @@ import java.net.URL;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingDeque;
@@ -190,10 +191,10 @@ public class SynchronizationTest {
}
@Override
- public PluginClassLoader pluginClassLoader(String name, VersionRange
range) {
+ PluginClassLoader pluginClassLoader(String name, VersionRange range,
Optional<ClassLoader> connectorLoader) {
dclBreakpoint.await(name);
dclBreakpoint.await(name);
- return super.pluginClassLoader(name, range);
+ return super.pluginClassLoader(name, range, connectorLoader);
}
}
@@ -224,7 +225,7 @@ public class SynchronizationTest {
public void testSimultaneousUpwardAndDownwardDelegating() throws Exception
{
String t1Class = TestPlugins.TestPlugin.SAMPLING_CONVERTER.className();
// Grab a reference to the target PluginClassLoader before activating
breakpoints
- ClassLoader connectorLoader = plugins.connectorLoader(t1Class);
+ ClassLoader connectorLoader = plugins.connectorLoader(t1Class, null);
// THREAD 1: loads a class by delegating downward starting from the
DelegatingClassLoader
// DelegatingClassLoader breakpoint will only trigger on this thread
@@ -304,7 +305,7 @@ public class SynchronizationTest {
public void testPluginClassLoaderDoesntHoldMonitorLock()
throws InterruptedException, TimeoutException, BrokenBarrierException {
String t1Class = TestPlugins.TestPlugin.SAMPLING_CONVERTER.className();
- ClassLoader connectorLoader = plugins.connectorLoader(t1Class);
+ ClassLoader connectorLoader = plugins.connectorLoader(t1Class, null);
Object externalTestLock = new Object();
Breakpoint<Object> testBreakpoint = new Breakpoint<>();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index dc6325bfa36..13331692047 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -193,8 +193,7 @@ public class StandaloneHerderTest {
when(worker.config()).thenReturn(workerConfig);
when(workerConfig.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG)).thenReturn((Class)
SimpleHeaderConverter.class);
when(plugins.newConnector(anyString(),
any())).thenReturn(connectorMock);
- when(plugins.pluginLoader(anyString(),
any())).thenReturn(pluginLoader);
- when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
+ when(plugins.withClassLoader(null)).thenReturn(loaderSwap);
when(connectorMock.config()).thenReturn(new ConfigDef());
ConfigValue validatedValue = new ConfigValue("foo.bar");
@@ -885,7 +884,8 @@ public class StandaloneHerderTest {
when(worker.config()).thenReturn(workerConfig);
when(worker.getPlugins()).thenReturn(plugins);
when(workerConfig.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG)).thenReturn((Class)
SimpleHeaderConverter.class);
- when(plugins.pluginLoader(anyString(),
any())).thenReturn(pluginLoader);
+ //when(plugins.pluginLoader(anyString(), any(),
any())).thenReturn(pluginLoader);
+ when(plugins.connectorLoader(any(), any())).thenReturn(pluginLoader);
when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
when(plugins.newConnector(anyString(),
any())).thenReturn(connectorMock);
when(connectorMock.config()).thenReturn(configDef);
@@ -1240,7 +1240,8 @@ public class StandaloneHerderTest {
when(transformer.transform(configCapture.capture())).thenAnswer(invocation ->
configCapture.getValue());
when(worker.config()).thenReturn(workerConfig);
when(workerConfig.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG)).thenReturn((Class)
SimpleHeaderConverter.class);
- when(plugins.pluginLoader(anyString(),
any())).thenReturn(pluginLoader);
+ //when(plugins.pluginLoader(anyString(), any(),
any())).thenReturn(pluginLoader);
+ when(plugins.connectorLoader(any(), any())).thenReturn(pluginLoader);
when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
// Assume the connector should always be created
when(worker.getPlugins()).thenReturn(plugins);