This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 17559d581e9 KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions (#13148)
17559d581e9 is described below
commit 17559d581e9ba89eb3c96b32f232cfa03d8b45a5
Author: Chris Egerton <[email protected]>
AuthorDate: Mon Jan 30 12:06:02 2023 -0500
KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions (#13148)
Reviewers: Mickael Maison <[email protected]>, Greg Harris
<[email protected]>
---
.../kafka/connect/runtime/AbstractHerder.java | 25 ++++---
.../kafka/connect/runtime/isolation/Plugins.java | 4 ++
.../kafka/connect/runtime/AbstractHerderTest.java | 76 ++++++++++++----------
3 files changed, 61 insertions(+), 44 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 15fb23d35a4..32ef4f88f91 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -797,12 +797,18 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
@Override
public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
- List<ConfigKeyInfo> results = new ArrayList<>();
- ConfigDef configDefs;
+ Plugins p = plugins();
+ Class<?> pluginClass;
try {
- Plugins p = plugins();
+ pluginClass = p.pluginClass(pluginName);
+ } catch (ClassNotFoundException cnfe) {
+ throw new NotFoundException("Unknown plugin " + pluginName + ".");
+ }
+
+ try (LoaderSwap loaderSwap =
p.withClassLoader(pluginClass.getClassLoader())) {
Object plugin = p.newPlugin(pluginName);
PluginType pluginType = PluginType.from(plugin.getClass());
+ ConfigDef configDefs;
switch (pluginType) {
case SINK:
case SOURCE:
@@ -823,13 +829,14 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
default:
throw new BadRequestException("Invalid plugin type " +
pluginType + ". Valid types are sink, source, converter, header_converter,
transformation, predicate.");
}
- } catch (ClassNotFoundException cnfe) {
- throw new NotFoundException("Unknown plugin " + pluginName + ".");
- }
- for (ConfigDef.ConfigKey configKey : configDefs.configKeys().values())
{
- results.add(AbstractHerder.convertConfigKey(configKey));
+ List<ConfigKeyInfo> results = new ArrayList<>();
+ for (ConfigDef.ConfigKey configKey :
configDefs.configKeys().values()) {
+ results.add(AbstractHerder.convertConfigKey(configKey));
+ }
+ return results;
+ } catch (ClassNotFoundException e) {
+ throw new ConnectException("Failed to load plugin class or one of
its dependencies", e);
}
- return results;
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 311c14be929..a741c9cddb6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -123,6 +123,10 @@ public class Plugins {
);
}
+ public Class<?> pluginClass(String classOrAlias) throws
ClassNotFoundException {
+ return pluginClass(delegatingLoader, classOrAlias, Object.class);
+ }
+
public static ClassLoader compareAndSwapLoaders(ClassLoader loader) {
ClassLoader current = Thread.currentThread().getContextClassLoader();
if (!current.equals(loader)) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 2e5a82f4f98..ec2f38a2799 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -67,6 +67,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static
org.apache.kafka.connect.runtime.AbstractHerder.keysWithVariableValues;
@@ -899,48 +900,51 @@ public class AbstractHerderTest {
}
@Test
- public void testConnectorPluginConfig() throws Exception {
+ public void testSinkConnectorPluginConfig() throws ClassNotFoundException {
+ testConnectorPluginConfig("sink", SampleSinkConnector::new,
SampleSinkConnector::config);
+ }
- AbstractHerder herder = mock(AbstractHerder.class, withSettings()
- .useConstructor(worker, workerId, kafkaClusterId, statusStore,
configStore, noneConnectorClientConfigOverridePolicy)
- .defaultAnswer(CALLS_REAL_METHODS));
+ @Test
+ public void testSourceConnectorPluginConfig() throws
ClassNotFoundException {
+ testConnectorPluginConfig("source", SampleSourceConnector::new,
SampleSourceConnector::config);
+ }
- when(plugins.newPlugin(anyString())).then(invocation -> {
- String name = invocation.getArgument(0);
- switch (name) {
- case "sink": return new SampleSinkConnector();
- case "source": return new SampleSourceConnector();
- case "converter": return new SampleConverterWithHeaders();
- case "header-converter": return new SampleHeaderConverter();
- case "predicate": return new SamplePredicate();
- default: return new SampleTransformation<>();
- }
- });
- when(herder.plugins()).thenReturn(plugins);
+ @Test
+ public void testConverterPluginConfig() throws ClassNotFoundException {
+ testConnectorPluginConfig("converter",
SampleConverterWithHeaders::new, SampleConverterWithHeaders::config);
+ }
+
+ @Test
+ public void testHeaderConverterPluginConfig() throws
ClassNotFoundException {
+ testConnectorPluginConfig("header-converter",
SampleHeaderConverter::new, SampleHeaderConverter::config);
+ }
- List<ConfigKeyInfo> sinkConnectorConfigs =
herder.connectorPluginConfig("sink");
- assertNotNull(sinkConnectorConfigs);
- assertEquals(new SampleSinkConnector().config().names().size(),
sinkConnectorConfigs.size());
+ @Test
+ public void testPredicatePluginConfig() throws ClassNotFoundException {
+ testConnectorPluginConfig("predicate", SamplePredicate::new,
SamplePredicate::config);
+ }
- List<ConfigKeyInfo> sourceConnectorConfigs =
herder.connectorPluginConfig("source");
- assertNotNull(sourceConnectorConfigs);
- assertEquals(new SampleSourceConnector().config().names().size(),
sourceConnectorConfigs.size());
+ @Test
+ public void testTransformationPluginConfig() throws ClassNotFoundException
{
+ testConnectorPluginConfig("transformation", SampleTransformation::new,
SampleTransformation::config);
+ }
- List<ConfigKeyInfo> converterConfigs =
herder.connectorPluginConfig("converter");
- assertNotNull(converterConfigs);
- assertEquals(new SampleConverterWithHeaders().config().names().size(),
converterConfigs.size());
+ private <T> void testConnectorPluginConfig(String pluginName, Supplier<T>
newPluginInstance, Function<T, ConfigDef> pluginConfig) throws
ClassNotFoundException {
+ AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+ .useConstructor(worker, workerId, kafkaClusterId, statusStore,
configStore, noneConnectorClientConfigOverridePolicy)
+ .defaultAnswer(CALLS_REAL_METHODS));
- List<ConfigKeyInfo> headerConverterConfigs =
herder.connectorPluginConfig("header-converter");
- assertNotNull(headerConverterConfigs);
- assertEquals(new SampleHeaderConverter().config().names().size(),
headerConverterConfigs.size());
+ when(plugins.pluginClass(pluginName)).then(invocation ->
newPluginInstance.get().getClass());
+ when(plugins.newPlugin(anyString())).then(invocation ->
newPluginInstance.get());
+ when(herder.plugins()).thenReturn(plugins);
- List<ConfigKeyInfo> predicateConfigs =
herder.connectorPluginConfig("predicate");
- assertNotNull(predicateConfigs);
- assertEquals(new SamplePredicate().config().names().size(),
predicateConfigs.size());
+ List<ConfigKeyInfo> configs = herder.connectorPluginConfig(pluginName);
+ assertNotNull(configs);
- List<ConfigKeyInfo> transformationConfigs =
herder.connectorPluginConfig("transformation");
- assertNotNull(transformationConfigs);
- assertEquals(new SampleTransformation<>().config().names().size(),
transformationConfigs.size());
+ ConfigDef expectedConfig = pluginConfig.apply(newPluginInstance.get());
+ assertEquals(expectedConfig.names().size(), configs.size());
+ // Make sure that we used the correct class loader when interacting
with the plugin
+
verify(plugins).withClassLoader(newPluginInstance.get().getClass().getClassLoader());
}
@Test(expected = NotFoundException.class)
@@ -950,17 +954,19 @@ public class AbstractHerderTest {
.useConstructor(worker, workerId, kafkaClusterId, statusStore,
configStore, noneConnectorClientConfigOverridePolicy)
.defaultAnswer(CALLS_REAL_METHODS));
when(worker.getPlugins()).thenReturn(plugins);
- when(plugins.newPlugin(anyString())).thenThrow(new
ClassNotFoundException());
+ when(plugins.pluginClass(anyString())).thenThrow(new
ClassNotFoundException());
herder.connectorPluginConfig(connName);
}
@Test(expected = BadRequestException.class)
+ @SuppressWarnings({"rawtypes", "unchecked"})
public void testGetConnectorConfigDefWithInvalidPluginType() throws
Exception {
String connName = "AnotherPlugin";
AbstractHerder herder = mock(AbstractHerder.class, withSettings()
.useConstructor(worker, workerId, kafkaClusterId, statusStore,
configStore, noneConnectorClientConfigOverridePolicy)
.defaultAnswer(CALLS_REAL_METHODS));
when(worker.getPlugins()).thenReturn(plugins);
+ when(plugins.pluginClass(anyString())).thenReturn((Class)
Object.class);
when(plugins.newPlugin(anyString())).thenReturn(new
DirectoryConfigProvider());
herder.connectorPluginConfig(connName);
}