This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 507afe40ad31daa429d01e655317c6e70cd4fec5 Author: Chris Egerton <[email protected]> AuthorDate: Mon Jan 30 12:06:02 2023 -0500 KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions (#13148) Reviewers: Mickael Maison <[email protected]>, Greg Harris <[email protected]> --- .../kafka/connect/runtime/AbstractHerder.java | 30 ++++++--- .../kafka/connect/runtime/isolation/Plugins.java | 4 ++ .../kafka/connect/runtime/AbstractHerderTest.java | 73 +++++++++++----------- 3 files changed, 63 insertions(+), 44 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 2fe75a955b0..e5dd9790fb8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.isolation.PluginType; @@ -757,12 +758,20 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con @Override public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) { - List<ConfigKeyInfo> results = new ArrayList<>(); - ConfigDef configDefs; + Plugins p = plugins(); + Class<?> pluginClass; + try { + pluginClass = p.pluginClass(pluginName); + } catch (ClassNotFoundException cnfe) { + throw new NotFoundException("Unknown plugin " + pluginName + "."); + } + + ClassLoader pluginLoader = 
pluginClass.getClassLoader(); + ClassLoader savedLoader = Plugins.compareAndSwapLoaders(pluginLoader); try { - Plugins p = plugins(); Object plugin = p.newPlugin(pluginName); PluginType pluginType = PluginType.from(plugin.getClass()); + ConfigDef configDefs; switch (pluginType) { case SINK: case SOURCE: @@ -783,13 +792,16 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con default: throw new BadRequestException("Invalid plugin type " + pluginType + ". Valid types are sink, source, converter, header_converter, transformation, predicate."); } - } catch (ClassNotFoundException cnfe) { - throw new NotFoundException("Unknown plugin " + pluginName + "."); - } - for (ConfigDef.ConfigKey configKey : configDefs.configKeys().values()) { - results.add(AbstractHerder.convertConfigKey(configKey)); + List<ConfigKeyInfo> results = new ArrayList<>(); + for (ConfigDef.ConfigKey configKey : configDefs.configKeys().values()) { + results.add(AbstractHerder.convertConfigKey(configKey)); + } + return results; + } catch (ClassNotFoundException e) { + throw new ConnectException("Failed to load plugin class or one of its dependencies", e); + } finally { + Plugins.compareAndSwapLoaders(savedLoader); } - return results; } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 7ec73ba78b8..8960874d4c4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -120,6 +120,10 @@ public class Plugins { ); } + public Class<?> pluginClass(String classOrAlias) throws ClassNotFoundException { + return pluginClass(delegatingLoader, classOrAlias, Object.class); + } + public static ClassLoader compareAndSwapLoaders(ClassLoader loader) { ClassLoader current = 
Thread.currentThread().getContextClassLoader(); if (!current.equals(loader)) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 5b9e199e5a1..48fa6231de5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -68,10 +68,12 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.kafka.connect.runtime.AbstractHerder.keysWithVariableValues; import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.isNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.powermock.api.easymock.PowerMock.verifyAll; @@ -84,7 +86,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @RunWith(PowerMockRunner.class) -@PrepareForTest({AbstractHerder.class}) +@PrepareForTest({AbstractHerder.class, Plugins.class}) public class AbstractHerderTest { private static final String CONN1 = "sourceA"; @@ -905,7 +907,19 @@ public class AbstractHerderTest { } @Test - public void testConnectorPluginConfig() throws Exception { + public void testPredicatePluginConfig() throws ClassNotFoundException { + testConnectorPluginConfig("predicate", SamplePredicate::new, SamplePredicate::config); + } + + @Test + public void testTransformationPluginConfig() throws ClassNotFoundException { + testConnectorPluginConfig("transformation", SampleTransformation::new, SampleTransformation::config); + } + + @SuppressWarnings("unchecked") + private <T> void testConnectorPluginConfig(String pluginName, Supplier<T> newPluginInstance, Function<T, ConfigDef> pluginConfig) throws 
ClassNotFoundException { + PowerMock.mockStatic(Plugins.class); + AbstractHerder herder = partialMockBuilder(AbstractHerder.class) .withConstructor( Worker.class, @@ -919,43 +933,26 @@ public class AbstractHerderTest { .addMockedMethod("generation") .createMock(); - EasyMock.expect(plugins.newPlugin(EasyMock.anyString())).andAnswer(() -> { - String name = (String) EasyMock.getCurrentArguments()[0]; - switch (name) { - case "sink": return new SampleSinkConnector(); - case "source": return new SampleSourceConnector(); - case "converter": return new SampleConverterWithHeaders(); - case "header-converter": return new SampleHeaderConverter(); - case "predicate": return new SamplePredicate(); - default: return new SampleTransformation<>(); - } - }).anyTimes(); - EasyMock.expect(herder.plugins()).andStubReturn(plugins); - replayAll(); - - List<ConfigKeyInfo> sinkConnectorConfigs = herder.connectorPluginConfig("sink"); - assertNotNull(sinkConnectorConfigs); - assertEquals(new SampleSinkConnector().config().names().size(), sinkConnectorConfigs.size()); + @SuppressWarnings("rawtypes") + Class pluginClass = newPluginInstance.get().getClass(); - List<ConfigKeyInfo> sourceConnectorConfigs = herder.connectorPluginConfig("source"); - assertNotNull(sourceConnectorConfigs); - assertEquals(new SampleSourceConnector().config().names().size(), sourceConnectorConfigs.size()); + EasyMock.expect(plugins.pluginClass(pluginName)).andAnswer(() -> pluginClass); + // Make sure that we used the correct class loader when interacting with the plugin + // The result of this method is only passed back to compareAndSwapLoaders in a finally block, so a null value is fine here + EasyMock.expect(Plugins.compareAndSwapLoaders(pluginClass.getClassLoader())).andReturn(null); + EasyMock.expect(Plugins.compareAndSwapLoaders((ClassLoader) isNull())).andReturn(null); + EasyMock.expect(plugins.newPlugin(anyString())).andAnswer(newPluginInstance::get); + EasyMock.expect(herder.plugins()).andStubReturn(plugins); - 
List<ConfigKeyInfo> converterConfigs = herder.connectorPluginConfig("converter"); - assertNotNull(converterConfigs); - assertEquals(new SampleConverterWithHeaders().config().names().size(), converterConfigs.size()); + replayAll(); - List<ConfigKeyInfo> headerConverterConfigs = herder.connectorPluginConfig("header-converter"); - assertNotNull(headerConverterConfigs); - assertEquals(new SampleHeaderConverter().config().names().size(), headerConverterConfigs.size()); + List<ConfigKeyInfo> configs = herder.connectorPluginConfig(pluginName); + assertNotNull(configs); - List<ConfigKeyInfo> predicateConfigs = herder.connectorPluginConfig("predicate"); - assertNotNull(predicateConfigs); - assertEquals(new SamplePredicate().config().names().size(), predicateConfigs.size()); + ConfigDef expectedConfig = pluginConfig.apply(newPluginInstance.get()); + assertEquals(expectedConfig.names().size(), configs.size()); - List<ConfigKeyInfo> transformationConfigs = herder.connectorPluginConfig("transformation"); - assertNotNull(transformationConfigs); - assertEquals(new SampleTransformation<>().config().names().size(), transformationConfigs.size()); + PowerMock.verifyAll(); } @Test(expected = NotFoundException.class) @@ -968,13 +965,16 @@ public class AbstractHerderTest { .addMockedMethod("generation") .createMock(); EasyMock.expect(worker.getPlugins()).andStubReturn(plugins); - EasyMock.expect(plugins.newPlugin(anyString())).andThrow(new ClassNotFoundException()); + EasyMock.expect(plugins.pluginClass(connName)).andThrow(new ClassNotFoundException()); replayAll(); herder.connectorPluginConfig(connName); } @Test(expected = BadRequestException.class) + @SuppressWarnings({"rawtypes", "unchecked"}) public void testGetConnectorConfigDefWithInvalidPluginType() throws Exception { + PowerMock.mockStatic(Plugins.class); + String connName = "AnotherPlugin"; AbstractHerder herder = partialMockBuilder(AbstractHerder.class) .withConstructor(Worker.class, String.class, String.class, 
StatusBackingStore.class, ConfigBackingStore.class, @@ -983,6 +983,9 @@ public class AbstractHerderTest { .addMockedMethod("generation") .createMock(); EasyMock.expect(worker.getPlugins()).andStubReturn(plugins); + EasyMock.expect(plugins.pluginClass(connName)).andReturn((Class) DirectoryConfigProvider.class); + EasyMock.expect(Plugins.compareAndSwapLoaders(DirectoryConfigProvider.class.getClassLoader())).andReturn(null); + EasyMock.expect(Plugins.compareAndSwapLoaders((ClassLoader) isNull())).andReturn(null); EasyMock.expect(plugins.newPlugin(anyString())).andReturn(new DirectoryConfigProvider()); replayAll(); herder.connectorPluginConfig(connName);
