This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new eb3fef7 KAFKA-6253: Improve sink connector topic regex validation
eb3fef7 is described below
commit eb3fef760e1c876b936f175e0eb9a1446cf5bdcf
Author: Jeff Klukas <[email protected]>
AuthorDate: Mon Feb 5 09:46:07 2018 -0800
KAFKA-6253: Improve sink connector topic regex validation
KAFKA-3073 added topic regex support for sink connectors. The addition
requires that you only specify one of topics or topics.regex settings. This is
being validated in one place, but not during submission of connectors. This PR
adds validation at `AbstractHerder.validateConnectorConfig` and
`WorkerConnector.initialize`.
This adds a test of the new behavior to `AbstractHerderTest`.
Author: Jeff Klukas <[email protected]>
Reviewers: Randall Hauch <[email protected]>, Ewen Cheslack-Postava
<[email protected]>
Closes #4251 from jklukas/connect-topics-validation
---
.../kafka/connect/runtime/AbstractHerder.java | 10 +++--
.../kafka/connect/runtime/SinkConnectorConfig.java | 33 ++++++++++++++-
.../kafka/connect/runtime/WorkerConnector.java | 3 ++
.../kafka/connect/runtime/WorkerSinkTask.java | 22 ++--------
.../kafka/connect/runtime/AbstractHerderTest.java | 16 ++++++++
.../runtime/distributed/DistributedHerderTest.java | 8 ++--
.../runtime/standalone/StandaloneHerderTest.java | 47 ++++++++++++----------
7 files changed, 92 insertions(+), 47 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 02465c9..b913f9e 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -256,9 +256,13 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
Connector connector = getConnector(connType);
ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
try {
- ConfigDef baseConfigDef = (connector instanceof SourceConnector)
- ? SourceConnectorConfig.configDef()
- : SinkConnectorConfig.configDef();
+ ConfigDef baseConfigDef;
+ if (connector instanceof SourceConnector) {
+ baseConfigDef = SourceConnectorConfig.configDef();
+ } else {
+ baseConfigDef = SinkConnectorConfig.configDef();
+ SinkConnectorConfig.validate(connectorProps);
+ }
ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(),
baseConfigDef, connectorProps, false);
Map<String, ConfigValue> validatedConnectorConfig =
validateBasicConnectorConfig(
connector,
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index cf5564c..887a4da 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.transforms.util.RegexValidator;
@@ -34,7 +35,7 @@ public class SinkConnectorConfig extends ConnectorConfig {
public static final String TOPICS_DEFAULT = "";
private static final String TOPICS_DISPLAY = "Topics";
- private static final String TOPICS_REGEX_CONFIG =
SinkTask.TOPICS_REGEX_CONFIG;
+ public static final String TOPICS_REGEX_CONFIG =
SinkTask.TOPICS_REGEX_CONFIG;
private static final String TOPICS_REGEX_DOC = "Regular expression giving
topics to consume. " +
"Under the hood, the regex is compiled to a
<code>java.util.regex.Pattern</code>. " +
"Only one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG + "
should be specified.";
@@ -52,4 +53,34 @@ public class SinkConnectorConfig extends ConnectorConfig {
public SinkConnectorConfig(Plugins plugins, Map<String, String> props) {
super(plugins, config, props);
}
+
+ /**
+ * Throw an exception if the passed-in properties do not constitute a
valid sink.
+ * @param props sink configuration properties
+ */
+ public static void validate(Map<String, String> props) {
+ final boolean hasTopicsConfig = hasTopicsConfig(props);
+ final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props);
+
+ if (hasTopicsConfig && hasTopicsRegexConfig) {
+ throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " +
SinkTask.TOPICS_REGEX_CONFIG +
+ " are mutually exclusive options, but both are set.");
+ }
+
+ if (!hasTopicsConfig && !hasTopicsRegexConfig) {
+ throw new ConfigException("Must configure one of " +
+ SinkTask.TOPICS_CONFIG + " or " +
SinkTask.TOPICS_REGEX_CONFIG);
+ }
+ }
+
+ public static boolean hasTopicsConfig(Map<String, String> props) {
+ String topicsStr = props.get(TOPICS_CONFIG);
+ return topicsStr != null && !topicsStr.trim().isEmpty();
+ }
+
+ public static boolean hasTopicsRegexConfig(Map<String, String> props) {
+ String topicsRegexStr = props.get(TOPICS_REGEX_CONFIG);
+ return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
+ }
+
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 9b934f3..611e196 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -77,6 +77,9 @@ public class WorkerConnector {
try {
this.config = connectorConfig.originalsStrings();
log.debug("{} Initializing connector {} with config {}", this,
connName, config);
+ if (isSinkConnector()) {
+ SinkConnectorConfig.validate(config);
+ }
connector.initialize(new ConnectorContext() {
@Override
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 85695bb..5aeb851 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -265,27 +264,14 @@ class WorkerSinkTask extends WorkerTask {
* Initializes and starts the SinkTask.
*/
protected void initializeAndStart() {
- String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG);
- boolean topicsStrPresent = topicsStr != null &&
!topicsStr.trim().isEmpty();
+ SinkConnectorConfig.validate(taskConfig);
- String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
- boolean topicsRegexStrPresent = topicsRegexStr != null &&
!topicsRegexStr.trim().isEmpty();
-
- if (topicsStrPresent && topicsRegexStrPresent) {
- throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " +
SinkTask.TOPICS_REGEX_CONFIG +
- " are mutually exclusive options, but both are set.");
- }
-
- if (!topicsStrPresent && !topicsRegexStrPresent) {
- throw new ConfigException("Must configure one of " +
- SinkTask.TOPICS_CONFIG + " or " +
SinkTask.TOPICS_REGEX_CONFIG);
- }
-
- if (topicsStrPresent) {
- String[] topics = topicsStr.split(",");
+ if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
+ String[] topics =
taskConfig.get(SinkTask.TOPICS_CONFIG).split(",");
consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
log.debug("{} Initializing and starting task for topics {}", this,
topics);
} else {
+ String topicsRegexStr =
taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
Pattern pattern = Pattern.compile(topicsRegexStr);
consumer.subscribe(pattern, new HandleRebalance());
log.debug("{} Initializing and starting task for topics regex {}",
this, topicsRegexStr);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index dac1392..0718eb1 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
@@ -183,6 +184,21 @@ public class AbstractHerderTest {
verifyAll();
}
+ @Test(expected = ConfigException.class)
+ public void testConfigValidationInvalidTopics() {
+ AbstractHerder herder =
createConfigValidationHerder(TestSinkConnector.class);
+ replayAll();
+
+ Map<String, String> config = new HashMap();
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
TestSinkConnector.class.getName());
+ config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1,topic2");
+ config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
+
+ herder.validateConnectorConfig(config);
+
+ verifyAll();
+ }
+
@Test()
public void testConfigValidationTransformsExtendResults() {
AbstractHerder herder =
createConfigValidationHerder(TestSourceConnector.class);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index d7307cf..d7a7d87 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -355,7 +355,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
// config validation
- Connector connectorMock = PowerMock.createMock(Connector.class);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -398,7 +398,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
// config validation
- Connector connectorMock = PowerMock.createMock(Connector.class);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -443,7 +443,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
// config validation
- Connector connectorMock = PowerMock.createMock(Connector.class);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -1338,7 +1338,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
// config validation
- Connector connectorMock = PowerMock.createMock(Connector.class);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 79be45b..fd330f2 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -125,7 +125,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
- expectConfigValidation(config);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ expectConfigValidation(connectorMock, true, config);
PowerMock.replayAll();
@@ -142,7 +143,7 @@ public class StandaloneHerderTest {
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
config.remove(ConnectorConfig.NAME_CONFIG);
- Connector connectorMock = PowerMock.createMock(Connector.class);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -167,7 +168,7 @@ public class StandaloneHerderTest {
public void testCreateConnectorFailedCustomValidation() throws Exception {
connector = PowerMock.createMock(BogusSourceConnector.class);
- Connector connectorMock = PowerMock.createMock(Connector.class);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -199,7 +200,7 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
- Connector connectorMock = PowerMock.createMock(Connector.class);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config, config);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(2);
@@ -224,7 +225,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SINK);
Map<String, String> config = connectorConfig(SourceSink.SINK);
- expectConfigValidation(config);
+ Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+ expectConfigValidation(connectorMock, true, config);
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, config, false,
createCallback);
@@ -238,7 +240,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
- expectConfigValidation(config);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ expectConfigValidation(connectorMock, true, config);
EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.<TaskStatus>emptyList());
statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME,
AbstractStatus.State.DESTROYED, WORKER_ID, 0));
@@ -270,7 +273,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
- expectConfigValidation(config);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ expectConfigValidation(connectorMock, true, config);
worker.stopConnector(CONNECTOR_NAME);
EasyMock.expectLastCall().andReturn(true);
@@ -295,7 +299,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
- expectConfigValidation(config);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ expectConfigValidation(connectorMock, true, config);
worker.stopConnector(CONNECTOR_NAME);
EasyMock.expectLastCall().andReturn(true);
@@ -326,7 +331,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> connectorConfig =
connectorConfig(SourceSink.SOURCE);
- expectConfigValidation(connectorConfig);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ expectConfigValidation(connectorMock, true, connectorConfig);
worker.stopAndAwaitTask(taskId);
EasyMock.expectLastCall();
@@ -351,7 +357,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> connectorConfig =
connectorConfig(SourceSink.SOURCE);
- expectConfigValidation(connectorConfig);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ expectConfigValidation(connectorMock, true, connectorConfig);
worker.stopAndAwaitTask(taskId);
EasyMock.expectLastCall();
@@ -381,7 +388,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> connectorConfig =
connectorConfig(SourceSink.SOURCE);
- expectConfigValidation(connectorConfig);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ expectConfigValidation(connectorMock, true, connectorConfig);
// herder.stop() should stop any running connectors and tasks even if
destroyConnector was not invoked
expectStop();
@@ -402,6 +410,7 @@ public class StandaloneHerderTest {
@Test
public void testAccessors() throws Exception {
Map<String, String> connConfig = connectorConfig(SourceSink.SOURCE);
+ System.out.println(connConfig);
Callback<Collection<String>> listConnectorsCb =
PowerMock.createMock(Callback.class);
Callback<ConnectorInfo> connectorInfoCb =
PowerMock.createMock(Callback.class);
@@ -421,7 +430,8 @@ public class StandaloneHerderTest {
// Create connector
connector = PowerMock.createMock(BogusSourceConnector.class);
expectAdd(SourceSink.SOURCE);
- expectConfigValidation(connConfig);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ expectConfigValidation(connector, true, connConfig);
// Validate accessors with 1 connector
listConnectorsCb.onCompletion(null, singleton(CONNECTOR_NAME));
@@ -467,7 +477,7 @@ public class StandaloneHerderTest {
// Create
connector = PowerMock.createMock(BogusSourceConnector.class);
expectAdd(SourceSink.SOURCE);
- Connector connectorMock = PowerMock.createMock(Connector.class);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
expectConfigValidation(connectorMock, true, connConfig);
// Should get first config
@@ -526,7 +536,8 @@ public class StandaloneHerderTest {
Map<String, String> config = new HashMap<>();
config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
BogusSinkConnector.class.getName());
- Connector connectorMock = PowerMock.createMock(Connector.class);
+ config.put(SinkConnectorConfig.TOPICS_CONFIG, TOPICS_LIST_STR);
+ Connector connectorMock = PowerMock.createMock(SinkConnector.class);
String error = "This is an error in your config!";
List<String> errors = new ArrayList<>(singletonList(error));
String key = "foo.invalid.key";
@@ -592,7 +603,7 @@ public class StandaloneHerderTest {
EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName()))
.andReturn(ConnectorType.SOURCE).anyTimes();
EasyMock.expect(herder.connectorTypeForClass(BogusSinkConnector.class.getName()))
- .andReturn(ConnectorType.SINK).anyTimes();
+ .andReturn(ConnectorType.SINK).anyTimes();
worker.isSinkConnector(CONNECTOR_NAME);
PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK);
}
@@ -631,12 +642,6 @@ public class StandaloneHerderTest {
return generatedTaskProps;
}
-
- private void expectConfigValidation(Map<String, String> ... configs) {
- Connector connectorMock = PowerMock.createMock(Connector.class);
- expectConfigValidation(connectorMock, true, configs);
- }
-
private void expectConfigValidation(
Connector connectorMock,
boolean shouldCreateConnector,
--
To stop receiving notification emails like this one, please contact
[email protected].