This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit d8b3ed87be8039700e951514c0a3cd283cfb081d Author: Jorge Esteban Quilcate Otoya <[email protected]> AuthorDate: Tue Mar 28 18:26:23 2023 +0300 KAFKA-14843: Include Connect framework properties when retrieving connector config definitions (#13445) Reviewers: Yash Mayya <[email protected]>, Greg Harris <[email protected]>, Chris Egerton <[email protected]> --- .../kafka/connect/runtime/AbstractHerder.java | 32 +++++++--- .../kafka/connect/runtime/AbstractHerderTest.java | 69 ++++++++++++++++++++-- .../kafka/connect/runtime/SampleSinkConnector.java | 6 ++ .../connect/runtime/SampleSourceConnector.java | 33 ++++++++++- 4 files changed, 125 insertions(+), 15 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index e5dd9790fb8..2eed83d0bf7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -41,6 +41,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; +import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.storage.Converter; @@ -301,7 +302,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con ConnectorStatus connector = statusBackingStore.get(connName); if (connector == null) throw new NotFoundException("No status found for connector " + connName); - + Collection<TaskStatus> tasks = statusBackingStore.getAll(connName); ConnectorStateInfo.ConnectorState connectorState = new ConnectorStateInfo.ConnectorState( @@ -771,29 +772,44 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con try { Object plugin = p.newPlugin(pluginName); PluginType pluginType = PluginType.from(plugin.getClass()); - ConfigDef configDefs; + // Contains definitions coming from Connect framework + ConfigDef baseConfigDefs = null; + // Contains definitions specifically declared on the plugin + ConfigDef pluginConfigDefs; switch (pluginType) { case SINK: + baseConfigDefs = SinkConnectorConfig.configDef(); + pluginConfigDefs = ((SinkConnector) plugin).config(); + break; case SOURCE: - configDefs = ((Connector) plugin).config(); + baseConfigDefs = SourceConnectorConfig.configDef(); + pluginConfigDefs = ((SourceConnector) plugin).config(); break; case CONVERTER: - configDefs = ((Converter) plugin).config(); + pluginConfigDefs = ((Converter) plugin).config(); break; case HEADER_CONVERTER: - configDefs = ((HeaderConverter) plugin).config(); + pluginConfigDefs = ((HeaderConverter) plugin).config(); break; case TRANSFORMATION: - configDefs = ((Transformation<?>) plugin).config(); + pluginConfigDefs = ((Transformation<?>) plugin).config(); break; case PREDICATE: - configDefs = ((Predicate<?>) plugin).config(); + pluginConfigDefs = ((Predicate<?>) plugin).config(); break; default: throw new BadRequestException("Invalid plugin type " + pluginType + ". Valid types are sink, source, converter, header_converter, transformation, predicate."); } + + // Track config properties by name and, if the same property is defined in multiple places, + // give precedence to the one defined by the plugin class + // Preserve the ordering of properties as they're returned from each ConfigDef + Map<String, ConfigKey> configsMap = new LinkedHashMap<>(pluginConfigDefs.configKeys()); + if (baseConfigDefs != null) + baseConfigDefs.configKeys().forEach(configsMap::putIfAbsent); + List<ConfigKeyInfo> results = new ArrayList<>(); - for (ConfigDef.ConfigKey configKey : configDefs.configKeys().values()) { + for (ConfigKey configKey : configsMap.values()) { results.add(AbstractHerder.convertConfigKey(configKey)); } return results; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 48fa6231de5..e2e61324147 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -907,17 +907,72 @@ public class AbstractHerderTest { } @Test - public void testPredicatePluginConfig() throws ClassNotFoundException { - testConnectorPluginConfig("predicate", SamplePredicate::new, SamplePredicate::config); + public void testSinkConnectorPluginConfig() throws ClassNotFoundException { + testConnectorPluginConfig( + "sink", + SampleSinkConnector::new, + SampleSinkConnector::config, + Optional.of(SinkConnectorConfig.configDef()) + ); + } + + @Test + public void testSinkConnectorPluginConfigIncludingCommon() throws ClassNotFoundException { + testConnectorPluginConfig( + "sink", + SampleSinkConnector::new, + SampleSinkConnector::configWithCommon, + Optional.empty() + ); + } + + @Test + public void testSourceConnectorPluginConfig() throws ClassNotFoundException { + testConnectorPluginConfig( + "source", + SampleSourceConnector::new, + SampleSourceConnector::config, + Optional.of(SourceConnectorConfig.configDef()) + ); } @Test - public void testTransformationPluginConfig() throws ClassNotFoundException { - testConnectorPluginConfig("transformation", SampleTransformation::new, SampleTransformation::config); + public void testSourceConnectorPluginConfigIncludingCommon() throws ClassNotFoundException { + testConnectorPluginConfig( + "source", + SampleSourceConnector::new, + SampleSourceConnector::configWithCommon, + Optional.empty() + ); + } + + @Test + public void testConverterPluginConfig() throws ClassNotFoundException { + testConnectorPluginConfig( + "converter", + SampleConverterWithHeaders::new, + SampleConverterWithHeaders::config, + Optional.empty() + ); + } + + @Test + public void testHeaderConverterPluginConfig() throws ClassNotFoundException { + testConnectorPluginConfig( + "header-converter", + SampleHeaderConverter::new, + SampleHeaderConverter::config, + Optional.empty() + ); } @SuppressWarnings("unchecked") - private <T> void testConnectorPluginConfig(String pluginName, Supplier<T> newPluginInstance, Function<T, ConfigDef> pluginConfig) throws ClassNotFoundException { + private <T> void testConnectorPluginConfig( + String pluginName, + Supplier<T> newPluginInstance, + Function<T, ConfigDef> pluginConfig, + Optional<ConfigDef> baseConfig + ) throws ClassNotFoundException { PowerMock.mockStatic(Plugins.class); AbstractHerder herder = partialMockBuilder(AbstractHerder.class) @@ -950,7 +1005,9 @@ public class AbstractHerderTest { assertNotNull(configs); ConfigDef expectedConfig = pluginConfig.apply(newPluginInstance.get()); - assertEquals(expectedConfig.names().size(), configs.size()); + int expectedConfigSize = baseConfig.map(config -> config.names().size()).orElse(0) + + expectedConfig.names().size(); + assertEquals(expectedConfigSize, configs.size()); PowerMock.verifyAll(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSinkConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSinkConnector.java index 1c324fe6cee..da8fb231edc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSinkConnector.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSinkConnector.java @@ -58,4 +58,10 @@ public class SampleSinkConnector extends SinkConnector { .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs") .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs"); } + + public ConfigDef configWithCommon() { + return new ConfigDef(SinkConnectorConfig.configDef()) + .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs") + .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs"); + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSourceConnector.java index 36159540754..cb91530439f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSourceConnector.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSourceConnector.java @@ -19,6 +19,8 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; import java.util.List; import java.util.Map; @@ -39,7 +41,7 @@ public class SampleSourceConnector extends SourceConnector { @Override public Class<? extends Task> taskClass() { - return null; + return SampleSourceTask.class; } @Override @@ -58,4 +60,33 @@ public class SampleSourceConnector extends SourceConnector { .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs") .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs"); } + + public ConfigDef configWithCommon() { + return new ConfigDef(SourceConnectorConfig.configDef()) + .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs") + .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs"); + } + + public static class SampleSourceTask extends SourceTask { + + @Override + public String version() { + return VERSION; + } + + @Override + public void start(Map<String, String> props) { + + } + + @Override + public List<SourceRecord> poll() throws InterruptedException { + return null; + } + + @Override + public void stop() { + + } + } }
