This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a110f1fe85 KAFKA-10000: Add new preflight connector config validation logic (#11776)
a110f1fe85 is described below
commit a110f1fe852ae8c958a8c64b0736a9bb0617338e
Author: Chris Egerton <[email protected]>
AuthorDate: Thu Jun 2 05:57:50 2022 -0400
KAFKA-10000: Add new preflight connector config validation logic (#11776)
Reviewers: Mickael Maison <[email protected]>, Tom Bentley <[email protected]>
---
.../kafka/connect/runtime/AbstractHerder.java | 120 +++++----
.../runtime/distributed/DistributedHerder.java | 143 ++++++++++-
.../runtime/distributed/DistributedHerderTest.java | 269 ++++++++++++++++++++-
3 files changed, 466 insertions(+), 66 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 2fe75a955b..166ac9f05c 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.Config;
@@ -40,6 +42,7 @@ import
org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
+import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.Converter;
@@ -349,9 +352,11 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
status.workerId(), status.trace());
}
- protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector
connector,
- ConfigDef
configDef,
-
Map<String, String> config) {
+ protected Map<String, ConfigValue>
validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef,
Map<String, String> config) {
+ return configDef.validateAll(config);
+ }
+
+ protected Map<String, ConfigValue>
validateSourceConnectorConfig(SourceConnector connector, ConfigDef configDef,
Map<String, String> config) {
return configDef.validateAll(config);
}
@@ -417,7 +422,23 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
conf == null ? ConnectorType.UNKNOWN :
connectorTypeForClass(conf.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))
);
return Optional.of(new RestartPlan(request, stateInfo));
+ }
+
+ protected boolean
connectorUsesConsumer(org.apache.kafka.connect.health.ConnectorType
connectorType, Map<String, String> connProps) {
+ return connectorType ==
org.apache.kafka.connect.health.ConnectorType.SINK;
+ }
+ protected boolean
connectorUsesAdmin(org.apache.kafka.connect.health.ConnectorType connectorType,
Map<String, String> connProps) {
+ if (connectorType ==
org.apache.kafka.connect.health.ConnectorType.SOURCE) {
+ return SourceConnectorConfig.usesTopicCreation(connProps);
+ } else {
+ return SinkConnectorConfig.hasDlqTopicConfig(connProps);
+ }
+ }
+
+ protected boolean
connectorUsesProducer(org.apache.kafka.connect.health.ConnectorType
connectorType, Map<String, String> connProps) {
+ return connectorType ==
org.apache.kafka.connect.health.ConnectorType.SOURCE
+ || SinkConnectorConfig.hasDlqTopicConfig(connProps);
}
ConfigInfos validateConnectorConfig(Map<String, String> connectorProps,
boolean doLog) {
@@ -431,22 +452,20 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
Connector connector = getConnector(connType);
org.apache.kafka.connect.health.ConnectorType connectorType;
ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
+ ConfigDef enrichedConfigDef;
+ Map<String, ConfigValue> validatedConnectorConfig;
try {
- ConfigDef baseConfigDef;
if (connector instanceof SourceConnector) {
- baseConfigDef = SourceConnectorConfig.configDef();
connectorType =
org.apache.kafka.connect.health.ConnectorType.SOURCE;
+ enrichedConfigDef = ConnectorConfig.enrich(plugins(),
SourceConnectorConfig.configDef(), connectorProps, false);
+ validatedConnectorConfig =
validateSourceConnectorConfig((SourceConnector) connector, enrichedConfigDef,
connectorProps);
} else {
- baseConfigDef = SinkConnectorConfig.configDef();
SinkConnectorConfig.validate(connectorProps);
connectorType =
org.apache.kafka.connect.health.ConnectorType.SINK;
+ enrichedConfigDef = ConnectorConfig.enrich(plugins(),
SinkConnectorConfig.configDef(), connectorProps, false);
+ validatedConnectorConfig =
validateSinkConnectorConfig((SinkConnector) connector, enrichedConfigDef,
connectorProps);
}
- ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(),
baseConfigDef, connectorProps, false);
- Map<String, ConfigValue> validatedConnectorConfig =
validateBasicConnectorConfig(
- connector,
- enrichedConfigDef,
- connectorProps
- );
+
connectorProps.entrySet().stream()
.filter(e -> e.getValue() == null)
.map(Map.Entry::getKey)
@@ -454,6 +473,7 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
validatedConnectorConfig.computeIfAbsent(prop,
ConfigValue::new)
.addErrorMessage("Null value can not be supplied as
the configuration value.")
);
+
List<ConfigValue> configValues = new
ArrayList<>(validatedConnectorConfig.values());
Map<String, ConfigKey> configKeys = new
LinkedHashMap<>(enrichedConfigDef.configKeys());
Set<String> allGroups = new
LinkedHashSet<>(enrichedConfigDef.groups());
@@ -487,40 +507,41 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
ConfigInfos producerConfigInfos = null;
ConfigInfos consumerConfigInfos = null;
ConfigInfos adminConfigInfos = null;
- if
(connectorType.equals(org.apache.kafka.connect.health.ConnectorType.SOURCE)) {
- producerConfigInfos = validateClientOverrides(connName,
-
ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
- connectorConfig,
-
ProducerConfig.configDef(),
-
connector.getClass(),
- connectorType,
-
ConnectorClientConfigRequest.ClientType.PRODUCER,
-
connectorClientConfigOverridePolicy);
- return mergeConfigInfos(connType, configInfos,
producerConfigInfos);
- } else {
- consumerConfigInfos = validateClientOverrides(connName,
-
ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
- connectorConfig,
-
ProducerConfig.configDef(),
-
connector.getClass(),
- connectorType,
-
ConnectorClientConfigRequest.ClientType.CONSUMER,
-
connectorClientConfigOverridePolicy);
- // check if topic for dead letter queue exists
- String topic =
connectorProps.get(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG);
- if (topic != null && !topic.isEmpty()) {
- adminConfigInfos = validateClientOverrides(connName,
-
ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
- connectorConfig,
-
ProducerConfig.configDef(),
-
connector.getClass(),
- connectorType,
-
ConnectorClientConfigRequest.ClientType.ADMIN,
-
connectorClientConfigOverridePolicy);
- }
+ if (connectorUsesProducer(connectorType, connectorProps)) {
+ producerConfigInfos = validateClientOverrides(
+ connName,
+ ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
+ connectorConfig,
+ ProducerConfig.configDef(),
+ connector.getClass(),
+ connectorType,
+ ConnectorClientConfigRequest.ClientType.PRODUCER,
+ connectorClientConfigOverridePolicy);
}
- return mergeConfigInfos(connType, configInfos,
consumerConfigInfos, adminConfigInfos);
+ if (connectorUsesAdmin(connectorType, connectorProps)) {
+ adminConfigInfos = validateClientOverrides(
+ connName,
+ ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
+ connectorConfig,
+ AdminClientConfig.configDef(),
+ connector.getClass(),
+ connectorType,
+ ConnectorClientConfigRequest.ClientType.ADMIN,
+ connectorClientConfigOverridePolicy);
+ }
+ if (connectorUsesConsumer(connectorType, connectorProps)) {
+ consumerConfigInfos = validateClientOverrides(
+ connName,
+ ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
+ connectorConfig,
+ ConsumerConfig.configDef(),
+ connector.getClass(),
+ connectorType,
+ ConnectorClientConfigRequest.ClientType.CONSUMER,
+ connectorClientConfigOverridePolicy);
+ }
+ return mergeConfigInfos(connType, configInfos,
producerConfigInfos, consumerConfigInfos, adminConfigInfos);
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
}
@@ -665,7 +686,7 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
return tempConnectors.computeIfAbsent(connType, k ->
plugins().newConnector(k));
}
- /*
+ /**
* Retrieves ConnectorType for the corresponding connector class
* @param connClass class of the connector
*/
@@ -673,6 +694,15 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
return ConnectorType.from(getConnector(connClass).getClass());
}
+ /**
+ * Retrieves ConnectorType for the class specified in the connector config
+ * @param connConfig the connector config; may not be null
+ * @return the {@link ConnectorType} of the connector
+ */
+ public ConnectorType connectorTypeForConfig(Map<String, String>
connConfig) {
+ return
connectorTypeForClass(connConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+ }
+
/**
* Checks a given {@link ConfigInfos} for validation error messages and
adds an exception
* to the given {@link Callback} if any were found.
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 65a8e7e15b..49d3a5278b 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.connector.Connector;
import
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
@@ -58,6 +57,10 @@ import
org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
+import org.apache.kafka.connect.source.ExactlyOnceSupport;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
@@ -138,6 +141,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
private static final long FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS =
TimeUnit.SECONDS.toMillis(10);
private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS =
TimeUnit.SECONDS.toMillis(1);
private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
+ private static final long CONFIG_TOPIC_WRITE_PRIVILEGES_BACKOFF_MS = 250;
private static final int START_STOP_THREAD_POOL_SIZE = 8;
private static final short BACKOFF_RETRIES = 5;
@@ -842,21 +846,134 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
}
@Override
- protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector
connector,
- ConfigDef
configDef,
-
Map<String, String> config) {
- Map<String, ConfigValue> validatedConfig =
super.validateBasicConnectorConfig(connector, configDef, config);
- if (connector instanceof SinkConnector) {
- ConfigValue validatedName =
validatedConfig.get(ConnectorConfig.NAME_CONFIG);
- String name = (String) validatedName.value();
- if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
- validatedName.addErrorMessage("Consumer group for sink
connector named " + name +
- " conflicts with Connect worker group " +
workerGroupId);
+ protected Map<String, ConfigValue>
validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef,
Map<String, String> config) {
+ Map<String, ConfigValue> result =
super.validateSinkConnectorConfig(connector, configDef, config);
+ validateSinkConnectorGroupId(result);
+ return result;
+ }
+
+ @Override
+ protected Map<String, ConfigValue>
validateSourceConnectorConfig(SourceConnector connector, ConfigDef configDef,
Map<String, String> config) {
+ Map<String, ConfigValue> result =
super.validateSourceConnectorConfig(connector, configDef, config);
+ validateSourceConnectorExactlyOnceSupport(config, result, connector);
+ validateSourceConnectorTransactionBoundary(config, result, connector);
+ return result;
+ }
+
+
+ private void validateSinkConnectorGroupId(Map<String, ConfigValue>
validatedConfig) {
+ ConfigValue validatedName =
validatedConfig.get(ConnectorConfig.NAME_CONFIG);
+ String name = (String) validatedName.value();
+ if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
+ validatedName.addErrorMessage("Consumer group for sink connector
named " + name +
+ " conflicts with Connect worker group " + workerGroupId);
+ }
+ }
+
+ private void validateSourceConnectorExactlyOnceSupport(
+ Map<String, String> rawConfig,
+ Map<String, ConfigValue> validatedConfig,
+ SourceConnector connector) {
+ ConfigValue validatedExactlyOnceSupport =
validatedConfig.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG);
+ if (validatedExactlyOnceSupport.errorMessages().isEmpty()) {
+ // Should be safe to parse the enum from the user-provided value
since it's passed validation so far
+ SourceConnectorConfig.ExactlyOnceSupportLevel
exactlyOnceSupportLevel =
+
SourceConnectorConfig.ExactlyOnceSupportLevel.fromProperty(Objects.toString(validatedExactlyOnceSupport.value()));
+ if
(SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.equals(exactlyOnceSupportLevel))
{
+ if (!config.exactlyOnceSourceEnabled()) {
+ validatedExactlyOnceSupport.addErrorMessage("This worker
does not have exactly-once source support enabled.");
+ }
+
+ try {
+ ExactlyOnceSupport exactlyOnceSupport =
connector.exactlyOnceSupport(rawConfig);
+ if
(!ExactlyOnceSupport.SUPPORTED.equals(exactlyOnceSupport)) {
+ final String validationErrorMessage;
+ // Would do a switch here but that doesn't permit
matching on null values
+ if (exactlyOnceSupport == null) {
+ validationErrorMessage = "The connector does not
implement the API required for preflight validation of exactly-once "
+ + "source support. Please consult the
documentation for the connector to determine whether it supports exactly-once "
+ + "guarantees, and then consider
reconfiguring the connector to use the value \""
+ +
SourceConnectorConfig.ExactlyOnceSupportLevel.REQUESTED
+ + "\" for this property (which will
disable this preflight check and allow the connector to be created).";
+ } else if
(ExactlyOnceSupport.UNSUPPORTED.equals(exactlyOnceSupport)) {
+ validationErrorMessage = "The connector does not
support exactly-once delivery guarantees with the provided configuration.";
+ } else {
+ throw new ConnectException("Unexpected value
returned from SourceConnector::exactlyOnceSupport: " + exactlyOnceSupport);
+ }
+
validatedExactlyOnceSupport.addErrorMessage(validationErrorMessage);
+ }
+ } catch (Exception e) {
+ log.error("Failed while validating connector support for
exactly-once guarantees", e);
+ String validationErrorMessage = "An unexpected error
occurred during validation";
+ String failureMessage = e.getMessage();
+ if (failureMessage != null &&
!failureMessage.trim().isEmpty()) {
+ validationErrorMessage += ": " + failureMessage.trim();
+ } else {
+ validationErrorMessage += "; please see the worker
logs for more details.";
+ }
+
validatedExactlyOnceSupport.addErrorMessage(validationErrorMessage);
+ }
}
}
- return validatedConfig;
}
+ private void validateSourceConnectorTransactionBoundary(
+ Map<String, String> rawConfig,
+ Map<String, ConfigValue> validatedConfig,
+ SourceConnector connector) {
+ ConfigValue validatedTransactionBoundary =
validatedConfig.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG);
+ if (validatedTransactionBoundary.errorMessages().isEmpty()) {
+ // Should be safe to parse the enum from the user-provided value
since it's passed validation so far
+ SourceTask.TransactionBoundary transactionBoundary =
+
SourceTask.TransactionBoundary.fromProperty(Objects.toString(validatedTransactionBoundary.value()));
+ if
(SourceTask.TransactionBoundary.CONNECTOR.equals(transactionBoundary)) {
+ try {
+ ConnectorTransactionBoundaries connectorTransactionSupport
= connector.canDefineTransactionBoundaries(rawConfig);
+ if (connectorTransactionSupport == null) {
+ validatedTransactionBoundary.addErrorMessage(
+ "This connector has returned a null value from
its canDefineTransactionBoundaries method, which is not permitted. " +
+ "The connector will be treated as if
it cannot define its own transaction boundaries, and cannot be configured with
" +
+ "'" +
SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG + "' set to '" +
SourceTask.TransactionBoundary.CONNECTOR + "'."
+ );
+ } else if
(!ConnectorTransactionBoundaries.SUPPORTED.equals(connectorTransactionSupport))
{
+ validatedTransactionBoundary.addErrorMessage(
+ "The connector does not support
connector-defined transaction boundaries with the given configuration. "
+ + "Please reconfigure it to use a
different transaction boundary definition.");
+ }
+ } catch (Exception e) {
+ log.error("Failed while validating connector support for
defining its own transaction boundaries", e);
+ String validationErrorMessage = "An unexpected error
occurred during validation";
+ String failureMessage = e.getMessage();
+ if (failureMessage != null &&
!failureMessage.trim().isEmpty()) {
+ validationErrorMessage += ": " + failureMessage.trim();
+ } else {
+ validationErrorMessage += "; please see the worker
logs for more details.";
+ }
+
validatedTransactionBoundary.addErrorMessage(validationErrorMessage);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected boolean
connectorUsesAdmin(org.apache.kafka.connect.health.ConnectorType connectorType,
Map<String, String> connProps) {
+ return super.connectorUsesAdmin(connectorType, connProps)
+ || connectorUsesSeparateOffsetsTopicClients(connectorType,
connProps);
+ }
+
+ @Override
+ protected boolean
connectorUsesConsumer(org.apache.kafka.connect.health.ConnectorType
connectorType, Map<String, String> connProps) {
+ return super.connectorUsesConsumer(connectorType, connProps)
+ || connectorUsesSeparateOffsetsTopicClients(connectorType,
connProps);
+ }
+
+ private boolean
connectorUsesSeparateOffsetsTopicClients(org.apache.kafka.connect.health.ConnectorType
connectorType, Map<String, String> connProps) {
+ if (connectorType !=
org.apache.kafka.connect.health.ConnectorType.SOURCE) {
+ return false;
+ }
+ return config.exactlyOnceSourceEnabled()
+ ||
!connProps.getOrDefault(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG,
"").trim().isEmpty();
+ }
@Override
public void putConnectorConfig(final String connName, final Map<String,
String> config, final boolean allowReplace,
@@ -897,7 +1014,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
// snapshot yet. The existing task info should
still be accurate.
ConnectorInfo info = new ConnectorInfo(connName,
config, configState.tasks(connName),
// validateConnectorConfig have checked the
existence of CONNECTOR_CLASS_CONFIG
-
connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
+ connectorTypeForConfig(config));
callback.onCompletion(null, new Created<>(!exists,
info));
return null;
},
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 996c8407c6..54dacd6d82 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.connect.connector.Connector;
import
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import
org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
@@ -33,6 +32,7 @@ import org.apache.kafka.connect.runtime.RestartPlan;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
+import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TopicStatus;
@@ -52,6 +52,8 @@ import
org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
+import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.ConfigBackingStore;
@@ -90,16 +92,18 @@ import java.util.concurrent.TimeoutException;
import static java.util.Collections.singletonList;
import static javax.ws.rs.core.Response.Status.FORBIDDEN;
+import static
org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED;
import static
org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
+import static
org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
import static
org.apache.kafka.connect.runtime.distributed.DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT;
import static
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
import static
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.newCapture;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -790,22 +794,261 @@ public class DistributedHerderTest {
PowerMock.verifyAll();
}
- @SuppressWarnings("unchecked")
@Test
public void testConnectorNameConflictsWithWorkerGroupId() {
Map<String, String> config = new HashMap<>(CONN2_CONFIG);
config.put(ConnectorConfig.NAME_CONFIG, "test-group");
- Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+ SinkConnector connectorMock =
PowerMock.createMock(SinkConnector.class);
+
+ PowerMock.replayAll(connectorMock);
// CONN2 creation should fail because the worker group id
(connect-test-group) conflicts with
// the consumer group id we would use for this sink
- Map<String, ConfigValue> validatedConfigs =
- herder.validateBasicConnectorConfig(connectorMock,
ConnectorConfig.configDef(), config);
+ Map<String, ConfigValue> validatedConfigs =
herder.validateSinkConnectorConfig(
+ connectorMock, SinkConnectorConfig.configDef(), config);
ConfigValue nameConfig =
validatedConfigs.get(ConnectorConfig.NAME_CONFIG);
- assertNotNull(nameConfig.errorMessages());
- assertFalse(nameConfig.errorMessages().isEmpty());
+ assertEquals(
+ Collections.singletonList("Consumer group for sink connector
named test-group conflicts with Connect worker group connect-test-group"),
+ nameConfig.errorMessages());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testExactlyOnceSourceSupportValidation() {
+ herder = exactlyOnceHerder();
+ Map<String, String> config = new HashMap<>();
+ config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG,
REQUIRED.toString());
+
+ SourceConnector connectorMock =
PowerMock.createMock(SourceConnector.class);
+ EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+ .andReturn(ExactlyOnceSupport.SUPPORTED);
+
+ PowerMock.replayAll(connectorMock);
+
+ Map<String, ConfigValue> validatedConfigs =
herder.validateSourceConnectorConfig(
+ connectorMock, SourceConnectorConfig.configDef(), config);
+
+ List<String> errors =
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+ assertEquals(Collections.emptyList(), errors);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testExactlyOnceSourceSupportValidationOnUnsupportedConnector()
{
+ herder = exactlyOnceHerder();
+ Map<String, String> config = new HashMap<>();
+ config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG,
REQUIRED.toString());
+
+ SourceConnector connectorMock =
PowerMock.createMock(SourceConnector.class);
+ EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+ .andReturn(ExactlyOnceSupport.UNSUPPORTED);
+
+ PowerMock.replayAll(connectorMock);
+
+ Map<String, ConfigValue> validatedConfigs =
herder.validateSourceConnectorConfig(
+ connectorMock, SourceConnectorConfig.configDef(), config);
+
+ List<String> errors =
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+ assertEquals(
+ Collections.singletonList("The connector does not support
exactly-once delivery guarantees with the provided configuration."),
+ errors);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testExactlyOnceSourceSupportValidationOnUnknownConnector() {
+ herder = exactlyOnceHerder();
+ Map<String, String> config = new HashMap<>();
+ config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG,
REQUIRED.toString());
+
+ SourceConnector connectorMock =
PowerMock.createMock(SourceConnector.class);
+ EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+ .andReturn(null);
+
+ PowerMock.replayAll(connectorMock);
+
+ Map<String, ConfigValue> validatedConfigs =
herder.validateSourceConnectorConfig(
+ connectorMock, SourceConnectorConfig.configDef(), config);
+
+ List<String> errors =
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+ assertFalse(errors.isEmpty());
+ assertTrue(
+ "Error message did not contain expected text: " +
errors.get(0),
+ errors.get(0).contains("The connector does not implement the
API required for preflight validation of exactly-once source support."));
+ assertEquals(1, errors.size());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void
testExactlyOnceSourceSupportValidationHandlesConnectorErrorsGracefully() {
+ herder = exactlyOnceHerder();
+ Map<String, String> config = new HashMap<>();
+ config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG,
REQUIRED.toString());
+
+ SourceConnector connectorMock =
PowerMock.createMock(SourceConnector.class);
+ String errorMessage = "time to add a new unit test :)";
+ EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+ .andThrow(new NullPointerException(errorMessage));
+
+ PowerMock.replayAll(connectorMock);
+
+ Map<String, ConfigValue> validatedConfigs =
herder.validateSourceConnectorConfig(
+ connectorMock, SourceConnectorConfig.configDef(), config);
+
+ List<String> errors =
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+ assertFalse(errors.isEmpty());
+ assertTrue(
+ "Error message did not contain expected text: " +
errors.get(0),
+ errors.get(0).contains(errorMessage));
+ assertEquals(1, errors.size());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void
testExactlyOnceSourceSupportValidationWhenExactlyOnceNotEnabledOnWorker() {
+ Map<String, String> config = new HashMap<>();
+ config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG,
REQUIRED.toString());
+
+ SourceConnector connectorMock =
PowerMock.createMock(SourceConnector.class);
+ EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+ .andReturn(ExactlyOnceSupport.SUPPORTED);
+
+ PowerMock.replayAll(connectorMock);
+
+ Map<String, ConfigValue> validatedConfigs =
herder.validateSourceConnectorConfig(
+ connectorMock, SourceConnectorConfig.configDef(), config);
+
+ List<String> errors =
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+ assertEquals(
+ Collections.singletonList("This worker does not have
exactly-once source support enabled."),
+ errors);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void
testExactlyOnceSourceSupportValidationHandlesInvalidValuesGracefully() {
+ herder = exactlyOnceHerder();
+ Map<String, String> config = new HashMap<>();
+ config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG,
"invalid");
+
+ SourceConnector connectorMock =
PowerMock.createMock(SourceConnector.class);
+
+ PowerMock.replayAll(connectorMock);
+
+ Map<String, ConfigValue> validatedConfigs =
herder.validateSourceConnectorConfig(
+ connectorMock, SourceConnectorConfig.configDef(), config);
+
+ List<String> errors =
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+ assertFalse(errors.isEmpty());
+ assertTrue(
+ "Error message did not contain expected text: " +
errors.get(0),
+ errors.get(0).contains("String must be one of (case
insensitive): "));
+ assertEquals(1, errors.size());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testConnectorTransactionBoundaryValidation() {
+ herder = exactlyOnceHerder();
+ Map<String, String> config = new HashMap<>();
+ config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG,
CONNECTOR.toString());
+
+ SourceConnector connectorMock =
PowerMock.createMock(SourceConnector.class);
+
EasyMock.expect(connectorMock.canDefineTransactionBoundaries(EasyMock.eq(config)))
+ .andReturn(ConnectorTransactionBoundaries.SUPPORTED);
+
+ PowerMock.replayAll(connectorMock);
+
+ Map<String, ConfigValue> validatedConfigs =
herder.validateSourceConnectorConfig(
+ connectorMock, SourceConnectorConfig.configDef(), config);
+
+ List<String> errors =
validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages();
+ assertEquals(Collections.emptyList(), errors);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void
testConnectorTransactionBoundaryValidationOnUnsupportedConnector() {
+ herder = exactlyOnceHerder();
+ Map<String, String> config = new HashMap<>();
+ config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG,
CONNECTOR.toString());
+
+ SourceConnector connectorMock =
PowerMock.createMock(SourceConnector.class);
+
EasyMock.expect(connectorMock.canDefineTransactionBoundaries(EasyMock.eq(config)))
+ .andReturn(ConnectorTransactionBoundaries.UNSUPPORTED);
+
+ PowerMock.replayAll(connectorMock);
+
+ Map<String, ConfigValue> validatedConfigs =
herder.validateSourceConnectorConfig(
+ connectorMock, SourceConnectorConfig.configDef(), config);
+
+ List<String> errors =
validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages();
+ assertFalse(errors.isEmpty());
+ assertTrue(
+ "Error message did not contain expected text: " +
errors.get(0),
+ errors.get(0).contains("The connector does not support
connector-defined transaction boundaries with the given configuration."));
+ assertEquals(1, errors.size());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void
testConnectorTransactionBoundaryValidationHandlesConnectorErrorsGracefully() {
+ herder = exactlyOnceHerder();
+ Map<String, String> config = new HashMap<>();
+ config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG,
CONNECTOR.toString());
+
+ SourceConnector connectorMock =
PowerMock.createMock(SourceConnector.class);
+ String errorMessage = "Wait I thought we tested for this?";
+
EasyMock.expect(connectorMock.canDefineTransactionBoundaries(EasyMock.eq(config)))
+ .andThrow(new ConnectException(errorMessage));
+
+ PowerMock.replayAll(connectorMock);
+
+ Map<String, ConfigValue> validatedConfigs =
herder.validateSourceConnectorConfig(
+ connectorMock, SourceConnectorConfig.configDef(), config);
+
+ List<String> errors =
validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages();
+ assertFalse(errors.isEmpty());
+ assertTrue(
+ "Error message did not contain expected text: " +
errors.get(0),
+ errors.get(0).contains(errorMessage));
+ assertEquals(1, errors.size());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void
testConnectorTransactionBoundaryValidationHandlesInvalidValuesGracefully() {
+ herder = exactlyOnceHerder();
+ Map<String, String> config = new HashMap<>();
+ config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG,
"CONNECTOR.toString()");
+
+ SourceConnector connectorMock =
PowerMock.createMock(SourceConnector.class);
+
+ PowerMock.replayAll(connectorMock);
+
+ Map<String, ConfigValue> validatedConfigs =
herder.validateSourceConnectorConfig(
+ connectorMock, SourceConnectorConfig.configDef(), config);
+
+ List<String> errors =
validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages();
+ assertFalse(errors.isEmpty());
+ assertTrue(
+ "Error message did not contain expected text: " +
errors.get(0),
+ errors.get(0).contains("String must be one of (case
insensitive): "));
+ assertEquals(1, errors.size());
+
+ PowerMock.verifyAll();
}
@Test
@@ -2851,4 +3094,14 @@ public class DistributedHerderTest {
private abstract class BogusSourceTask extends SourceTask {
}
+ private DistributedHerder exactlyOnceHerder() {
+ Map<String, String> config = new HashMap<>(HERDER_CONFIG);
+ config.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+ return PowerMock.createPartialMock(DistributedHerder.class,
+ new String[]{"connectorTypeForClass",
"updateDeletedConnectorStatus", "updateDeletedTaskStatus",
"validateConnectorConfig"},
+ new DistributedConfig(config), worker, WORKER_ID,
KAFKA_CLUSTER_ID,
+ statusBackingStore, configBackingStore, member, MEMBER_URL,
metrics, time, noneConnectorClientConfigOverridePolicy,
+ new AutoCloseable[0]);
+ }
+
}