This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 17559d581e9 KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions (#13148)
17559d581e9 is described below

commit 17559d581e9ba89eb3c96b32f232cfa03d8b45a5
Author: Chris Egerton <[email protected]>
AuthorDate: Mon Jan 30 12:06:02 2023 -0500

    KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions (#13148)
    
    
    Reviewers: Mickael Maison <[email protected]>, Greg Harris <[email protected]>
---
 .../kafka/connect/runtime/AbstractHerder.java      | 25 ++++---
 .../kafka/connect/runtime/isolation/Plugins.java   |  4 ++
 .../kafka/connect/runtime/AbstractHerderTest.java  | 76 ++++++++++++----------
 3 files changed, 61 insertions(+), 44 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 15fb23d35a4..32ef4f88f91 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -797,12 +797,18 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
 
     @Override
     public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
-        List<ConfigKeyInfo> results = new ArrayList<>();
-        ConfigDef configDefs;
+        Plugins p = plugins();
+        Class<?> pluginClass;
         try {
-            Plugins p = plugins();
+            pluginClass = p.pluginClass(pluginName);
+        } catch (ClassNotFoundException cnfe) {
+            throw new NotFoundException("Unknown plugin " + pluginName + ".");
+        }
+
+        try (LoaderSwap loaderSwap = p.withClassLoader(pluginClass.getClassLoader())) {
             Object plugin = p.newPlugin(pluginName);
             PluginType pluginType = PluginType.from(plugin.getClass());
+            ConfigDef configDefs;
             switch (pluginType) {
                 case SINK:
                 case SOURCE:
@@ -823,13 +829,14 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
                 default:
                     throw new BadRequestException("Invalid plugin type " + pluginType + ". Valid types are sink, source, converter, header_converter, transformation, predicate.");
             }
-        } catch (ClassNotFoundException cnfe) {
-            throw new NotFoundException("Unknown plugin " + pluginName + ".");
-        }
-        for (ConfigDef.ConfigKey configKey : configDefs.configKeys().values()) {
-            results.add(AbstractHerder.convertConfigKey(configKey));
+            List<ConfigKeyInfo> results = new ArrayList<>();
+            for (ConfigDef.ConfigKey configKey : configDefs.configKeys().values()) {
+                results.add(AbstractHerder.convertConfigKey(configKey));
+            }
+            return results;
+        } catch (ClassNotFoundException e) {
+            throw new ConnectException("Failed to load plugin class or one of its dependencies", e);
         }
-        return results;
     }
 
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 311c14be929..a741c9cddb6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -123,6 +123,10 @@ public class Plugins {
         );
     }
 
+    public Class<?> pluginClass(String classOrAlias) throws ClassNotFoundException {
+        return pluginClass(delegatingLoader, classOrAlias, Object.class);
+    }
+
     public static ClassLoader compareAndSwapLoaders(ClassLoader loader) {
         ClassLoader current = Thread.currentThread().getContextClassLoader();
         if (!current.equals(loader)) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 2e5a82f4f98..ec2f38a2799 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -67,6 +67,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.connect.runtime.AbstractHerder.keysWithVariableValues;
@@ -899,48 +900,51 @@ public class AbstractHerderTest {
     }
 
     @Test
-    public void testConnectorPluginConfig() throws Exception {
+    public void testSinkConnectorPluginConfig() throws ClassNotFoundException {
+        testConnectorPluginConfig("sink", SampleSinkConnector::new, SampleSinkConnector::config);
+    }
 
-        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
-                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-                .defaultAnswer(CALLS_REAL_METHODS));
+    @Test
+    public void testSourceConnectorPluginConfig() throws ClassNotFoundException {
+        testConnectorPluginConfig("source", SampleSourceConnector::new, SampleSourceConnector::config);
+    }
 
-        when(plugins.newPlugin(anyString())).then(invocation -> {
-            String name = invocation.getArgument(0);
-            switch (name) {
-                case "sink": return new SampleSinkConnector();
-                case "source": return new SampleSourceConnector();
-                case "converter": return new SampleConverterWithHeaders();
-                case "header-converter": return new SampleHeaderConverter();
-                case "predicate": return new SamplePredicate();
-                default: return new SampleTransformation<>();
-            }
-        });
-        when(herder.plugins()).thenReturn(plugins);
+    @Test
+    public void testConverterPluginConfig() throws ClassNotFoundException {
+        testConnectorPluginConfig("converter", SampleConverterWithHeaders::new, SampleConverterWithHeaders::config);
+    }
+
+    @Test
+    public void testHeaderConverterPluginConfig() throws ClassNotFoundException {
+        testConnectorPluginConfig("header-converter", SampleHeaderConverter::new, SampleHeaderConverter::config);
+    }
 
-        List<ConfigKeyInfo> sinkConnectorConfigs = herder.connectorPluginConfig("sink");
-        assertNotNull(sinkConnectorConfigs);
-        assertEquals(new SampleSinkConnector().config().names().size(), sinkConnectorConfigs.size());
+    @Test
+    public void testPredicatePluginConfig() throws ClassNotFoundException {
+        testConnectorPluginConfig("predicate", SamplePredicate::new, SamplePredicate::config);
+    }
 
-        List<ConfigKeyInfo> sourceConnectorConfigs = herder.connectorPluginConfig("source");
-        assertNotNull(sourceConnectorConfigs);
-        assertEquals(new SampleSourceConnector().config().names().size(), sourceConnectorConfigs.size());
+    @Test
+    public void testTransformationPluginConfig() throws ClassNotFoundException {
+        testConnectorPluginConfig("transformation", SampleTransformation::new, SampleTransformation::config);
+    }
 
-        List<ConfigKeyInfo> converterConfigs = herder.connectorPluginConfig("converter");
-        assertNotNull(converterConfigs);
-        assertEquals(new SampleConverterWithHeaders().config().names().size(), converterConfigs.size());
+    private <T> void testConnectorPluginConfig(String pluginName, Supplier<T> newPluginInstance, Function<T, ConfigDef> pluginConfig) throws ClassNotFoundException {
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
 
-        List<ConfigKeyInfo> headerConverterConfigs = herder.connectorPluginConfig("header-converter");
-        assertNotNull(headerConverterConfigs);
-        assertEquals(new SampleHeaderConverter().config().names().size(), headerConverterConfigs.size());
+        when(plugins.pluginClass(pluginName)).then(invocation -> newPluginInstance.get().getClass());
+        when(plugins.newPlugin(anyString())).then(invocation -> newPluginInstance.get());
+        when(herder.plugins()).thenReturn(plugins);
 
-        List<ConfigKeyInfo> predicateConfigs = herder.connectorPluginConfig("predicate");
-        assertNotNull(predicateConfigs);
-        assertEquals(new SamplePredicate().config().names().size(), predicateConfigs.size());
+        List<ConfigKeyInfo> configs = herder.connectorPluginConfig(pluginName);
+        assertNotNull(configs);
 
-        List<ConfigKeyInfo> transformationConfigs = herder.connectorPluginConfig("transformation");
-        assertNotNull(transformationConfigs);
-        assertEquals(new SampleTransformation<>().config().names().size(), transformationConfigs.size());
+        ConfigDef expectedConfig = pluginConfig.apply(newPluginInstance.get());
+        assertEquals(expectedConfig.names().size(), configs.size());
+        // Make sure that we used the correct class loader when interacting with the plugin
+        verify(plugins).withClassLoader(newPluginInstance.get().getClass().getClassLoader());
     }
 
     @Test(expected = NotFoundException.class)
@@ -950,17 +954,19 @@ public class AbstractHerderTest {
                 .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
                 .defaultAnswer(CALLS_REAL_METHODS));
         when(worker.getPlugins()).thenReturn(plugins);
-        when(plugins.newPlugin(anyString())).thenThrow(new ClassNotFoundException());
+        when(plugins.pluginClass(anyString())).thenThrow(new ClassNotFoundException());
         herder.connectorPluginConfig(connName);
     }
 
     @Test(expected = BadRequestException.class)
+    @SuppressWarnings({"rawtypes", "unchecked"})
     public void testGetConnectorConfigDefWithInvalidPluginType() throws Exception {
         String connName = "AnotherPlugin";
         AbstractHerder herder = mock(AbstractHerder.class, withSettings()
                 .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
                 .defaultAnswer(CALLS_REAL_METHODS));
         when(worker.getPlugins()).thenReturn(plugins);
+        when(plugins.pluginClass(anyString())).thenReturn((Class) Object.class);
         when(plugins.newPlugin(anyString())).thenReturn(new DirectoryConfigProvider());
         herder.connectorPluginConfig(connName);
     }

Reply via email to