This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d8b3ed87be8039700e951514c0a3cd283cfb081d
Author: Jorge Esteban Quilcate Otoya <[email protected]>
AuthorDate: Tue Mar 28 18:26:23 2023 +0300

    KAFKA-14843: Include Connect framework properties when retrieving connector config definitions (#13445)
    
    Reviewers: Yash Mayya <[email protected]>, Greg Harris <[email protected]>, Chris Egerton <[email protected]>
---
 .../kafka/connect/runtime/AbstractHerder.java      | 32 +++++++---
 .../kafka/connect/runtime/AbstractHerderTest.java  | 69 ++++++++++++++++++++--
 .../kafka/connect/runtime/SampleSinkConnector.java |  6 ++
 .../connect/runtime/SampleSourceConnector.java     | 33 ++++++++++-
 4 files changed, 125 insertions(+), 15 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index e5dd9790fb8..2eed83d0bf7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -41,6 +41,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
+import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.Converter;
@@ -301,7 +302,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         ConnectorStatus connector = statusBackingStore.get(connName);
         if (connector == null)
             throw new NotFoundException("No status found for connector " + connName);
-        
+
         Collection<TaskStatus> tasks = statusBackingStore.getAll(connName);
 
         ConnectorStateInfo.ConnectorState connectorState = new ConnectorStateInfo.ConnectorState(
@@ -771,29 +772,44 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         try {
             Object plugin = p.newPlugin(pluginName);
             PluginType pluginType = PluginType.from(plugin.getClass());
-            ConfigDef configDefs;
+            // Contains definitions coming from Connect framework
+            ConfigDef baseConfigDefs = null;
+            // Contains definitions specifically declared on the plugin
+            ConfigDef pluginConfigDefs;
             switch (pluginType) {
                 case SINK:
+                    baseConfigDefs = SinkConnectorConfig.configDef();
+                    pluginConfigDefs = ((SinkConnector) plugin).config();
+                    break;
                 case SOURCE:
-                    configDefs = ((Connector) plugin).config();
+                    baseConfigDefs = SourceConnectorConfig.configDef();
+                    pluginConfigDefs = ((SourceConnector) plugin).config();
                     break;
                 case CONVERTER:
-                    configDefs = ((Converter) plugin).config();
+                    pluginConfigDefs = ((Converter) plugin).config();
                     break;
                 case HEADER_CONVERTER:
-                    configDefs = ((HeaderConverter) plugin).config();
+                    pluginConfigDefs = ((HeaderConverter) plugin).config();
                     break;
                 case TRANSFORMATION:
-                    configDefs = ((Transformation<?>) plugin).config();
+                    pluginConfigDefs = ((Transformation<?>) plugin).config();
                     break;
                 case PREDICATE:
-                    configDefs = ((Predicate<?>) plugin).config();
+                    pluginConfigDefs = ((Predicate<?>) plugin).config();
                     break;
                 default:
                     throw new BadRequestException("Invalid plugin type " + pluginType + ". Valid types are sink, source, converter, header_converter, transformation, predicate.");
             }
+
+            // Track config properties by name and, if the same property is defined in multiple places,
+            // give precedence to the one defined by the plugin class
+            // Preserve the ordering of properties as they're returned from each ConfigDef
+            Map<String, ConfigKey> configsMap = new LinkedHashMap<>(pluginConfigDefs.configKeys());
+            if (baseConfigDefs != null)
+                baseConfigDefs.configKeys().forEach(configsMap::putIfAbsent);
+
             List<ConfigKeyInfo> results = new ArrayList<>();
-            for (ConfigDef.ConfigKey configKey : configDefs.configKeys().values()) {
+            for (ConfigKey configKey : configsMap.values()) {
                 results.add(AbstractHerder.convertConfigKey(configKey));
             }
             return results;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 48fa6231de5..e2e61324147 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -907,17 +907,72 @@ public class AbstractHerderTest {
     }
 
     @Test
-    public void testPredicatePluginConfig() throws ClassNotFoundException {
-        testConnectorPluginConfig("predicate", SamplePredicate::new, SamplePredicate::config);
+    public void testSinkConnectorPluginConfig() throws ClassNotFoundException {
+        testConnectorPluginConfig(
+                "sink",
+                SampleSinkConnector::new,
+                SampleSinkConnector::config,
+                Optional.of(SinkConnectorConfig.configDef())
+        );
+    }
+
+    @Test
+    public void testSinkConnectorPluginConfigIncludingCommon() throws ClassNotFoundException {
+        testConnectorPluginConfig(
+                "sink",
+                SampleSinkConnector::new,
+                SampleSinkConnector::configWithCommon,
+                Optional.empty()
+        );
+    }
+
+    @Test
+    public void testSourceConnectorPluginConfig() throws ClassNotFoundException {
+        testConnectorPluginConfig(
+                "source",
+                SampleSourceConnector::new,
+                SampleSourceConnector::config,
+                Optional.of(SourceConnectorConfig.configDef())
+        );
     }
 
     @Test
-    public void testTransformationPluginConfig() throws ClassNotFoundException {
-        testConnectorPluginConfig("transformation", SampleTransformation::new, SampleTransformation::config);
+    public void testSourceConnectorPluginConfigIncludingCommon() throws ClassNotFoundException {
+        testConnectorPluginConfig(
+                "source",
+                SampleSourceConnector::new,
+                SampleSourceConnector::configWithCommon,
+                Optional.empty()
+        );
+    }
+
+    @Test
+    public void testConverterPluginConfig() throws ClassNotFoundException {
+        testConnectorPluginConfig(
+                "converter",
+                SampleConverterWithHeaders::new,
+                SampleConverterWithHeaders::config,
+                Optional.empty()
+        );
+    }
+
+    @Test
+    public void testHeaderConverterPluginConfig() throws ClassNotFoundException {
+        testConnectorPluginConfig(
+                "header-converter",
+                SampleHeaderConverter::new,
+                SampleHeaderConverter::config,
+                Optional.empty()
+        );
     }
 
     @SuppressWarnings("unchecked")
-    private <T> void testConnectorPluginConfig(String pluginName, Supplier<T> newPluginInstance, Function<T, ConfigDef> pluginConfig) throws ClassNotFoundException {
+    private <T> void testConnectorPluginConfig(
+            String pluginName,
+            Supplier<T> newPluginInstance,
+            Function<T, ConfigDef> pluginConfig,
+            Optional<ConfigDef> baseConfig
+    ) throws ClassNotFoundException {
         PowerMock.mockStatic(Plugins.class);
 
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
@@ -950,7 +1005,9 @@ public class AbstractHerderTest {
         assertNotNull(configs);
 
         ConfigDef expectedConfig = pluginConfig.apply(newPluginInstance.get());
-        assertEquals(expectedConfig.names().size(), configs.size());
+        int expectedConfigSize = baseConfig.map(config -> config.names().size()).orElse(0)
+                + expectedConfig.names().size();
+        assertEquals(expectedConfigSize, configs.size());
 
         PowerMock.verifyAll();
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSinkConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSinkConnector.java
index 1c324fe6cee..da8fb231edc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSinkConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSinkConnector.java
@@ -58,4 +58,10 @@ public class SampleSinkConnector extends SinkConnector {
                 .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs")
                 .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs");
     }
+
+    public ConfigDef configWithCommon() {
+        return new ConfigDef(SinkConnectorConfig.configDef())
+                .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs")
+                .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs");
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSourceConnector.java
index 36159540754..cb91530439f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSourceConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSourceConnector.java
@@ -19,6 +19,8 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
 
 import java.util.List;
 import java.util.Map;
@@ -39,7 +41,7 @@ public class SampleSourceConnector extends SourceConnector {
 
     @Override
     public Class<? extends Task> taskClass() {
-        return null;
+        return SampleSourceTask.class;
     }
 
     @Override
@@ -58,4 +60,33 @@ public class SampleSourceConnector extends SourceConnector {
                 .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs")
                 .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs");
     }
+
+    public ConfigDef configWithCommon() {
+        return new ConfigDef(SourceConnectorConfig.configDef())
+                .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs")
+                .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs");
+    }
+
+    public static class SampleSourceTask extends SourceTask {
+
+        @Override
+        public String version() {
+            return VERSION;
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+
+        }
+
+        @Override
+        public List<SourceRecord> poll() throws InterruptedException {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+
+        }
+    }
 }

Reply via email to