[ 
https://issues.apache.org/jira/browse/KAFKA-6253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327939#comment-16327939
 ] 

ASF GitHub Bot commented on KAFKA-6253:
---------------------------------------

ewencp closed pull request #4251: KAFKA-6253: Improve sink connector topic regex validation
URL: https://github.com/apache/kafka/pull/4251
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub no longer displays the original diff once such a
pull request has been merged:

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index fbe0ae2afb2..6208e2f2cdb 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -248,9 +248,13 @@ public ConfigInfos validateConnectorConfig(Map<String, 
String> connectorProps) {
         Connector connector = getConnector(connType);
         ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
         try {
-            ConfigDef baseConfigDef = (connector instanceof SourceConnector)
-                    ? SourceConnectorConfig.configDef()
-                    : SinkConnectorConfig.configDef();
+            ConfigDef baseConfigDef;
+            if (connector instanceof SourceConnector) {
+                baseConfigDef = SourceConnectorConfig.configDef();
+            } else {
+                baseConfigDef = SinkConnectorConfig.configDef();
+                SinkConnectorConfig.validate(connectorProps);
+            }
             ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(), 
baseConfigDef, connectorProps, false);
             Map<String, ConfigValue> validatedConnectorConfig = 
validateBasicConnectorConfig(
                     connector,
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index cf5564c25c2..887a4da2dea 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.transforms.util.RegexValidator;
@@ -34,7 +35,7 @@
     public static final String TOPICS_DEFAULT = "";
     private static final String TOPICS_DISPLAY = "Topics";
 
-    private static final String TOPICS_REGEX_CONFIG = 
SinkTask.TOPICS_REGEX_CONFIG;
+    public static final String TOPICS_REGEX_CONFIG = 
SinkTask.TOPICS_REGEX_CONFIG;
     private static final String TOPICS_REGEX_DOC = "Regular expression giving 
topics to consume. " +
         "Under the hood, the regex is compiled to a 
<code>java.util.regex.Pattern</code>. " +
         "Only one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG + " 
should be specified.";
@@ -52,4 +53,34 @@ public static ConfigDef configDef() {
     public SinkConnectorConfig(Plugins plugins, Map<String, String> props) {
         super(plugins, config, props);
     }
+
+    /**
+     * Throw an exception if the passed-in properties do not constitute a 
valid sink.
+     * @param props sink configuration properties
+     */
+    public static void validate(Map<String, String> props) {
+        final boolean hasTopicsConfig = hasTopicsConfig(props);
+        final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props);
+
+        if (hasTopicsConfig && hasTopicsRegexConfig) {
+            throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + 
SinkTask.TOPICS_REGEX_CONFIG +
+                " are mutually exclusive options, but both are set.");
+        }
+
+        if (!hasTopicsConfig && !hasTopicsRegexConfig) {
+            throw new ConfigException("Must configure one of " +
+                SinkTask.TOPICS_CONFIG + " or " + 
SinkTask.TOPICS_REGEX_CONFIG);
+        }
+    }
+
+    public static boolean hasTopicsConfig(Map<String, String> props) {
+        String topicsStr = props.get(TOPICS_CONFIG);
+        return topicsStr != null && !topicsStr.trim().isEmpty();
+    }
+
+    public static boolean hasTopicsRegexConfig(Map<String, String> props) {
+        String topicsRegexStr = props.get(TOPICS_REGEX_CONFIG);
+        return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
+    }
+
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 9e65cd2d80f..29f487386ac 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -77,6 +77,9 @@ public void initialize(ConnectorConfig connectorConfig) {
         try {
             this.config = connectorConfig.originalsStrings();
             log.debug("{} Initializing connector {} with config {}", this, 
connName, config);
+            if (isSinkConnector()) {
+                SinkConnectorConfig.validate(config);
+            }
 
             connector.initialize(new ConnectorContext() {
                 @Override
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 05ace587249..9c064325bef 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -25,7 +25,6 @@
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -259,27 +258,14 @@ public int commitFailures() {
      * Initializes and starts the SinkTask.
      */
     protected void initializeAndStart() {
-        String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG);
-        boolean topicsStrPresent = topicsStr != null && 
!topicsStr.trim().isEmpty();
+        SinkConnectorConfig.validate(taskConfig);
 
-        String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
-        boolean topicsRegexStrPresent = topicsRegexStr != null && 
!topicsRegexStr.trim().isEmpty();
-
-        if (topicsStrPresent && topicsRegexStrPresent) {
-            throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + 
SinkTask.TOPICS_REGEX_CONFIG +
-                " are mutually exclusive options, but both are set.");
-        }
-
-        if (!topicsStrPresent && !topicsRegexStrPresent) {
-            throw new ConfigException("Must configure one of " +
-                SinkTask.TOPICS_CONFIG + " or " + 
SinkTask.TOPICS_REGEX_CONFIG);
-        }
-
-        if (topicsStrPresent) {
-            String[] topics = topicsStr.split(",");
+        if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
+            String[] topics = 
taskConfig.get(SinkTask.TOPICS_CONFIG).split(",");
             consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
             log.debug("{} Initializing and starting task for topics {}", this, 
topics);
         } else {
+            String topicsRegexStr = 
taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
             Pattern pattern = Pattern.compile(topicsRegexStr);
             consumer.subscribe(pattern, new HandleRebalance());
             log.debug("{} Initializing and starting task for topics regex {}", 
this, topicsRegexStr);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index ac9c312823c..2400abf327a 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
@@ -174,6 +175,21 @@ public void testConfigValidationMissingName() {
         verifyAll();
     }
 
+    @Test(expected = ConfigException.class)
+    public void testConfigValidationInvalidTopics() {
+        AbstractHerder herder = 
createConfigValidationHerder(TestSinkConnector.class);
+        replayAll();
+
+        Map<String, String> config = new HashMap();
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
TestSinkConnector.class.getName());
+        config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1,topic2");
+        config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
+
+        herder.validateConnectorConfig(config);
+
+        verifyAll();
+    }
+
     @Test()
     public void testConfigValidationTransformsExtendResults() {
         AbstractHerder herder = 
createConfigValidationHerder(TestSourceConnector.class);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index d41ccbebae0..d1e02edf0b5 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -352,7 +352,7 @@ public void testCreateConnector() throws Exception {
         PowerMock.expectLastCall();
 
         // config validation
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -395,7 +395,7 @@ public void testCreateConnectorFailedBasicValidation() 
throws Exception {
         PowerMock.expectLastCall();
 
         // config validation
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -440,7 +440,7 @@ public void testCreateConnectorFailedCustomValidation() 
throws Exception {
         PowerMock.expectLastCall();
 
         // config validation
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -1335,7 +1335,7 @@ public void testPutConnectorConfig() throws Exception {
         PowerMock.expectLastCall();
 
         // config validation
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 18d2739f13e..c29005a0a04 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -124,7 +124,8 @@ public void testCreateSourceConnector() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(config);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
 
         PowerMock.replayAll();
 
@@ -141,7 +142,7 @@ public void testCreateConnectorFailedBasicValidation() 
throws Exception {
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
         config.remove(ConnectorConfig.NAME_CONFIG);
 
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -166,7 +167,7 @@ public void testCreateConnectorFailedBasicValidation() 
throws Exception {
     public void testCreateConnectorFailedCustomValidation() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
 
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -198,7 +199,7 @@ public void testCreateConnectorAlreadyExists() throws 
Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config, config);
 
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(2);
@@ -223,7 +224,8 @@ public void testCreateSinkConnector() throws Exception {
         expectAdd(SourceSink.SINK);
 
         Map<String, String> config = connectorConfig(SourceSink.SINK);
-        expectConfigValidation(config);
+        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        expectConfigValidation(connectorMock, true, config);
         PowerMock.replayAll();
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
@@ -237,7 +239,8 @@ public void testDestroyConnector() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(config);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
 
         
EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.<TaskStatus>emptyList());
         statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, 
AbstractStatus.State.DESTROYED, WORKER_ID, 0));
@@ -269,7 +272,8 @@ public void testRestartConnector() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(config);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
 
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall().andReturn(true);
@@ -294,7 +298,8 @@ public void testRestartConnectorFailureOnStart() throws 
Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(config);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
 
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall().andReturn(true);
@@ -325,7 +330,8 @@ public void testRestartTask() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = 
connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(connectorConfig);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, connectorConfig);
 
         worker.stopAndAwaitTask(taskId);
         EasyMock.expectLastCall();
@@ -350,7 +356,8 @@ public void testRestartTaskFailureOnStart() throws 
Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = 
connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(connectorConfig);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, connectorConfig);
 
         worker.stopAndAwaitTask(taskId);
         EasyMock.expectLastCall();
@@ -380,7 +387,8 @@ public void testCreateAndStop() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = 
connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(connectorConfig);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, connectorConfig);
 
         // herder.stop() should stop any running connectors and tasks even if 
destroyConnector was not invoked
         expectStop();
@@ -401,6 +409,7 @@ public void testCreateAndStop() throws Exception {
     @Test
     public void testAccessors() throws Exception {
         Map<String, String> connConfig = connectorConfig(SourceSink.SOURCE);
+        System.out.println(connConfig);
 
         Callback<Collection<String>> listConnectorsCb = 
PowerMock.createMock(Callback.class);
         Callback<ConnectorInfo> connectorInfoCb = 
PowerMock.createMock(Callback.class);
@@ -420,7 +429,8 @@ public void testAccessors() throws Exception {
         // Create connector
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
-        expectConfigValidation(connConfig);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connector, true, connConfig);
 
         // Validate accessors with 1 connector
         listConnectorsCb.onCompletion(null, singleton(CONNECTOR_NAME));
@@ -466,7 +476,7 @@ public void testPutConnectorConfig() throws Exception {
         // Create
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, connConfig);
 
         // Should get first config
@@ -525,7 +535,8 @@ public void testCorruptConfig() {
         Map<String, String> config = new HashMap<>();
         config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
         config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
BogusSinkConnector.class.getName());
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        config.put(SinkConnectorConfig.TOPICS_CONFIG, TOPICS_LIST_STR);
+        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
         String error = "This is an error in your config!";
         List<String> errors = new ArrayList<>(singletonList(error));
         String key = "foo.invalid.key";
@@ -591,7 +602,7 @@ private void expectAdd(SourceSink sourceSink) throws 
Exception {
         
EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName()))
             .andReturn(ConnectorType.SOURCE).anyTimes();
         
EasyMock.expect(herder.connectorTypeForClass(BogusSinkConnector.class.getName()))
-        .andReturn(ConnectorType.SINK).anyTimes();
+            .andReturn(ConnectorType.SINK).anyTimes();
         worker.isSinkConnector(CONNECTOR_NAME);
         PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK);
     }
@@ -630,12 +641,6 @@ private void expectDestroy() {
         return generatedTaskProps;
     }
 
-
-    private void expectConfigValidation(Map<String, String> ... configs) {
-        Connector connectorMock = PowerMock.createMock(Connector.class);
-        expectConfigValidation(connectorMock, true, configs);
-    }
-
     private void expectConfigValidation(
             Connector connectorMock,
             boolean shouldCreateConnector,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Improve sink connector topic regex validation
> ---------------------------------------------
>
>                 Key: KAFKA-6253
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6253
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Jeff Klukas
>            Priority: Major
>             Fix For: 1.1.0
>
>
> KAFKA-3073 adds topic regex support for sink connectors. The addition 
> requires that you specify only one of the topics or topics.regex settings. 
> This is validated in one place, but not when a connector config is 
> submitted. We should improve this, because otherwise it is possible to get 
> a bad connector config into the config topic.
> For more detailed discussion, see 
> https://github.com/apache/kafka/pull/4151#pullrequestreview-77300221



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to