This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 507afe40ad31daa429d01e655317c6e70cd4fec5
Author: Chris Egerton <[email protected]>
AuthorDate: Mon Jan 30 12:06:02 2023 -0500

    KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions (#13148)
    
    Reviewers: Mickael Maison <[email protected]>, Greg Harris <[email protected]>
---
 .../kafka/connect/runtime/AbstractHerder.java      | 30 ++++++---
 .../kafka/connect/runtime/isolation/Plugins.java   |  4 ++
 .../kafka/connect/runtime/AbstractHerderTest.java  | 73 +++++++++++-----------
 3 files changed, 63 insertions(+), 44 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 2fe75a955b0..e5dd9790fb8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Connector;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.isolation.PluginType;
@@ -757,12 +758,20 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
 
     @Override
     public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
-        List<ConfigKeyInfo> results = new ArrayList<>();
-        ConfigDef configDefs;
+        Plugins p = plugins();
+        Class<?> pluginClass;
+        try {
+            pluginClass = p.pluginClass(pluginName);
+        } catch (ClassNotFoundException cnfe) {
+            throw new NotFoundException("Unknown plugin " + pluginName + ".");
+        }
+
+        ClassLoader pluginLoader = pluginClass.getClassLoader();
+        ClassLoader savedLoader = Plugins.compareAndSwapLoaders(pluginLoader);
         try {
-            Plugins p = plugins();
             Object plugin = p.newPlugin(pluginName);
             PluginType pluginType = PluginType.from(plugin.getClass());
+            ConfigDef configDefs;
             switch (pluginType) {
                 case SINK:
                 case SOURCE:
@@ -783,13 +792,16 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
                 default:
                     throw new BadRequestException("Invalid plugin type " + 
pluginType + ". Valid types are sink, source, converter, header_converter, 
transformation, predicate.");
             }
-        } catch (ClassNotFoundException cnfe) {
-            throw new NotFoundException("Unknown plugin " + pluginName + ".");
-        }
-        for (ConfigDef.ConfigKey configKey : configDefs.configKeys().values()) 
{
-            results.add(AbstractHerder.convertConfigKey(configKey));
+            List<ConfigKeyInfo> results = new ArrayList<>();
+            for (ConfigDef.ConfigKey configKey : 
configDefs.configKeys().values()) {
+                results.add(AbstractHerder.convertConfigKey(configKey));
+            }
+            return results;
+        } catch (ClassNotFoundException e) {
+            throw new ConnectException("Failed to load plugin class or one of 
its dependencies", e);
+        } finally {
+            Plugins.compareAndSwapLoaders(savedLoader);
         }
-        return results;
     }
 
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 7ec73ba78b8..8960874d4c4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -120,6 +120,10 @@ public class Plugins {
         );
     }
 
+    public Class<?> pluginClass(String classOrAlias) throws 
ClassNotFoundException {
+        return pluginClass(delegatingLoader, classOrAlias, Object.class);
+    }
+
     public static ClassLoader compareAndSwapLoaders(ClassLoader loader) {
         ClassLoader current = Thread.currentThread().getContextClassLoader();
         if (!current.equals(loader)) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 5b9e199e5a1..48fa6231de5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -68,10 +68,12 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static 
org.apache.kafka.connect.runtime.AbstractHerder.keysWithVariableValues;
 import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.isNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.powermock.api.easymock.PowerMock.verifyAll;
@@ -84,7 +86,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({AbstractHerder.class})
+@PrepareForTest({AbstractHerder.class, Plugins.class})
 public class AbstractHerderTest {
 
     private static final String CONN1 = "sourceA";
@@ -905,7 +907,19 @@ public class AbstractHerderTest {
     }
 
     @Test
-    public void testConnectorPluginConfig() throws Exception {
+    public void testPredicatePluginConfig() throws ClassNotFoundException {
+        testConnectorPluginConfig("predicate", SamplePredicate::new, 
SamplePredicate::config);
+    }
+
+    @Test
+    public void testTransformationPluginConfig() throws ClassNotFoundException 
{
+        testConnectorPluginConfig("transformation", SampleTransformation::new, 
SampleTransformation::config);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> void testConnectorPluginConfig(String pluginName, Supplier<T> 
newPluginInstance, Function<T, ConfigDef> pluginConfig) throws 
ClassNotFoundException {
+        PowerMock.mockStatic(Plugins.class);
+
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
                 .withConstructor(
                         Worker.class,
@@ -919,43 +933,26 @@ public class AbstractHerderTest {
                 .addMockedMethod("generation")
                 .createMock();
 
-        EasyMock.expect(plugins.newPlugin(EasyMock.anyString())).andAnswer(() 
-> {
-            String name = (String) EasyMock.getCurrentArguments()[0];
-            switch (name) {
-                case "sink": return new SampleSinkConnector();
-                case "source": return new SampleSourceConnector();
-                case "converter": return new SampleConverterWithHeaders();
-                case "header-converter": return new SampleHeaderConverter();
-                case "predicate": return new SamplePredicate();
-                default: return new SampleTransformation<>();
-            }
-        }).anyTimes();
-        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
-        replayAll();
-
-        List<ConfigKeyInfo> sinkConnectorConfigs = 
herder.connectorPluginConfig("sink");
-        assertNotNull(sinkConnectorConfigs);
-        assertEquals(new SampleSinkConnector().config().names().size(), 
sinkConnectorConfigs.size());
+        @SuppressWarnings("rawtypes")
+        Class pluginClass = newPluginInstance.get().getClass();
 
-        List<ConfigKeyInfo> sourceConnectorConfigs = 
herder.connectorPluginConfig("source");
-        assertNotNull(sourceConnectorConfigs);
-        assertEquals(new SampleSourceConnector().config().names().size(), 
sourceConnectorConfigs.size());
+        EasyMock.expect(plugins.pluginClass(pluginName)).andAnswer(() -> 
pluginClass);
+        // Make sure that we used the correct class loader when interacting 
with the plugin
+        // The result of this method is used exclusively in try-with-resources 
blocks, which allow for null values
+        
EasyMock.expect(Plugins.compareAndSwapLoaders(pluginClass.getClassLoader())).andReturn(null);
+        EasyMock.expect(Plugins.compareAndSwapLoaders((ClassLoader) 
isNull())).andReturn(null);
+        
EasyMock.expect(plugins.newPlugin(anyString())).andAnswer(newPluginInstance::get);
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
 
-        List<ConfigKeyInfo> converterConfigs = 
herder.connectorPluginConfig("converter");
-        assertNotNull(converterConfigs);
-        assertEquals(new SampleConverterWithHeaders().config().names().size(), 
converterConfigs.size());
+        replayAll();
 
-        List<ConfigKeyInfo> headerConverterConfigs = 
herder.connectorPluginConfig("header-converter");
-        assertNotNull(headerConverterConfigs);
-        assertEquals(new SampleHeaderConverter().config().names().size(), 
headerConverterConfigs.size());
+        List<ConfigKeyInfo> configs = herder.connectorPluginConfig(pluginName);
+        assertNotNull(configs);
 
-        List<ConfigKeyInfo> predicateConfigs = 
herder.connectorPluginConfig("predicate");
-        assertNotNull(predicateConfigs);
-        assertEquals(new SamplePredicate().config().names().size(), 
predicateConfigs.size());
+        ConfigDef expectedConfig = pluginConfig.apply(newPluginInstance.get());
+        assertEquals(expectedConfig.names().size(), configs.size());
 
-        List<ConfigKeyInfo> transformationConfigs = 
herder.connectorPluginConfig("transformation");
-        assertNotNull(transformationConfigs);
-        assertEquals(new SampleTransformation<>().config().names().size(), 
transformationConfigs.size());
+        PowerMock.verifyAll();
     }
 
     @Test(expected = NotFoundException.class)
@@ -968,13 +965,16 @@ public class AbstractHerderTest {
                 .addMockedMethod("generation")
                 .createMock();
         EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
-        EasyMock.expect(plugins.newPlugin(anyString())).andThrow(new 
ClassNotFoundException());
+        EasyMock.expect(plugins.pluginClass(connName)).andThrow(new 
ClassNotFoundException());
         replayAll();
         herder.connectorPluginConfig(connName);
     }
 
     @Test(expected = BadRequestException.class)
+    @SuppressWarnings({"rawtypes", "unchecked"})
     public void testGetConnectorConfigDefWithInvalidPluginType() throws 
Exception {
+        PowerMock.mockStatic(Plugins.class);
+
         String connName = "AnotherPlugin";
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
                 .withConstructor(Worker.class, String.class, String.class, 
StatusBackingStore.class, ConfigBackingStore.class,
@@ -983,6 +983,9 @@ public class AbstractHerderTest {
                 .addMockedMethod("generation")
                 .createMock();
         EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.pluginClass(connName)).andReturn((Class) 
DirectoryConfigProvider.class);
+        
EasyMock.expect(Plugins.compareAndSwapLoaders(DirectoryConfigProvider.class.getClassLoader())).andReturn(null);
+        EasyMock.expect(Plugins.compareAndSwapLoaders((ClassLoader) 
isNull())).andReturn(null);
         EasyMock.expect(plugins.newPlugin(anyString())).andReturn(new 
DirectoryConfigProvider());
         replayAll();
         herder.connectorPluginConfig(connName);

Reply via email to