This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a110f1fe85 KAFKA-10000: Add new preflight connector config validation 
logic (#11776)
a110f1fe85 is described below

commit a110f1fe852ae8c958a8c64b0736a9bb0617338e
Author: Chris Egerton <[email protected]>
AuthorDate: Thu Jun 2 05:57:50 2022 -0400

    KAFKA-10000: Add new preflight connector config validation logic (#11776)
    
    
    Reviewers: Mickael Maison <[email protected]>

, Tom Bentley 
<[email protected]>
---
 .../kafka/connect/runtime/AbstractHerder.java      | 120 +++++----
 .../runtime/distributed/DistributedHerder.java     | 143 ++++++++++-
 .../runtime/distributed/DistributedHerderTest.java | 269 ++++++++++++++++++++-
 3 files changed, 466 insertions(+), 66 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 2fe75a955b..166ac9f05c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.Config;
@@ -40,6 +42,7 @@ import 
org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
+import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.Converter;
@@ -349,9 +352,11 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
                 status.workerId(), status.trace());
     }
 
-    protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector 
connector,
-                                                                    ConfigDef 
configDef,
-                                                                    
Map<String, String> config) {
+    protected Map<String, ConfigValue> 
validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef, 
Map<String, String> config) {
+        return configDef.validateAll(config);
+    }
+
+    protected Map<String, ConfigValue> 
validateSourceConnectorConfig(SourceConnector connector, ConfigDef configDef, 
Map<String, String> config) {
         return configDef.validateAll(config);
     }
 
@@ -417,7 +422,23 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
                 conf == null ? ConnectorType.UNKNOWN : 
connectorTypeForClass(conf.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))
         );
         return Optional.of(new RestartPlan(request, stateInfo));
+    }
+
+    protected boolean 
connectorUsesConsumer(org.apache.kafka.connect.health.ConnectorType 
connectorType, Map<String, String> connProps) {
+        return connectorType == 
org.apache.kafka.connect.health.ConnectorType.SINK;
+    }
 
+    protected boolean 
connectorUsesAdmin(org.apache.kafka.connect.health.ConnectorType connectorType, 
Map<String, String> connProps) {
+        if (connectorType == 
org.apache.kafka.connect.health.ConnectorType.SOURCE) {
+            return SourceConnectorConfig.usesTopicCreation(connProps);
+        } else {
+            return SinkConnectorConfig.hasDlqTopicConfig(connProps);
+        }
+    }
+
+    protected boolean 
connectorUsesProducer(org.apache.kafka.connect.health.ConnectorType 
connectorType, Map<String, String> connProps) {
+        return connectorType == 
org.apache.kafka.connect.health.ConnectorType.SOURCE
+            || SinkConnectorConfig.hasDlqTopicConfig(connProps);
     }
 
     ConfigInfos validateConnectorConfig(Map<String, String> connectorProps, 
boolean doLog) {
@@ -431,22 +452,20 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
         Connector connector = getConnector(connType);
         org.apache.kafka.connect.health.ConnectorType connectorType;
         ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
+        ConfigDef enrichedConfigDef;
+        Map<String, ConfigValue> validatedConnectorConfig;
         try {
-            ConfigDef baseConfigDef;
             if (connector instanceof SourceConnector) {
-                baseConfigDef = SourceConnectorConfig.configDef();
                 connectorType = 
org.apache.kafka.connect.health.ConnectorType.SOURCE;
+                enrichedConfigDef = ConnectorConfig.enrich(plugins(), 
SourceConnectorConfig.configDef(), connectorProps, false);
+                validatedConnectorConfig = 
validateSourceConnectorConfig((SourceConnector) connector, enrichedConfigDef, 
connectorProps);
             } else {
-                baseConfigDef = SinkConnectorConfig.configDef();
                 SinkConnectorConfig.validate(connectorProps);
                 connectorType = 
org.apache.kafka.connect.health.ConnectorType.SINK;
+                enrichedConfigDef = ConnectorConfig.enrich(plugins(), 
SinkConnectorConfig.configDef(), connectorProps, false);
+                validatedConnectorConfig = 
validateSinkConnectorConfig((SinkConnector) connector, enrichedConfigDef, 
connectorProps);
             }
-            ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(), 
baseConfigDef, connectorProps, false);
-            Map<String, ConfigValue> validatedConnectorConfig = 
validateBasicConnectorConfig(
-                    connector,
-                    enrichedConfigDef,
-                    connectorProps
-            );
+
             connectorProps.entrySet().stream()
                 .filter(e -> e.getValue() == null)
                 .map(Map.Entry::getKey)
@@ -454,6 +473,7 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
                     validatedConnectorConfig.computeIfAbsent(prop, 
ConfigValue::new)
                         .addErrorMessage("Null value can not be supplied as 
the configuration value.")
             );
+
             List<ConfigValue> configValues = new 
ArrayList<>(validatedConnectorConfig.values());
             Map<String, ConfigKey> configKeys = new 
LinkedHashMap<>(enrichedConfigDef.configKeys());
             Set<String> allGroups = new 
LinkedHashSet<>(enrichedConfigDef.groups());
@@ -487,40 +507,41 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
             ConfigInfos producerConfigInfos = null;
             ConfigInfos consumerConfigInfos = null;
             ConfigInfos adminConfigInfos = null;
-            if 
(connectorType.equals(org.apache.kafka.connect.health.ConnectorType.SOURCE)) {
-                producerConfigInfos = validateClientOverrides(connName,
-                                                              
ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
-                                                              connectorConfig,
-                                                              
ProducerConfig.configDef(),
-                                                              
connector.getClass(),
-                                                              connectorType,
-                                                              
ConnectorClientConfigRequest.ClientType.PRODUCER,
-                                                              
connectorClientConfigOverridePolicy);
-                return mergeConfigInfos(connType, configInfos, 
producerConfigInfos);
-            } else {
-                consumerConfigInfos = validateClientOverrides(connName,
-                                                              
ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
-                                                              connectorConfig,
-                                                              
ProducerConfig.configDef(),
-                                                              
connector.getClass(),
-                                                              connectorType,
-                                                              
ConnectorClientConfigRequest.ClientType.CONSUMER,
-                                                              
connectorClientConfigOverridePolicy);
-                // check if topic for dead letter queue exists
-                String topic = 
connectorProps.get(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG);
-                if (topic != null && !topic.isEmpty()) {
-                    adminConfigInfos = validateClientOverrides(connName,
-                                                               
ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
-                                                               connectorConfig,
-                                                               
ProducerConfig.configDef(),
-                                                               
connector.getClass(),
-                                                               connectorType,
-                                                               
ConnectorClientConfigRequest.ClientType.ADMIN,
-                                                               
connectorClientConfigOverridePolicy);
-                }
 
+            if (connectorUsesProducer(connectorType, connectorProps)) {
+                producerConfigInfos = validateClientOverrides(
+                    connName,
+                    ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
+                    connectorConfig,
+                    ProducerConfig.configDef(),
+                    connector.getClass(),
+                    connectorType,
+                    ConnectorClientConfigRequest.ClientType.PRODUCER,
+                    connectorClientConfigOverridePolicy);
             }
-            return mergeConfigInfos(connType, configInfos, 
consumerConfigInfos, adminConfigInfos);
+            if (connectorUsesAdmin(connectorType, connectorProps)) {
+                adminConfigInfos = validateClientOverrides(
+                    connName,
+                    ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
+                    connectorConfig,
+                    AdminClientConfig.configDef(),
+                    connector.getClass(),
+                    connectorType,
+                    ConnectorClientConfigRequest.ClientType.ADMIN,
+                    connectorClientConfigOverridePolicy);
+            }
+            if (connectorUsesConsumer(connectorType, connectorProps)) {
+                consumerConfigInfos = validateClientOverrides(
+                    connName,
+                    ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
+                    connectorConfig,
+                    ConsumerConfig.configDef(),
+                    connector.getClass(),
+                    connectorType,
+                    ConnectorClientConfigRequest.ClientType.CONSUMER,
+                    connectorClientConfigOverridePolicy);
+            }
+            return mergeConfigInfos(connType, configInfos, 
producerConfigInfos, consumerConfigInfos, adminConfigInfos);
         } finally {
             Plugins.compareAndSwapLoaders(savedLoader);
         }
@@ -665,7 +686,7 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
         return tempConnectors.computeIfAbsent(connType, k -> 
plugins().newConnector(k));
     }
 
-    /*
+    /**
      * Retrieves ConnectorType for the corresponding connector class
      * @param connClass class of the connector
      */
@@ -673,6 +694,15 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
         return ConnectorType.from(getConnector(connClass).getClass());
     }
 
+    /**
+     * Retrieves ConnectorType for the class specified in the connector config
+     * @param connConfig the connector config; may not be null
+     * @return the {@link ConnectorType} of the connector
+     */
+    public ConnectorType connectorTypeForConfig(Map<String, String> 
connConfig) {
+        return 
connectorTypeForClass(connConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+    }
+
     /**
      * Checks a given {@link ConfigInfos} for validation error messages and 
adds an exception
      * to the given {@link Callback} if any were found.
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 65a8e7e15b..49d3a5278b 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.connector.Connector;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -58,6 +57,10 @@ import 
org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
+import org.apache.kafka.connect.source.ExactlyOnceSupport;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
@@ -138,6 +141,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
     private static final long FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(10);
     private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(1);
     private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
+    private static final long CONFIG_TOPIC_WRITE_PRIVILEGES_BACKOFF_MS = 250;
     private static final int START_STOP_THREAD_POOL_SIZE = 8;
     private static final short BACKOFF_RETRIES = 5;
 
@@ -842,21 +846,134 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
     }
 
     @Override
-    protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector 
connector,
-                                                                    ConfigDef 
configDef,
-                                                                    
Map<String, String> config) {
-        Map<String, ConfigValue> validatedConfig = 
super.validateBasicConnectorConfig(connector, configDef, config);
-        if (connector instanceof SinkConnector) {
-            ConfigValue validatedName = 
validatedConfig.get(ConnectorConfig.NAME_CONFIG);
-            String name = (String) validatedName.value();
-            if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
-                validatedName.addErrorMessage("Consumer group for sink 
connector named " + name +
-                        " conflicts with Connect worker group " + 
workerGroupId);
+    protected Map<String, ConfigValue> 
validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef, 
Map<String, String> config) {
+        Map<String, ConfigValue> result = 
super.validateSinkConnectorConfig(connector, configDef, config);
+        validateSinkConnectorGroupId(result);
+        return result;
+    }
+
+    @Override
+    protected Map<String, ConfigValue> 
validateSourceConnectorConfig(SourceConnector connector, ConfigDef configDef, 
Map<String, String> config) {
+        Map<String, ConfigValue> result = 
super.validateSourceConnectorConfig(connector, configDef, config);
+        validateSourceConnectorExactlyOnceSupport(config, result, connector);
+        validateSourceConnectorTransactionBoundary(config, result, connector);
+        return result;
+    }
+
+
+    private void validateSinkConnectorGroupId(Map<String, ConfigValue> 
validatedConfig) {
+        ConfigValue validatedName = 
validatedConfig.get(ConnectorConfig.NAME_CONFIG);
+        String name = (String) validatedName.value();
+        if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
+            validatedName.addErrorMessage("Consumer group for sink connector 
named " + name +
+                    " conflicts with Connect worker group " + workerGroupId);
+        }
+    }
+
+    private void validateSourceConnectorExactlyOnceSupport(
+            Map<String, String> rawConfig,
+            Map<String, ConfigValue> validatedConfig,
+            SourceConnector connector) {
+        ConfigValue validatedExactlyOnceSupport = 
validatedConfig.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG);
+        if (validatedExactlyOnceSupport.errorMessages().isEmpty()) {
+            // Should be safe to parse the enum from the user-provided value 
since it's passed validation so far
+            SourceConnectorConfig.ExactlyOnceSupportLevel 
exactlyOnceSupportLevel =
+                    
SourceConnectorConfig.ExactlyOnceSupportLevel.fromProperty(Objects.toString(validatedExactlyOnceSupport.value()));
+            if 
(SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.equals(exactlyOnceSupportLevel))
 {
+                if (!config.exactlyOnceSourceEnabled()) {
+                    validatedExactlyOnceSupport.addErrorMessage("This worker 
does not have exactly-once source support enabled.");
+                }
+
+                try {
+                    ExactlyOnceSupport exactlyOnceSupport = 
connector.exactlyOnceSupport(rawConfig);
+                    if 
(!ExactlyOnceSupport.SUPPORTED.equals(exactlyOnceSupport)) {
+                        final String validationErrorMessage;
+                        // Would do a switch here but that doesn't permit 
matching on null values
+                        if (exactlyOnceSupport == null) {
+                            validationErrorMessage = "The connector does not 
implement the API required for preflight validation of exactly-once "
+                                    + "source support. Please consult the 
documentation for the connector to determine whether it supports exactly-once "
+                                    + "guarantees, and then consider 
reconfiguring the connector to use the value \""
+                                    + 
SourceConnectorConfig.ExactlyOnceSupportLevel.REQUESTED
+                                    + "\" for this property (which will 
disable this preflight check and allow the connector to be created).";
+                        } else if 
(ExactlyOnceSupport.UNSUPPORTED.equals(exactlyOnceSupport)) {
+                            validationErrorMessage = "The connector does not 
support exactly-once delivery guarantees with the provided configuration.";
+                        } else {
+                            throw new ConnectException("Unexpected value 
returned from SourceConnector::exactlyOnceSupport: " + exactlyOnceSupport);
+                        }
+                        
validatedExactlyOnceSupport.addErrorMessage(validationErrorMessage);
+                    }
+                } catch (Exception e) {
+                    log.error("Failed while validating connector support for 
exactly-once guarantees", e);
+                    String validationErrorMessage = "An unexpected error 
occurred during validation";
+                    String failureMessage = e.getMessage();
+                    if (failureMessage != null && 
!failureMessage.trim().isEmpty()) {
+                        validationErrorMessage += ": " + failureMessage.trim();
+                    } else {
+                        validationErrorMessage += "; please see the worker 
logs for more details.";
+                    }
+                    
validatedExactlyOnceSupport.addErrorMessage(validationErrorMessage);
+                }
             }
         }
-        return validatedConfig;
     }
 
+    private void validateSourceConnectorTransactionBoundary(
+            Map<String, String> rawConfig,
+            Map<String, ConfigValue> validatedConfig,
+            SourceConnector connector) {
+        ConfigValue validatedTransactionBoundary = 
validatedConfig.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG);
+        if (validatedTransactionBoundary.errorMessages().isEmpty()) {
+            // Should be safe to parse the enum from the user-provided value 
since it's passed validation so far
+            SourceTask.TransactionBoundary transactionBoundary =
+                    
SourceTask.TransactionBoundary.fromProperty(Objects.toString(validatedTransactionBoundary.value()));
+            if 
(SourceTask.TransactionBoundary.CONNECTOR.equals(transactionBoundary)) {
+                try {
+                    ConnectorTransactionBoundaries connectorTransactionSupport 
= connector.canDefineTransactionBoundaries(rawConfig);
+                    if (connectorTransactionSupport == null) {
+                        validatedTransactionBoundary.addErrorMessage(
+                                "This connector has returned a null value from 
its canDefineTransactionBoundaries method, which is not permitted. " +
+                                        "The connector will be treated as if 
it cannot define its own transaction boundaries, and cannot be configured with 
" +
+                                        "'" + 
SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG + "' set to '" + 
SourceTask.TransactionBoundary.CONNECTOR + "'."
+                        );
+                    } else if 
(!ConnectorTransactionBoundaries.SUPPORTED.equals(connectorTransactionSupport)) 
{
+                        validatedTransactionBoundary.addErrorMessage(
+                                "The connector does not support 
connector-defined transaction boundaries with the given configuration. "
+                                        + "Please reconfigure it to use a 
different transaction boundary definition.");
+                    }
+                } catch (Exception e) {
+                    log.error("Failed while validating connector support for 
defining its own transaction boundaries", e);
+                    String validationErrorMessage = "An unexpected error 
occurred during validation";
+                    String failureMessage = e.getMessage();
+                    if (failureMessage != null && 
!failureMessage.trim().isEmpty()) {
+                        validationErrorMessage += ": " + failureMessage.trim();
+                    } else {
+                        validationErrorMessage += "; please see the worker 
logs for more details.";
+                    }
+                    
validatedTransactionBoundary.addErrorMessage(validationErrorMessage);
+                }
+            }
+        }
+    }
+
+    @Override
+    protected boolean 
connectorUsesAdmin(org.apache.kafka.connect.health.ConnectorType connectorType, 
Map<String, String> connProps) {
+        return super.connectorUsesAdmin(connectorType, connProps)
+                || connectorUsesSeparateOffsetsTopicClients(connectorType, 
connProps);
+    }
+
+    @Override
+    protected boolean 
connectorUsesConsumer(org.apache.kafka.connect.health.ConnectorType 
connectorType, Map<String, String> connProps) {
+        return super.connectorUsesConsumer(connectorType, connProps)
+                || connectorUsesSeparateOffsetsTopicClients(connectorType, 
connProps);
+    }
+
+    private boolean 
connectorUsesSeparateOffsetsTopicClients(org.apache.kafka.connect.health.ConnectorType
 connectorType, Map<String, String> connProps) {
+        if (connectorType != 
org.apache.kafka.connect.health.ConnectorType.SOURCE) {
+            return false;
+        }
+        return config.exactlyOnceSourceEnabled()
+                || 
!connProps.getOrDefault(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, 
"").trim().isEmpty();
+    }
 
     @Override
     public void putConnectorConfig(final String connName, final Map<String, 
String> config, final boolean allowReplace,
@@ -897,7 +1014,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
                             // snapshot yet. The existing task info should 
still be accurate.
                             ConnectorInfo info = new ConnectorInfo(connName, 
config, configState.tasks(connName),
                                 // validateConnectorConfig have checked the 
existence of CONNECTOR_CLASS_CONFIG
-                                
connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
+                                connectorTypeForConfig(config));
                             callback.onCompletion(null, new Created<>(!exists, 
info));
                             return null;
                         },
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 996c8407c6..54dacd6d82 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.distributed;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.connect.connector.Connector;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import 
org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
@@ -33,6 +32,7 @@ import org.apache.kafka.connect.runtime.RestartPlan;
 import org.apache.kafka.connect.runtime.RestartRequest;
 import org.apache.kafka.connect.runtime.SessionKey;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
+import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.TopicStatus;
@@ -52,6 +52,8 @@ import 
org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
+import org.apache.kafka.connect.source.ExactlyOnceSupport;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
@@ -90,16 +92,18 @@ import java.util.concurrent.TimeoutException;
 
 import static java.util.Collections.singletonList;
 import static javax.ws.rs.core.Response.Status.FORBIDDEN;
+import static 
org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED;
 import static 
org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
+import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
 import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT;
 import static 
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
 import static 
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static 
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.newCapture;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -790,22 +794,261 @@ public class DistributedHerderTest {
         PowerMock.verifyAll();
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testConnectorNameConflictsWithWorkerGroupId() {
         Map<String, String> config = new HashMap<>(CONN2_CONFIG);
         config.put(ConnectorConfig.NAME_CONFIG, "test-group");
 
-        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        SinkConnector connectorMock = 
PowerMock.createMock(SinkConnector.class);
+
+        PowerMock.replayAll(connectorMock);
 
         // CONN2 creation should fail because the worker group id 
(connect-test-group) conflicts with
         // the consumer group id we would use for this sink
-        Map<String, ConfigValue> validatedConfigs =
-            herder.validateBasicConnectorConfig(connectorMock, 
ConnectorConfig.configDef(), config);
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSinkConnectorConfig(
+                connectorMock, SinkConnectorConfig.configDef(), config);
 
         ConfigValue nameConfig = 
validatedConfigs.get(ConnectorConfig.NAME_CONFIG);
-        assertNotNull(nameConfig.errorMessages());
-        assertFalse(nameConfig.errorMessages().isEmpty());
+        assertEquals(
+                Collections.singletonList("Consumer group for sink connector 
named test-group conflicts with Connect worker group connect-test-group"),
+                nameConfig.errorMessages());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testExactlyOnceSourceSupportValidation() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, 
REQUIRED.toString());
+
+        SourceConnector connectorMock = 
PowerMock.createMock(SourceConnector.class);
+        EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+                .andReturn(ExactlyOnceSupport.SUPPORTED);
+
+        PowerMock.replayAll(connectorMock);
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+        assertEquals(Collections.emptyList(), errors);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testExactlyOnceSourceSupportValidationOnUnsupportedConnector() 
{
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, 
REQUIRED.toString());
+
+        SourceConnector connectorMock = 
PowerMock.createMock(SourceConnector.class);
+        EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+                .andReturn(ExactlyOnceSupport.UNSUPPORTED);
+
+        PowerMock.replayAll(connectorMock);
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+        assertEquals(
+                Collections.singletonList("The connector does not support 
exactly-once delivery guarantees with the provided configuration."),
+                errors);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testExactlyOnceSourceSupportValidationOnUnknownConnector() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, 
REQUIRED.toString());
+
+        SourceConnector connectorMock = 
PowerMock.createMock(SourceConnector.class);
+        EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+                .andReturn(null);
+
+        PowerMock.replayAll(connectorMock);
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+        assertFalse(errors.isEmpty());
+        assertTrue(
+                "Error message did not contain expected text: " + 
errors.get(0),
+                errors.get(0).contains("The connector does not implement the 
API required for preflight validation of exactly-once source support."));
+        assertEquals(1, errors.size());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void 
testExactlyOnceSourceSupportValidationHandlesConnectorErrorsGracefully() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, 
REQUIRED.toString());
+
+        SourceConnector connectorMock = 
PowerMock.createMock(SourceConnector.class);
+        String errorMessage = "time to add a new unit test :)";
+        EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+                .andThrow(new NullPointerException(errorMessage));
+
+        PowerMock.replayAll(connectorMock);
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+        assertFalse(errors.isEmpty());
+        assertTrue(
+                "Error message did not contain expected text: " + 
errors.get(0),
+                errors.get(0).contains(errorMessage));
+        assertEquals(1, errors.size());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void 
testExactlyOnceSourceSupportValidationWhenExactlyOnceNotEnabledOnWorker() {
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, 
REQUIRED.toString());
+
+        SourceConnector connectorMock = 
PowerMock.createMock(SourceConnector.class);
+        EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+                .andReturn(ExactlyOnceSupport.SUPPORTED);
+
+        PowerMock.replayAll(connectorMock);
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+        assertEquals(
+                Collections.singletonList("This worker does not have 
exactly-once source support enabled."),
+                errors);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void 
testExactlyOnceSourceSupportValidationHandlesInvalidValuesGracefully() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, 
"invalid");
+
+        SourceConnector connectorMock = 
PowerMock.createMock(SourceConnector.class);
+
+        PowerMock.replayAll(connectorMock);
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+        assertFalse(errors.isEmpty());
+        assertTrue(
+                "Error message did not contain expected text: " + 
errors.get(0),
+                errors.get(0).contains("String must be one of (case 
insensitive): "));
+        assertEquals(1, errors.size());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConnectorTransactionBoundaryValidation() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, 
CONNECTOR.toString());
+
+        SourceConnector connectorMock = 
PowerMock.createMock(SourceConnector.class);
+        
EasyMock.expect(connectorMock.canDefineTransactionBoundaries(EasyMock.eq(config)))
+                .andReturn(ConnectorTransactionBoundaries.SUPPORTED);
+
+        PowerMock.replayAll(connectorMock);
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages();
+        assertEquals(Collections.emptyList(), errors);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void 
testConnectorTransactionBoundaryValidationOnUnsupportedConnector() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, 
CONNECTOR.toString());
+
+        SourceConnector connectorMock = 
PowerMock.createMock(SourceConnector.class);
+        
EasyMock.expect(connectorMock.canDefineTransactionBoundaries(EasyMock.eq(config)))
+                .andReturn(ConnectorTransactionBoundaries.UNSUPPORTED);
+
+        PowerMock.replayAll(connectorMock);
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages();
+        assertFalse(errors.isEmpty());
+        assertTrue(
+                "Error message did not contain expected text: " + 
errors.get(0),
+                errors.get(0).contains("The connector does not support 
connector-defined transaction boundaries with the given configuration."));
+        assertEquals(1, errors.size());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void 
testConnectorTransactionBoundaryValidationHandlesConnectorErrorsGracefully() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, 
CONNECTOR.toString());
+
+        SourceConnector connectorMock = 
PowerMock.createMock(SourceConnector.class);
+        String errorMessage = "Wait I thought we tested for this?";
+        
EasyMock.expect(connectorMock.canDefineTransactionBoundaries(EasyMock.eq(config)))
+                .andThrow(new ConnectException(errorMessage));
+
+        PowerMock.replayAll(connectorMock);
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages();
+        assertFalse(errors.isEmpty());
+        assertTrue(
+                "Error message did not contain expected text: " + 
errors.get(0),
+                errors.get(0).contains(errorMessage));
+        assertEquals(1, errors.size());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void 
testConnectorTransactionBoundaryValidationHandlesInvalidValuesGracefully() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, 
"CONNECTOR.toString()");
+
+        SourceConnector connectorMock = 
PowerMock.createMock(SourceConnector.class);
+
+        PowerMock.replayAll(connectorMock);
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages();
+        assertFalse(errors.isEmpty());
+        assertTrue(
+                "Error message did not contain expected text: " + 
errors.get(0),
+                errors.get(0).contains("String must be one of (case 
insensitive): "));
+        assertEquals(1, errors.size());
+
+        PowerMock.verifyAll();
     }
 
     @Test
@@ -2851,4 +3094,14 @@ public class DistributedHerderTest {
     private abstract class BogusSourceTask extends SourceTask {
     }
 
+    private DistributedHerder exactlyOnceHerder() {
+        Map<String, String> config = new HashMap<>(HERDER_CONFIG);
+        config.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+        return PowerMock.createPartialMock(DistributedHerder.class,
+                new String[]{"connectorTypeForClass", 
"updateDeletedConnectorStatus", "updateDeletedTaskStatus", 
"validateConnectorConfig"},
+                new DistributedConfig(config), worker, WORKER_ID, 
KAFKA_CLUSTER_ID,
+                statusBackingStore, configBackingStore, member, MEMBER_URL, 
metrics, time, noneConnectorClientConfigOverridePolicy,
+                new AutoCloseable[0]);
+    }
+
 }

Reply via email to