This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new f0c02d94d4c KAFKA-19758: Preferably use the connector classloader when
loading plugins if it has the correct version (#20675)
f0c02d94d4c is described below
commit f0c02d94d4c0eed1f10b484cf19d1a31479dc17f
Author: Mickael Maison <[email protected]>
AuthorDate: Mon Oct 20 19:29:49 2025 +0200
KAFKA-19758: Preferably use the connector classloader when loading plugins
if it has the correct version (#20675)
Reviewers: Greg Harris <[email protected]>, Fiore Mario Vitale
<[email protected]>, Snehashis Pal <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../kafka/connect/runtime/AbstractHerder.java | 2 +-
.../kafka/connect/runtime/ConnectorConfig.java | 8 +-
.../org/apache/kafka/connect/runtime/Worker.java | 2 +-
.../runtime/isolation/DelegatingClassLoader.java | 100 ++++++------
.../kafka/connect/runtime/isolation/Plugins.java | 23 +--
.../kafka/connect/runtime/AbstractHerderTest.java | 11 +-
.../apache/kafka/connect/runtime/WorkerTest.java | 15 +-
.../isolation/DelegatingClassLoaderTest.java | 181 ++++++++++++++++++---
.../runtime/isolation/MultiVersionTest.java | 5 +-
.../connect/runtime/isolation/PluginsTest.java | 9 +-
.../runtime/isolation/SynchronizationTest.java | 9 +-
.../runtime/standalone/StandaloneHerderTest.java | 9 +-
13 files changed, 250 insertions(+), 126 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7c02cf3adf9..6d5a60829c7 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -174,7 +174,7 @@
<suppress checks="ClassFanOutComplexity"
files="(WorkerSink|WorkerSource|ErrorHandling)Task(|WithTopicCreation|Mockito)Test.java"/>
<suppress checks="ClassFanOutComplexity"
- files="DistributedHerderTest.java"/>
+ files="(AbstractHerderTest|DistributedHerderTest).java"/>
<suppress checks="MethodLength"
files="(RequestResponse|WorkerSinkTask|WorkerSinkTaskMockito)Test.java"/>
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index bce67129388..75083a755f7 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -843,7 +843,7 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
try {
connVersion =
PluginUtils.connectorVersionRequirement(connectorProps.get(CONNECTOR_VERSION));
connector = cachedConnectors.getConnector(connType, connVersion);
- connectorLoader = plugins().pluginLoader(connType, connVersion);
+ connectorLoader = plugins().connectorLoader(connType, connVersion);
log.info("Validating connector {}, version {}", connType,
connector.version());
} catch (VersionedPluginLoadingException e) {
log.warn("Failed to load connector {} with version {}, skipping
additional validations (connector, converters, transformations, client
overrides) ",
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index efd421bd2e2..9d44cab9b7e 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -416,7 +416,9 @@ public class ConnectorConfig extends AbstractConfig {
try {
VersionRange range =
PluginUtils.connectorVersionRequirement(getString(versionConfig));
VersionRange connectorRange =
PluginUtils.connectorVersionRequirement(getString(CONNECTOR_VERSION));
- return (T) plugins.newPlugin(getClass(classConfig).getName(),
range, plugins.pluginLoader(getString(CONNECTOR_CLASS_CONFIG), connectorRange));
+ return (T) plugins.newPlugin(getClass(classConfig).getName(),
+ range,
+
plugins.connectorLoader(getString(CONNECTOR_CLASS_CONFIG), connectorRange));
} catch (Exception e) {
throw new ConnectException(e);
}
@@ -570,7 +572,7 @@ public class ConnectorConfig extends AbstractConfig {
}
try {
VersionRange range =
PluginUtils.connectorVersionRequirement(connectorVersion);
- return plugins.pluginVersion(pluginName,
plugins.pluginLoader(connectorClass, range), pluginType);
+ return plugins.pluginVersion(pluginName,
plugins.connectorLoader(connectorClass, range), pluginType);
} catch (InvalidVersionSpecificationException |
VersionedPluginLoadingException e) {
// these errors should be captured in other places, so we can
ignore them here
log.warn("Failed to determine default plugin version for {}",
connectorClass, e);
@@ -740,7 +742,7 @@ public class ConnectorConfig extends AbstractConfig {
T plugin;
try {
- plugin = (T) plugins.newPlugin(pluginClass, pluginVersion,
plugins.pluginLoader(connectorClass, connectorVersionRange));
+ plugin = (T) plugins.newPlugin(pluginClass, pluginVersion,
plugins.connectorLoader(connectorClass, connectorVersionRange));
} catch (VersionedPluginLoadingException e) {
throw e;
} catch (Exception e) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index a3e914d3f90..2021d63f320 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -1249,7 +1249,7 @@ public final class Worker {
final String version =
connProps.get(ConnectorConfig.CONNECTOR_VERSION);
try {
- return plugins.pluginLoader(klass,
PluginUtils.connectorVersionRequirement(version));
+ return plugins.connectorLoader(klass,
PluginUtils.connectorVersionRequirement(version));
} catch (InvalidVersionSpecificationException |
VersionedPluginLoadingException e) {
throw new ConnectException(
String.format("Failed to get class loader for connector
%s, class %s", klass, connProps.get(ConnectorConfig.NAME_CONFIG)), e);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 9f3cd021750..efb25f9062e 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -23,10 +23,9 @@ import org.slf4j.LoggerFactory;
import java.net.URL;
import java.net.URLClassLoader;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -77,10 +76,11 @@ public class DelegatingClassLoader extends URLClassLoader {
* Retrieve the PluginClassLoader associated with a plugin class
*
* @param name The fully qualified class name of the plugin
+ * @param range The version range of the plugin
+ * @param connectorLoader The ClassLoader of the connector loading this
plugin
* @return the PluginClassLoader that should be used to load this, or null
if the plugin is not isolated.
*/
- // VisibleForTesting
- PluginClassLoader pluginClassLoader(String name, VersionRange range) {
+ PluginClassLoader pluginClassLoader(String name, VersionRange range,
Optional<ClassLoader> connectorLoader) {
if (!PluginUtils.shouldLoadInIsolation(name)) {
return null;
}
@@ -90,20 +90,15 @@ public class DelegatingClassLoader extends URLClassLoader {
return null;
}
-
- ClassLoader pluginLoader = findPluginLoader(inner, name, range);
+ ClassLoader pluginLoader = findPluginLoader(inner, name, range,
connectorLoader);
return pluginLoader instanceof PluginClassLoader
? (PluginClassLoader) pluginLoader
: null;
}
- PluginClassLoader pluginClassLoader(String name) {
- return pluginClassLoader(name, null);
- }
-
- ClassLoader loader(String classOrAlias, VersionRange range) {
+ ClassLoader connectorLoader(String classOrAlias, VersionRange range) {
String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
- ClassLoader classLoader = pluginClassLoader(fullName, range);
+ ClassLoader classLoader = pluginClassLoader(fullName, range,
Optional.empty());
if (classLoader == null) {
classLoader = this;
}
@@ -115,12 +110,22 @@ public class DelegatingClassLoader extends URLClassLoader
{
return classLoader;
}
- ClassLoader loader(String classOrAlias) {
- return loader(classOrAlias, null);
+ ClassLoader pluginLoader(String classOrAlias, VersionRange range,
ClassLoader connectorLoader) {
+ String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+ ClassLoader classLoader = pluginClassLoader(fullName, range,
Optional.ofNullable(connectorLoader));
+ if (classLoader == null) {
+ classLoader = this;
+ }
+ log.debug(
+ "Got plugin class loader: '{}' for plugin: {}",
+ classLoader,
+ classOrAlias
+ );
+ return classLoader;
}
ClassLoader connectorLoader(String connectorClassOrAlias) {
- return loader(connectorClassOrAlias);
+ return connectorLoader(connectorClassOrAlias, null);
}
String resolveFullClassName(String classOrAlias) {
@@ -152,42 +157,44 @@ public class DelegatingClassLoader extends URLClassLoader
{
private ClassLoader findPluginLoader(
SortedMap<PluginDesc<?>, ClassLoader> loaders,
String pluginName,
- VersionRange range
+ VersionRange range,
+ Optional<ClassLoader> connectorLoader
) {
- if (range != null) {
-
- if (null != range.getRecommendedVersion()) {
- throw new VersionedPluginLoadingException(String.format("A
soft version range is not supported for plugin loading, "
- + "this is an internal error as connect should
automatically convert soft ranges to hard ranges. "
- + "Provided soft version: %s ", range));
- }
+ if (range != null && range.getRecommendedVersion() != null) {
+ throw new VersionedPluginLoadingException(String.format("A soft
version range is not supported for plugin loading, "
+ + "this is an internal error as connect should
automatically convert soft ranges to hard ranges. "
+ + "Provided soft version: %s ", range));
+ }
- ClassLoader loader = null;
- for (Map.Entry<PluginDesc<?>, ClassLoader> entry :
loaders.entrySet()) {
- // the entries should be in sorted order of versions so this
should end up picking the latest version which matches the range
- if (range.containsVersion(entry.getKey().encodedVersion())) {
- loader = entry.getValue();
- }
+ ClassLoader loader = null;
+ // the entries should be in sorted order of versions so this should
end up picking the latest version which matches the range
+ for (Map.Entry<PluginDesc<?>, ClassLoader> entry : loaders.entrySet())
{
+ if (range == null ||
range.containsVersion(entry.getKey().encodedVersion())) {
+ loader = entry.getValue();
}
-
- if (loader == null) {
- List<String> availableVersions =
loaders.keySet().stream().map(PluginDesc::version).collect(Collectors.toList());
- throw new VersionedPluginLoadingException(String.format(
- "Plugin %s not found that matches the version range
%s, available versions: %s",
- pluginName,
- range,
- availableVersions
- ), availableVersions);
+ // if we find a plugin with the same loader as the connector, we
can end our search
+ if (connectorLoader.isPresent() &&
connectorLoader.get().equals(loader)) {
+ break;
}
- return loader;
}
- return loaders.get(loaders.lastKey());
+ if (range != null && loader == null) {
+ List<String> availableVersions =
loaders.keySet().stream().map(PluginDesc::version).collect(Collectors.toList());
+ throw new VersionedPluginLoadingException(String.format(
+ "Plugin %s not found that matches the version range %s,
available versions: %s",
+ pluginName,
+ range,
+ availableVersions
+ ), availableVersions);
+ }
+ return loader;
}
public void installDiscoveredPlugins(PluginScanResult scanResult) {
- pluginLoaders.putAll(computePluginLoaders(scanResult));
+ scanResult.forEach(pluginDesc ->
+ pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new
TreeMap<>())
+ .put(pluginDesc, pluginDesc.loader()));
for (String pluginClassName : pluginLoaders.keySet()) {
log.info("Added plugin '{}'", pluginClassName);
}
@@ -209,7 +216,7 @@ public class DelegatingClassLoader extends URLClassLoader {
) throws VersionedPluginLoadingException, ClassNotFoundException {
String fullName = aliases.getOrDefault(name, name);
- PluginClassLoader pluginLoader = pluginClassLoader(fullName, range);
+ PluginClassLoader pluginLoader = pluginClassLoader(fullName, range,
Optional.empty());
Class<?> plugin;
if (pluginLoader != null) {
log.trace("Retrieving loaded class '{}' from '{}'", name,
pluginLoader);
@@ -259,16 +266,9 @@ public class DelegatingClassLoader extends URLClassLoader {
fullName,
pluginVersion,
range
- ), Collections.singletonList(pluginVersion));
+ ), List.of(pluginVersion));
}
}
}
- private static Map<String, SortedMap<PluginDesc<?>, ClassLoader>>
computePluginLoaders(PluginScanResult plugins) {
- Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders = new
HashMap<>();
- plugins.forEach(pluginDesc ->
- pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new
TreeMap<>())
- .put(pluginDesc, pluginDesc.loader()));
- return pluginLoaders;
- }
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 97094bc89c8..f790ef59da2 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -282,20 +282,14 @@ public class Plugins {
return delegatingLoader;
}
- // kept for compatibility
- public ClassLoader connectorLoader(String connectorClassOrAlias) {
- return delegatingLoader.loader(connectorClassOrAlias);
+ public ClassLoader connectorLoader(String connectorClassOrAlias,
VersionRange range) {
+ return delegatingLoader.connectorLoader(connectorClassOrAlias, range);
}
- public ClassLoader pluginLoader(String classOrAlias, VersionRange range) {
- return delegatingLoader.loader(classOrAlias, range);
+ public ClassLoader pluginLoader(String classOrAlias, VersionRange range,
ClassLoader connectorLoader) {
+ return delegatingLoader.pluginLoader(classOrAlias, range,
connectorLoader);
}
- public ClassLoader pluginLoader(String classOrAlias) {
- return delegatingLoader.loader(classOrAlias);
- }
-
-
@SuppressWarnings({"unchecked", "rawtypes"})
public Set<PluginDesc<Connector>> connectors() {
Set<PluginDesc<Connector>> connectors = new TreeSet<>((Set)
sinkConnectors());
@@ -366,19 +360,14 @@ public class Plugins {
return plugins;
}
- public Object newPlugin(String classOrAlias) throws ClassNotFoundException
{
- Class<?> klass = pluginClass(delegatingLoader, classOrAlias,
Object.class);
- return newPlugin(klass);
- }
-
public Object newPlugin(String classOrAlias, VersionRange range) throws
VersionedPluginLoadingException, ClassNotFoundException {
Class<?> klass = pluginClass(delegatingLoader, classOrAlias,
Object.class, range);
return newPlugin(klass);
}
public Object newPlugin(String classOrAlias, VersionRange range,
ClassLoader sourceLoader) throws ClassNotFoundException {
- if (range == null && sourceLoader instanceof PluginClassLoader) {
- return newPlugin(sourceLoader.loadClass(classOrAlias));
+ if (sourceLoader instanceof PluginClassLoader) {
+ return newPlugin(pluginLoader(classOrAlias, range,
sourceLoader).loadClass(classOrAlias));
}
return newPlugin(classOrAlias, range);
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 81efeab6ca4..9bb4613bf3c 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -572,7 +572,8 @@ public class AbstractHerderTest {
AbstractHerder herder = createConfigValidationHerder(connectorClass,
noneConnectorClientConfigOverridePolicy);
// 2 transform aliases defined -> 2 plugin lookups
-
Mockito.lenient().when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
+
Mockito.lenient().when(plugins.transformations()).thenReturn(Set.of(transformationPluginDesc()));
+ Mockito.lenient().when(plugins.connectorLoader(any(),
any())).thenReturn(classLoader);
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(),
null, classLoader)).thenReturn(new SampleTransformation());
// Define 2 transformations. One has a class defined and so can get
embedded configs, the other is missing
@@ -626,8 +627,10 @@ public class AbstractHerderTest {
final Class<? extends Connector> connectorClass =
SampleSourceConnector.class;
AbstractHerder herder = createConfigValidationHerder(connectorClass,
noneConnectorClientConfigOverridePolicy);
-
Mockito.lenient().when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
-
Mockito.lenient().when(plugins.predicates()).thenReturn(Collections.singleton(predicatePluginDesc()));
+
Mockito.lenient().when(plugins.transformations()).thenReturn(Set.of(transformationPluginDesc()));
+
Mockito.lenient().when(plugins.predicates()).thenReturn(Set.of(predicatePluginDesc()));
+ Mockito.lenient().when(plugins.connectorLoader(any(),
any())).thenReturn(classLoader);
+
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(),
null, classLoader)).thenReturn(new SampleTransformation());
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(),
null, classLoader)).thenReturn(new SampleTransformation());
Mockito.lenient().when(plugins.newPlugin(SamplePredicate.class.getName(), null,
classLoader)).thenReturn(new SamplePredicate());
@@ -1343,7 +1346,7 @@ public class AbstractHerderTest {
when(workerConfig.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG)).thenReturn((Class)
SimpleHeaderConverter.class);
when(worker.config()).thenReturn(workerConfig);
when(plugins.newConnector(anyString(), any())).thenReturn(connector);
- when(plugins.pluginLoader(connectorClass,
null)).thenReturn(classLoader);
+ when(plugins.connectorLoader(any(), any())).thenReturn(classLoader);
when(plugins.withClassLoader(classLoader)).thenReturn(loaderSwap);
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 38bcab1b594..64b6757f27d 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -391,7 +391,6 @@ public class WorkerTest {
mockKafkaClusterId();
mockGenericIsolation();
- when(plugins.pluginLoader(nonConnectorClass,
null)).thenReturn(pluginLoader);
when(plugins.newConnector(nonConnectorClass,
null)).thenThrow(exception);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
offsetBackingStore, noneConnectorClientConfigOverridePolicy);
@@ -610,7 +609,7 @@ public class WorkerTest {
mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
- Map<String, String> origProps =
Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG,
TestSourceTask.class.getName());
+ Map<String, String> origProps = Map.of(TaskConfig.TASK_CLASS_CONFIG,
TestSourceTask.class.getName());
worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
offsetBackingStore, executorService,
noneConnectorClientConfigOverridePolicy, null);
@@ -909,7 +908,6 @@ public class WorkerTest {
mockKafkaClusterId();
mockGenericIsolation();
- when(plugins.pluginLoader(SampleSourceConnector.class.getName(),
null)).thenReturn(pluginLoader);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
offsetBackingStore, noneConnectorClientConfigOverridePolicy);
worker.herder = herder;
@@ -1894,7 +1892,6 @@ public class WorkerTest {
mockKafkaClusterId();
mockGenericIsolation();
when(plugins.connectorClass(anyString(), any())).thenReturn((Class)
sourceConnector.getClass());
- when(plugins.pluginLoader(SampleSourceConnector.class.getName(),
null)).thenReturn(pluginLoader);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
offsetBackingStore, executorService,
allConnectorClientConfigOverridePolicy, mockAdminConstructor);
@@ -2105,8 +2102,7 @@ public class WorkerTest {
mockGenericIsolation();
when(plugins.newConnector(anyString(),
any())).thenReturn(sourceConnector);
- when(plugins.pluginLoader(SampleSourceConnector.class.getName(),
null)).thenReturn(pluginLoader);
- when(plugins.withClassLoader(any(ClassLoader.class),
any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+ when(plugins.withClassLoader(any(),
any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
when(sourceConnector.alterOffsets(eq(connectorProps),
anyMap())).thenThrow(new UnsupportedOperationException("This connector doesn't
" +
"support altering of offsets"));
@@ -2815,6 +2811,7 @@ public class WorkerTest {
private void testStartTaskWithTooManyTaskConfigs(boolean enforced) {
SinkTask task = mock(TestSinkTask.class);
mockKafkaClusterId();
+ when(plugins.connectorLoader(any(), any())).thenReturn(pluginLoader);
Map<String, String> origProps =
Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG,
TestSinkTask.class.getName());
@@ -3026,11 +3023,11 @@ public class WorkerTest {
}
private void mockGenericIsolation() {
+ when(plugins.connectorLoader(any(), any())).thenReturn(pluginLoader);
when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
}
private void verifyGenericIsolation() {
- verify(plugins, atLeastOnce()).withClassLoader(pluginLoader);
verify(loaderSwap, atLeastOnce()).close();
}
@@ -3042,7 +3039,6 @@ public class WorkerTest {
private void mockVersionedConnectorIsolation(String connectorClass,
VersionRange range, Connector connector) {
mockGenericIsolation();
- when(plugins.pluginLoader(connectorClass,
range)).thenReturn(pluginLoader);
when(plugins.newConnector(connectorClass,
range)).thenReturn(connector);
when(connector.version()).thenReturn(range == null ? "unknown" :
range.toString());
}
@@ -3055,7 +3051,6 @@ public class WorkerTest {
private void verifyVersionedConnectorIsolation(String connectorClass,
VersionRange range, Connector connector) {
verifyGenericIsolation();
- verify(plugins).pluginLoader(connectorClass, range);
verify(plugins).newConnector(connectorClass, range);
verify(connector, atLeastOnce()).version();
}
@@ -3070,7 +3065,6 @@ public class WorkerTest {
@SuppressWarnings({"unchecked", "rawtypes"})
private void mockVersionedTaskIsolation(Class<? extends Connector>
connectorClass, Class<? extends Task> taskClass, VersionRange range, Connector
connector, Task task) {
mockGenericIsolation();
- when(plugins.pluginLoader(connectorClass.getName(),
range)).thenReturn(pluginLoader);
when(plugins.connectorClass(connectorClass.getName(),
range)).thenReturn((Class) connectorClass);
when(plugins.newTask(taskClass)).thenReturn(task);
when(plugins.safeLoaderSwapper()).thenReturn(TestPlugins.noOpLoaderSwap());
@@ -3086,7 +3080,6 @@ public class WorkerTest {
private void verifyVersionedTaskIsolation(Class<? extends Connector>
connectorClass, Class<? extends Task> taskClass, VersionRange range, Task task)
{
verifyGenericIsolation();
- verify(plugins).pluginLoader(connectorClass.getName(), range);
verify(plugins).connectorClass(connectorClass.getName(), range);
verify(plugins).newTask(taskClass);
verify(task, times(2)).version();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
index 749acb3e5b0..76bad03fb01 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
@@ -17,21 +17,27 @@
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.transforms.Cast;
+import org.apache.kafka.connect.transforms.Filter;
+import org.apache.kafka.connect.transforms.Transformation;
+import
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.net.MalformedURLException;
import java.net.URL;
-import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -42,13 +48,30 @@ public class DelegatingClassLoaderTest {
public PluginClassLoader parent;
public PluginClassLoader pluginLoader;
+ public PluginClassLoader pluginLoader2;
+ public PluginClassLoader pluginLoader3;
+ public PluginClassLoader pluginLoader4;
public DelegatingClassLoader classLoader;
- public PluginDesc<SinkConnector> pluginDesc;
+ public PluginDesc<SinkConnector> connectorPluginDesc;
+ public PluginDesc<SinkConnector> connectorPluginDesc2;
+ public PluginDesc<Transformation<?>> cast;
+ public PluginDesc<Transformation<?>> castV1Loader2;
+ public PluginDesc<Transformation<?>> castV1Loader3;
+ public PluginDesc<Transformation<?>> castV2;
+ public PluginDesc<Transformation<?>> filter;
public PluginScanResult scanResult;
+ public String version1 = "1.0";
+ public String version2 = "2.0";
+ public VersionRange range1;
+ public VersionRange range1And2;
+ public VersionRange range2;
+ public VersionRange range123;
// Arbitrary values, their contents is not meaningful.
public static final String ARBITRARY = "arbitrary";
- public static final Class<?> ARBITRARY_CLASS = org.mockito.Mockito.class;
+ public static final Class<?> CONN = Mockito.class;
+ public static final Class<?> CAST = mock(Cast.class).getClass();
+ public static final Class<?> FILTER = mock(Filter.class).getClass();
public static final URL ARBITRARY_URL;
static {
@@ -61,27 +84,51 @@ public class DelegatingClassLoaderTest {
@BeforeEach
@SuppressWarnings({"unchecked"})
- public void setUp() {
+ public void setUp() throws InvalidVersionSpecificationException {
+ range1 = VersionRange.createFromVersionSpec("[" + version1 + "]");
+ range1And2 = VersionRange.createFromVersionSpec("[" + version1 + "," +
version2 + "]");
+ range2 = VersionRange.createFromVersionSpec("[" + version2 + "]");
+ range123 = VersionRange.createFromVersionSpec("[123]");
parent = mock(PluginClassLoader.class);
pluginLoader = mock(PluginClassLoader.class);
+ pluginLoader2 = mock(PluginClassLoader.class);
+ pluginLoader3 = mock(PluginClassLoader.class);
+ pluginLoader4 = mock(PluginClassLoader.class);
classLoader = new DelegatingClassLoader(parent);
SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
// Lie to the DCL that this arbitrary class is a connector, since all
real connector classes we have access to
// are forced to be non-isolated by PluginUtils.shouldLoadInIsolation.
when(pluginLoader.location()).thenReturn("some-location");
- pluginDesc = new PluginDesc<>((Class<? extends SinkConnector>)
ARBITRARY_CLASS, null, PluginType.SINK, pluginLoader);
- assertTrue(PluginUtils.shouldLoadInIsolation(pluginDesc.className()));
- sinkConnectors.add(pluginDesc);
+ when(pluginLoader2.location()).thenReturn("some-location2");
+ when(pluginLoader3.location()).thenReturn("some-location3");
+ when(pluginLoader4.location()).thenReturn("some-location4");
+ connectorPluginDesc = new PluginDesc<>((Class<? extends
SinkConnector>) CONN, null, PluginType.SINK, pluginLoader);
+ connectorPluginDesc2 = new PluginDesc<>((Class<? extends
SinkConnector>) CONN, version1, PluginType.SINK, pluginLoader2);
+
assertTrue(PluginUtils.shouldLoadInIsolation(connectorPluginDesc.className()));
+
assertTrue(PluginUtils.shouldLoadInIsolation(connectorPluginDesc2.className()));
+ sinkConnectors.add(connectorPluginDesc);
+ sinkConnectors.add(connectorPluginDesc2);
+ SortedSet<PluginDesc<Transformation<?>>> transformations = new
TreeSet<>();
+ cast = new PluginDesc<>((Class<? extends Transformation<?>>) CAST,
null, PluginType.TRANSFORMATION, pluginLoader);
+ castV1Loader2 = new PluginDesc<>((Class<? extends Transformation<?>>)
CAST, version1, PluginType.TRANSFORMATION, pluginLoader2);
+ castV1Loader3 = new PluginDesc<>((Class<? extends Transformation<?>>)
CAST, version1, PluginType.TRANSFORMATION, pluginLoader3);
+ castV2 = new PluginDesc<>((Class<? extends Transformation<?>>) CAST,
version2, PluginType.TRANSFORMATION, pluginLoader4);
+ filter = new PluginDesc<>((Class<? extends Transformation<?>>) FILTER,
null, PluginType.TRANSFORMATION, pluginLoader4);
+ transformations.add(cast);
+ transformations.add(castV1Loader2);
+ transformations.add(castV1Loader3);
+ transformations.add(castV2);
+ transformations.add(filter);
scanResult = new PluginScanResult(
- sinkConnectors,
- Collections.emptySortedSet(),
- Collections.emptySortedSet(),
- Collections.emptySortedSet(),
- Collections.emptySortedSet(),
- Collections.emptySortedSet(),
- Collections.emptySortedSet(),
- Collections.emptySortedSet(),
- Collections.emptySortedSet()
+ sinkConnectors,
+ new TreeSet<>(),
+ new TreeSet<>(),
+ new TreeSet<>(),
+ transformations,
+ new TreeSet<>(),
+ new TreeSet<>(),
+ new TreeSet<>(),
+ new TreeSet<>()
);
}
@@ -93,8 +140,8 @@ public class DelegatingClassLoaderTest {
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void testEmptyLoadClass() throws ClassNotFoundException {
- when(parent.loadClass(ARBITRARY, false)).thenReturn((Class)
ARBITRARY_CLASS);
- assertSame(ARBITRARY_CLASS, classLoader.loadClass(ARBITRARY, false));
+ when(parent.loadClass(ARBITRARY, false)).thenReturn((Class) CAST);
+ assertSame(CAST, classLoader.loadClass(ARBITRARY, false));
}
@Test
@@ -106,17 +153,103 @@ public class DelegatingClassLoaderTest {
@Test
public void testInitializedConnectorLoader() {
classLoader.installDiscoveredPlugins(scanResult);
- assertSame(pluginLoader,
classLoader.connectorLoader(PluginUtils.prunedName(pluginDesc)));
- assertSame(pluginLoader,
classLoader.connectorLoader(PluginUtils.simpleName(pluginDesc)));
- assertSame(pluginLoader,
classLoader.connectorLoader(pluginDesc.className()));
+ ClassLoader expectedLoader =
scanResult.sinkConnectors().last().loader();
+ assertSame(expectedLoader,
classLoader.connectorLoader(connectorPluginDesc.className()));
+ }
+
+ @Test
+ public void testInitializedConnectorLoaderWithVersion() {
+ classLoader.installDiscoveredPlugins(scanResult);
+ // connector v1 is only in pluginLoader2
+ assertSame(pluginLoader2,
classLoader.connectorLoader(connectorPluginDesc.className(), range1));
+
+ // connector v123 cannot be found
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.connectorLoader(cast.className(), range123));
+ }
+
+ @Test
+ public void testInitializedPluginLoader() {
+ classLoader.installDiscoveredPlugins(scanResult);
+ // without a loader or version, the last loader that has the plugin is
picked
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(cast), null, null));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(cast), null, null));
+ }
+
+ @Test
+ public void testInitializedPluginLoaderWithClassLoader() {
+ classLoader.installDiscoveredPlugins(scanResult);
+ // when range is not provided return our classloader if it has the
plugin
+ assertSame(pluginLoader,
classLoader.pluginLoader(PluginUtils.prunedName(cast), null, pluginLoader));
+ assertSame(pluginLoader,
classLoader.pluginLoader(PluginUtils.simpleName(cast), null, pluginLoader));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.prunedName(cast), null, pluginLoader3));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.simpleName(cast), null, pluginLoader3));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(filter), null, pluginLoader4));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(filter), null, pluginLoader4));
+
+ // when range is not provided return the classloader which has the
plugin if it's not in our classloader
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(filter), null, pluginLoader));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(filter), null, pluginLoader));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(filter), null, pluginLoader3));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(filter), null, pluginLoader3));
+
+ // when range is not provided return the last classloader which has
the plugin if it's not in our classloader
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(cast), null, pluginLoader4));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(cast), null, pluginLoader4));
+ }
+
+ @Test
+ public void testInitializedPluginLoaderWithVersion() {
+ classLoader.installDiscoveredPlugins(scanResult);
+ // cast v1 is in both pluginLoader2 and pluginLoader3, we prefer the
specified loader
+ assertSame(pluginLoader2,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1, pluginLoader2));
+ assertSame(pluginLoader2,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1, pluginLoader2));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1, pluginLoader3));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1, pluginLoader3));
+
+ // cast v1 is in both pluginLoader2 and pluginLoader3, we prefer the
last loader
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1, pluginLoader));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1, pluginLoader));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1, pluginLoader4));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1, pluginLoader4));
+
+ // both cast v1 and v2 match the range, we prefer the specified loader
+ assertSame(pluginLoader2,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1And2,
pluginLoader2));
+ assertSame(pluginLoader2,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1And2,
pluginLoader2));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1And2,
pluginLoader3));
+ assertSame(pluginLoader3,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1And2,
pluginLoader3));
+
+ // both cast v1 and v2 match the range, we prefer the last loader
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1And2,
pluginLoader));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1And2,
pluginLoader));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range1And2,
pluginLoader4));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range1And2,
pluginLoader4));
+
+ // cast v2 is only in pluginLoader4
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range2, pluginLoader));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range2, pluginLoader));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range2, pluginLoader3));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range2, pluginLoader3));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.prunedName(cast), range2, pluginLoader4));
+ assertSame(pluginLoader4,
classLoader.pluginLoader(PluginUtils.simpleName(cast), range2, pluginLoader4));
+
+ // cast v123 cannot be found
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.prunedName(cast), range123, pluginLoader));
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.simpleName(cast), range123, pluginLoader));
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.prunedName(cast), range123,
pluginLoader2));
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.simpleName(cast), range123,
pluginLoader2));
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.prunedName(cast), range123,
pluginLoader3));
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.simpleName(cast), range123,
pluginLoader3));
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.prunedName(cast), range123,
pluginLoader4));
+ assertThrows(VersionedPluginLoadingException.class, () ->
classLoader.pluginLoader(PluginUtils.simpleName(cast), range123,
pluginLoader4));
}
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void testInitializedLoadClass() throws ClassNotFoundException {
classLoader.installDiscoveredPlugins(scanResult);
- String className = pluginDesc.className();
- when(pluginLoader.loadClass(className, false)).thenReturn((Class)
ARBITRARY_CLASS);
- assertSame(ARBITRARY_CLASS, classLoader.loadClass(className, false));
+ String className = connectorPluginDesc.className();
+ // Use the last loader that has CONN
+ when(pluginLoader2.loadClass(className, false)).thenReturn((Class)
CONN);
+ assertSame(CONN, classLoader.loadClass(className, false));
}
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
index b770de74b80..1894ec365a5 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java
@@ -58,7 +58,7 @@ public class MultiVersionTest {
String pluginLocation = entry.getKey().toAbsolutePath().toString();
for (VersionedPluginBuilder.BuildInfo buildInfo :
entry.getValue()) {
- ClassLoader pluginLoader =
plugins.pluginLoader(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+ ClassLoader pluginLoader =
plugins.pluginLoader(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()), null);
Assertions.assertInstanceOf(PluginClassLoader.class,
pluginLoader);
Assertions.assertTrue(((PluginClassLoader)
pluginLoader).location().contains(pluginLocation));
Object p = plugins.newPlugin(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()));
@@ -167,7 +167,8 @@ public class MultiVersionTest {
// get the connector loader of the combined artifact which includes
all plugin types
ClassLoader connectorLoader = plugins.pluginLoader(
VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR.className(),
- PluginUtils.connectorVersionRequirement("0.1.0")
+ PluginUtils.connectorVersionRequirement("0.1.0"),
+ null
);
Assertions.assertInstanceOf(PluginClassLoader.class, connectorLoader);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index ca4c29931d0..87f4ce2c3a2 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -61,6 +61,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Collectors;
@@ -355,7 +356,7 @@ public class PluginsTest {
@Test
public void newConverterShouldConfigureWithPluginClassLoader() {
props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
TestPlugin.SAMPLING_CONVERTER.className());
- ClassLoader classLoader =
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONVERTER.className());
+ ClassLoader classLoader =
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONVERTER.className(),
null, Optional.empty());
try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
createConfig();
}
@@ -377,7 +378,7 @@ public class PluginsTest {
String providerPrefix = "some.provider";
props.put(providerPrefix + ".class",
TestPlugin.SAMPLING_CONFIG_PROVIDER.className());
- PluginClassLoader classLoader =
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONFIG_PROVIDER.className());
+ PluginClassLoader classLoader =
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONFIG_PROVIDER.className(),
null, Optional.empty());
assertNotNull(classLoader);
try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
createConfig();
@@ -398,7 +399,7 @@ public class PluginsTest {
@Test
public void newHeaderConverterShouldConfigureWithPluginClassLoader() {
props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
TestPlugin.SAMPLING_HEADER_CONVERTER.className());
- ClassLoader classLoader =
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_HEADER_CONVERTER.className());
+ ClassLoader classLoader =
plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_HEADER_CONVERTER.className(),
null, Optional.empty());
try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
createConfig();
}
@@ -590,7 +591,7 @@ public class PluginsTest {
@Test
public void testAliasesInConverters() throws ClassNotFoundException {
- ClassLoader connectorLoader =
plugins.connectorLoader(TestPlugin.SAMPLING_CONNECTOR.className());
+ ClassLoader connectorLoader =
plugins.connectorLoader(TestPlugin.SAMPLING_CONNECTOR.className(), null);
try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader))
{
String configKey = "config.key";
String alias = "SamplingConverter";
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
index cf2e53f1846..198118d8677 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
@@ -46,6 +46,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingDeque;
@@ -191,10 +192,10 @@ public class SynchronizationTest {
}
@Override
- public PluginClassLoader pluginClassLoader(String name, VersionRange
range) {
+ PluginClassLoader pluginClassLoader(String name, VersionRange range,
Optional<ClassLoader> connectorLoader) {
dclBreakpoint.await(name);
dclBreakpoint.await(name);
- return super.pluginClassLoader(name, range);
+ return super.pluginClassLoader(name, range, connectorLoader);
}
}
@@ -225,7 +226,7 @@ public class SynchronizationTest {
public void testSimultaneousUpwardAndDownwardDelegating() throws Exception
{
String t1Class = TestPlugins.TestPlugin.SAMPLING_CONVERTER.className();
// Grab a reference to the target PluginClassLoader before activating
breakpoints
- ClassLoader connectorLoader = plugins.connectorLoader(t1Class);
+ ClassLoader connectorLoader = plugins.connectorLoader(t1Class, null);
// THREAD 1: loads a class by delegating downward starting from the
DelegatingClassLoader
// DelegatingClassLoader breakpoint will only trigger on this thread
@@ -305,7 +306,7 @@ public class SynchronizationTest {
public void testPluginClassLoaderDoesntHoldMonitorLock()
throws InterruptedException, TimeoutException, BrokenBarrierException {
String t1Class = TestPlugins.TestPlugin.SAMPLING_CONVERTER.className();
- ClassLoader connectorLoader = plugins.connectorLoader(t1Class);
+ ClassLoader connectorLoader = plugins.connectorLoader(t1Class, null);
Object externalTestLock = new Object();
Breakpoint<Object> testBreakpoint = new Breakpoint<>();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 76b25131cc5..8fa76d9dca9 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -197,8 +197,7 @@ public class StandaloneHerderTest {
when(worker.config()).thenReturn(workerConfig);
when(workerConfig.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG)).thenReturn((Class)
SimpleHeaderConverter.class);
when(plugins.newConnector(anyString(),
any())).thenReturn(connectorMock);
- when(plugins.pluginLoader(anyString(),
any())).thenReturn(pluginLoader);
- when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
+ when(plugins.withClassLoader(null)).thenReturn(loaderSwap);
when(connectorMock.config()).thenReturn(new ConfigDef());
ConfigValue validatedValue = new ConfigValue("foo.bar");
@@ -889,7 +888,8 @@ public class StandaloneHerderTest {
when(worker.config()).thenReturn(workerConfig);
when(worker.getPlugins()).thenReturn(plugins);
when(workerConfig.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG)).thenReturn((Class)
SimpleHeaderConverter.class);
- when(plugins.pluginLoader(anyString(),
any())).thenReturn(pluginLoader);
+ //when(plugins.pluginLoader(anyString(), any(),
any())).thenReturn(pluginLoader);
+ when(plugins.connectorLoader(any(), any())).thenReturn(pluginLoader);
when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
when(plugins.newConnector(anyString(),
any())).thenReturn(connectorMock);
when(connectorMock.config()).thenReturn(configDef);
@@ -1244,7 +1244,8 @@ public class StandaloneHerderTest {
when(transformer.transform(configCapture.capture())).thenAnswer(invocation ->
configCapture.getValue());
when(worker.config()).thenReturn(workerConfig);
when(workerConfig.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG)).thenReturn((Class)
SimpleHeaderConverter.class);
- when(plugins.pluginLoader(anyString(),
any())).thenReturn(pluginLoader);
+ //when(plugins.pluginLoader(anyString(), any(),
any())).thenReturn(pluginLoader);
+ when(plugins.connectorLoader(any(), any())).thenReturn(pluginLoader);
when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
// Assume the connector should always be created
when(worker.getPlugins()).thenReturn(plugins);