This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 6b71e3f35dd66f64f84db8029c5061c6d458a147 Author: Chris Egerton <[email protected]> AuthorDate: Mon Jan 30 12:06:02 2023 -0500 KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions (#13148) Reviewers: Mickael Maison <[email protected]>, Greg Harris <[email protected]> --- .../kafka/connect/runtime/AbstractHerder.java | 27 ++++++--- .../kafka/connect/runtime/isolation/Plugins.java | 4 ++ .../kafka/connect/runtime/AbstractHerderTest.java | 65 +++++++++++----------- 3 files changed, 53 insertions(+), 43 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 90adde67a6f..0fa6c33b69c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -29,7 +29,9 @@ import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; +import org.apache.kafka.connect.runtime.isolation.LoaderSwap; import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; @@ -787,12 +789,18 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con @Override public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) { - List<ConfigKeyInfo> results = new ArrayList<>(); - ConfigDef configDefs; + Plugins p = plugins(); + Class<?> pluginClass; try { - Plugins p = plugins(); + pluginClass = p.pluginClass(pluginName); + } catch (ClassNotFoundException cnfe) { + throw new NotFoundException("Unknown plugin " + pluginName + "."); + } + + try (LoaderSwap loaderSwap = p.withClassLoader(pluginClass.getClassLoader())) { Object plugin = p.newPlugin(pluginName); PluginType pluginType = PluginType.from(plugin.getClass()); + ConfigDef configDefs; switch (pluginType) { case SINK: case SOURCE: @@ -813,13 +821,14 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con default: throw new BadRequestException("Invalid plugin type " + pluginType + ". Valid types are sink, source, converter, header_converter, transformation, predicate."); } - } catch (ClassNotFoundException cnfe) { - throw new NotFoundException("Unknown plugin " + pluginName + "."); - } - for (ConfigDef.ConfigKey configKey : configDefs.configKeys().values()) { - results.add(AbstractHerder.convertConfigKey(configKey)); + List<ConfigKeyInfo> results = new ArrayList<>(); + for (ConfigDef.ConfigKey configKey : configDefs.configKeys().values()) { + results.add(AbstractHerder.convertConfigKey(configKey)); + } + return results; + } catch (ClassNotFoundException e) { + throw new ConnectException("Failed to load plugin class or one of its dependencies", e); } - return results; } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 6d961272399..a7ebaafe9ba 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -120,6 +120,10 @@ public class Plugins { ); } + public Class<?> pluginClass(String classOrAlias) throws ClassNotFoundException { + return pluginClass(delegatingLoader, classOrAlias, Object.class); + } + public static ClassLoader compareAndSwapLoaders(ClassLoader loader) { ClassLoader current = Thread.currentThread().getContextClassLoader(); if (!current.equals(loader)) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index ada507f1eb3..66c28654c0d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -69,9 +69,11 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.kafka.connect.runtime.AbstractHerder.keysWithVariableValues; +import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.partialMockBuilder; import static org.easymock.EasyMock.strictMock; @@ -917,7 +919,17 @@ public class AbstractHerderTest { } @Test - public void testConnectorPluginConfig() throws Exception { + public void testPredicatePluginConfig() throws ClassNotFoundException { + testConnectorPluginConfig("predicate", SamplePredicate::new, SamplePredicate::config); + } + + @Test + public void testTransformationPluginConfig() throws ClassNotFoundException { + testConnectorPluginConfig("transformation", SampleTransformation::new, SampleTransformation::config); + } + + @SuppressWarnings("unchecked") + private <T> void testConnectorPluginConfig(String pluginName, Supplier<T> newPluginInstance, Function<T, ConfigDef> pluginConfig) throws ClassNotFoundException { AbstractHerder herder = partialMockBuilder(AbstractHerder.class) .withConstructor( Worker.class, @@ -931,43 +943,25 @@ public class AbstractHerderTest { .addMockedMethod("generation") .createMock(); - EasyMock.expect(plugins.newPlugin(EasyMock.anyString())).andAnswer(() -> { - String name = (String) EasyMock.getCurrentArguments()[0]; - switch (name) { - case "sink": return new SampleSinkConnector(); - case "source": return new SampleSourceConnector(); - case "converter": return new SampleConverterWithHeaders(); - case "header-converter": return new SampleHeaderConverter(); - case "predicate": return new SamplePredicate(); - default: return new SampleTransformation<>(); - } - }).anyTimes(); - EasyMock.expect(herder.plugins()).andStubReturn(plugins); - replayAll(); - - List<ConfigKeyInfo> sinkConnectorConfigs = herder.connectorPluginConfig("sink"); - assertNotNull(sinkConnectorConfigs); - assertEquals(new SampleSinkConnector().config().names().size(), sinkConnectorConfigs.size()); + @SuppressWarnings("rawtypes") + Class pluginClass = newPluginInstance.get().getClass(); - List<ConfigKeyInfo> sourceConnectorConfigs = herder.connectorPluginConfig("source"); - assertNotNull(sourceConnectorConfigs); - assertEquals(new SampleSourceConnector().config().names().size(), sourceConnectorConfigs.size()); + EasyMock.expect(plugins.pluginClass(pluginName)).andAnswer(() -> pluginClass); + EasyMock.expect(plugins.newPlugin(anyString())).andAnswer(newPluginInstance::get); + // Make sure that we used the correct class loader when interacting with the plugin + // The result of this method is used exclusively in try-with-resources blocks, which allow for null values + EasyMock.expect(plugins.withClassLoader(pluginClass.getClassLoader())).andReturn(null); + EasyMock.expect(herder.plugins()).andStubReturn(plugins); - List<ConfigKeyInfo> converterConfigs = herder.connectorPluginConfig("converter"); - assertNotNull(converterConfigs); - assertEquals(new SampleConverterWithHeaders().config().names().size(), converterConfigs.size()); + replayAll(); - List<ConfigKeyInfo> headerConverterConfigs = herder.connectorPluginConfig("header-converter"); - assertNotNull(headerConverterConfigs); - assertEquals(new SampleHeaderConverter().config().names().size(), headerConverterConfigs.size()); + List<ConfigKeyInfo> configs = herder.connectorPluginConfig(pluginName); + assertNotNull(configs); - List<ConfigKeyInfo> predicateConfigs = herder.connectorPluginConfig("predicate"); - assertNotNull(predicateConfigs); - assertEquals(new SamplePredicate().config().names().size(), predicateConfigs.size()); + ConfigDef expectedConfig = pluginConfig.apply(newPluginInstance.get()); + assertEquals(expectedConfig.names().size(), configs.size()); - List<ConfigKeyInfo> transformationConfigs = herder.connectorPluginConfig("transformation"); - assertNotNull(transformationConfigs); - assertEquals(new SampleTransformation<>().config().names().size(), transformationConfigs.size()); + PowerMock.verifyAll(); } @Test(expected = NotFoundException.class) @@ -980,12 +974,13 @@ public class AbstractHerderTest { .addMockedMethod("generation") .createMock(); EasyMock.expect(worker.getPlugins()).andStubReturn(plugins); - EasyMock.expect(plugins.newPlugin(anyString())).andThrow(new ClassNotFoundException()); + EasyMock.expect(plugins.pluginClass(connName)).andThrow(new ClassNotFoundException()); replayAll(); herder.connectorPluginConfig(connName); } @Test(expected = BadRequestException.class) + @SuppressWarnings({"rawtypes", "unchecked"}) public void testGetConnectorConfigDefWithInvalidPluginType() throws Exception { String connName = "AnotherPlugin"; AbstractHerder herder = partialMockBuilder(AbstractHerder.class) @@ -995,6 +990,8 @@ public class AbstractHerderTest { .addMockedMethod("generation") .createMock(); EasyMock.expect(worker.getPlugins()).andStubReturn(plugins); + EasyMock.expect(plugins.pluginClass(connName)).andReturn((Class) DirectoryConfigProvider.class); + EasyMock.expect(plugins.withClassLoader(anyObject())).andReturn(null); EasyMock.expect(plugins.newPlugin(anyString())).andReturn(new DirectoryConfigProvider()); replayAll(); herder.connectorPluginConfig(connName);
