This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7268284699f KAFKA-10000: Add all public-facing config properties (#11775)
7268284699f is described below

commit 7268284699f84c7f61f5656167c36877b72d27f2
Author: Chris Egerton <[email protected]>
AuthorDate: Thu May 12 02:45:53 2022 -0400

    KAFKA-10000: Add all public-facing config properties (#11775)
    
    Reviewers: Luke Chen <[email protected]>, Tom Bentley <[email protected]>, Andrew Eugene Choi <[email protected]>
---
 .../kafka/connect/source/SourceConnector.java      |   5 +-
 .../apache/kafka/connect/source/SourceTask.java    |   2 +
 .../connect/runtime/SourceConnectorConfig.java     | 168 +++++++++++++++++++--
 .../apache/kafka/connect/runtime/WorkerConfig.java |  60 +++++++-
 .../runtime/distributed/DistributedConfig.java     |  81 ++++++++++
 .../kafka/connect/runtime/AbstractHerderTest.java  |  34 +++--
 .../runtime/distributed/DistributedConfigTest.java |  43 +++++-
 7 files changed, 365 insertions(+), 28 deletions(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
index 7fc2a5d11cf..3bd012f9fbc 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
@@ -63,9 +63,8 @@ public abstract class SourceConnector extends Connector {
      *
      * @param connectorConfig the configuration that will be used for the connector
      * @return {@link ConnectorTransactionBoundaries#SUPPORTED} if the connector will define its own transaction boundaries,
-     * or {@link ConnectorTransactionBoundaries#UNSUPPORTED} otherwise. If this method is overridden by a
-     * connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot define its own
-     * transaction boundaries.
+     * or {@link ConnectorTransactionBoundaries#UNSUPPORTED} otherwise; may never be {@code null}. The default implementation
+     * returns {@link ConnectorTransactionBoundaries#UNSUPPORTED}.
      * @since 3.3
      * @see TransactionContext
      */
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
index 2159e68e8a8..559f02340ca 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.connector.Task;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
@@ -64,6 +65,7 @@ public abstract class SourceTask implements Task {
          * @throws IllegalArgumentException if there is no transaction boundary type with the given name
          */
         public static TransactionBoundary fromProperty(String property) {
+            Objects.requireNonNull(property, "Value for transaction boundary property may not be null");
             return TransactionBoundary.valueOf(property.toUpperCase(Locale.ROOT).trim());
         }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
index e38072b9b6e..65d376b08f7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -20,21 +20,31 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.source.SourceTask;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUESTED;
+import static 
org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary;
+import static 
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR;
+import static 
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.DEFAULT;
+import static 
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL;
+import static 
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL;
+import static org.apache.kafka.common.utils.Utils.enumOptions;
 
 public class SourceConnectorConfig extends ConnectorConfig {
 
@@ -47,6 +57,57 @@ public class SourceConnectorConfig extends ConnectorConfig {
             + "created by source connectors";
     private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic 
Creation Groups";
 
+    protected static final String EXACTLY_ONCE_SUPPORT_GROUP = "Exactly Once 
Support";
+
+    public enum ExactlyOnceSupportLevel {
+        REQUESTED,
+        REQUIRED;
+
+        public static ExactlyOnceSupportLevel fromProperty(String property) {
+            return valueOf(property.toUpperCase(Locale.ROOT).trim());
+        }
+
+        @Override
+        public String toString() {
+            return name().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    public static final String EXACTLY_ONCE_SUPPORT_CONFIG = 
"exactly.once.support";
+    private static final String EXACTLY_ONCE_SUPPORT_DOC = "Permitted values 
are " + String.join(", ", enumOptions(ExactlyOnceSupportLevel.class)) + ". "
+            + "If set to \"" + REQUIRED + "\", forces a preflight check for 
the connector to ensure that it can provide exactly-once delivery "
+            + "with the given configuration. Some connectors may be capable of 
providing exactly-once delivery but not signal to "
+            + "Connect that they support this; in that case, documentation for 
the connector should be consulted carefully before "
+            + "creating it, and the value for this property should be set to 
\"" + REQUESTED + "\". "
+            + "Additionally, if the value is set to \"" + REQUIRED + "\" but 
the worker that performs preflight validation does not have "
+            + "exactly-once support enabled for source connectors, requests to 
create or validate the connector will fail.";
+    private static final String EXACTLY_ONCE_SUPPORT_DISPLAY = "Exactly once 
support";
+
+    public static final String TRANSACTION_BOUNDARY_CONFIG = 
SourceTask.TRANSACTION_BOUNDARY_CONFIG;
+    private static final String TRANSACTION_BOUNDARY_DOC = "Permitted values 
are: " + String.join(", ", enumOptions(TransactionBoundary.class)) + ". "
+            + "If set to '" + POLL + "', a new producer transaction will be 
started and committed for every batch of records that each task from "
+            + "this connector provides to Connect. If set to '" + CONNECTOR + 
"', relies on connector-defined transaction boundaries; note that "
+            + "not all connectors are capable of defining their own 
transaction boundaries, and in that case, attempts to instantiate a connector 
with "
+            + "this value will fail. Finally, if set to '" + INTERVAL + "', 
commits transactions only after a user-defined time interval has passed.";
+    private static final String TRANSACTION_BOUNDARY_DISPLAY = "Transaction 
Boundary";
+
+    public static final String TRANSACTION_BOUNDARY_INTERVAL_CONFIG = 
"transaction.boundary.interval.ms";
+    private static final String TRANSACTION_BOUNDARY_INTERVAL_DOC = "If '" + 
TRANSACTION_BOUNDARY_CONFIG + "' is set to '" + INTERVAL
+            + "', determines the interval for producer transaction commits by 
connector tasks. If unset, defaults to the value of the worker-level "
+            + "'" + WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG + "' 
property. It has no effect if a different "
+            + TRANSACTION_BOUNDARY_CONFIG + " is specified.";
+    private static final String TRANSACTION_BOUNDARY_INTERVAL_DISPLAY = 
"Transaction boundary interval";
+
+    protected static final String OFFSETS_TOPIC_GROUP = "offsets.topic";
+
+    public static final String OFFSETS_TOPIC_CONFIG = "offsets.storage.topic";
+    private static final String OFFSETS_TOPIC_DOC = "The name of a separate 
offsets topic to use for this connector. "
+            + "If empty or not specified, the worker’s global offsets topic 
name will be used. "
+            + "If specified, the offsets topic will be created if it does not 
already exist on the Kafka cluster targeted by this connector "
+            + "(which may be different from the one used for the worker's 
global offsets topic if the bootstrap.servers property of the connector's 
producer "
+            + "has been overridden from the worker's). Only applicable in 
distributed mode; in standalone mode, setting this property will have no 
effect.";
+    private static final String OFFSETS_TOPIC_DISPLAY = "Offsets topic";
+
     private static class EnrichedSourceConnectorConfig extends ConnectorConfig 
{
         EnrichedSourceConnectorConfig(Plugins plugins, ConfigDef configDef, 
Map<String, String> props) {
             super(plugins, configDef, props);
@@ -58,23 +119,87 @@ public class SourceConnectorConfig extends ConnectorConfig {
         }
     }
 
-    private static final ConfigDef CONFIG = SourceConnectorConfig.configDef();
+    private final TransactionBoundary transactionBoundary;
+    private final Long transactionBoundaryInterval;
     private final EnrichedSourceConnectorConfig enrichedSourceConfig;
+    private final String offsetsTopic;
 
     public static ConfigDef configDef() {
+        ConfigDef.Validator atLeastZero = ConfigDef.Range.atLeast(0);
         int orderInGroup = 0;
         return new ConfigDef(ConnectorConfig.configDef())
-                .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, 
Collections.emptyList(),
-                        ConfigDef.CompositeValidator.of(new 
ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
+                .define(
+                        TOPIC_CREATION_GROUPS_CONFIG,
+                        ConfigDef.Type.LIST,
+                        Collections.emptyList(),
+                        ConfigDef.CompositeValidator.of(
+                                new ConfigDef.NonNullValidator(),
+                                ConfigDef.LambdaValidator.with(
+                                    (name, value) -> {
+                                        List<?> groupAliases = (List<?>) value;
+                                        if (groupAliases.size() > new 
HashSet<>(groupAliases).size()) {
+                                            throw new ConfigException(name, 
value, "Duplicate alias provided.");
+                                        }
+                                    },
+                                    () -> "unique topic creation groups")),
+                        ConfigDef.Importance.LOW,
+                        TOPIC_CREATION_GROUPS_DOC,
+                        TOPIC_CREATION_GROUP,
+                        ++orderInGroup,
+                        ConfigDef.Width.LONG,
+                        TOPIC_CREATION_GROUPS_DISPLAY)
+                .define(
+                        EXACTLY_ONCE_SUPPORT_CONFIG,
+                        ConfigDef.Type.STRING,
+                        REQUESTED.toString(),
+                        
ConfigDef.CaseInsensitiveValidString.in(enumOptions(ExactlyOnceSupportLevel.class)),
+                        ConfigDef.Importance.MEDIUM,
+                        EXACTLY_ONCE_SUPPORT_DOC,
+                        EXACTLY_ONCE_SUPPORT_GROUP,
+                        ++orderInGroup,
+                        ConfigDef.Width.SHORT,
+                        EXACTLY_ONCE_SUPPORT_DISPLAY)
+                .define(
+                        TRANSACTION_BOUNDARY_CONFIG,
+                        ConfigDef.Type.STRING,
+                        DEFAULT.toString(),
+                        
ConfigDef.CaseInsensitiveValidString.in(enumOptions(TransactionBoundary.class)),
+                        ConfigDef.Importance.MEDIUM,
+                        TRANSACTION_BOUNDARY_DOC,
+                        EXACTLY_ONCE_SUPPORT_GROUP,
+                        ++orderInGroup,
+                        ConfigDef.Width.SHORT,
+                        TRANSACTION_BOUNDARY_DISPLAY)
+                .define(
+                        TRANSACTION_BOUNDARY_INTERVAL_CONFIG,
+                        ConfigDef.Type.LONG,
+                        null,
+                        ConfigDef.LambdaValidator.with(
                             (name, value) -> {
-                                List<?> groupAliases = (List<?>) value;
-                                if (groupAliases.size() > new 
HashSet<>(groupAliases).size()) {
-                                    throw new ConfigException(name, value, 
"Duplicate alias provided.");
+                                if (value == null) {
+                                    return;
                                 }
+                                atLeastZero.ensureValid(name, value);
                             },
-                            () -> "unique topic creation groups")),
-                        ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, 
TOPIC_CREATION_GROUP,
-                        ++orderInGroup, ConfigDef.Width.LONG, 
TOPIC_CREATION_GROUPS_DISPLAY);
+                            atLeastZero::toString
+                        ),
+                        ConfigDef.Importance.LOW,
+                        TRANSACTION_BOUNDARY_INTERVAL_DOC,
+                        EXACTLY_ONCE_SUPPORT_GROUP,
+                        ++orderInGroup,
+                        ConfigDef.Width.SHORT,
+                        TRANSACTION_BOUNDARY_INTERVAL_DISPLAY)
+                .define(
+                        OFFSETS_TOPIC_CONFIG,
+                        ConfigDef.Type.STRING,
+                        null,
+                        new ConfigDef.NonEmptyString(),
+                        ConfigDef.Importance.LOW,
+                        OFFSETS_TOPIC_DOC,
+                        OFFSETS_TOPIC_GROUP,
+                        orderInGroup = 1,
+                        ConfigDef.Width.LONG,
+                        OFFSETS_TOPIC_DISPLAY);
     }
 
     public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
@@ -116,9 +241,9 @@ public class SourceConnectorConfig extends ConnectorConfig {
     }
 
     public SourceConnectorConfig(Plugins plugins, Map<String, String> props, 
boolean createTopics) {
-        super(plugins, CONFIG, props);
+        super(plugins, configDef(), props);
         if (createTopics && props.entrySet().stream().anyMatch(e -> 
e.getKey().startsWith(TOPIC_CREATION_PREFIX))) {
-            ConfigDef defaultConfigDef = embedDefaultGroup(CONFIG);
+            ConfigDef defaultConfigDef = embedDefaultGroup(configDef());
             // This config is only used to set default values for partitions 
and replication
             // factor from the default group and otherwise it remains unused
             AbstractConfig defaultGroup = new AbstractConfig(defaultConfigDef, 
props, false);
@@ -135,6 +260,13 @@ public class SourceConnectorConfig extends ConnectorConfig {
         } else {
             enrichedSourceConfig = null;
         }
+        transactionBoundary = 
TransactionBoundary.fromProperty(getString(TRANSACTION_BOUNDARY_CONFIG));
+        transactionBoundaryInterval = 
getLong(TRANSACTION_BOUNDARY_INTERVAL_CONFIG);
+        offsetsTopic = getString(OFFSETS_TOPIC_CONFIG);
+    }
+
+    public static boolean usesTopicCreation(Map<String, String> props) {
+        return props.entrySet().stream().anyMatch(e -> 
e.getKey().startsWith(TOPIC_CREATION_PREFIX));
     }
 
     @Override
@@ -142,6 +274,18 @@ public class SourceConnectorConfig extends ConnectorConfig {
         return enrichedSourceConfig != null ? enrichedSourceConfig.get(key) : 
super.get(key);
     }
 
+    public TransactionBoundary transactionBoundary() {
+        return transactionBoundary;
+    }
+
+    public Long transactionBoundaryInterval() {
+        return transactionBoundaryInterval;
+    }
+
+    public String offsetsTopic() {
+        return offsetsTopic;
+    }
+
     /**
      * Returns whether this configuration uses topic creation properties.
      *
@@ -181,6 +325,6 @@ public class SourceConnectorConfig extends ConnectorConfig {
     }
 
     public static void main(String[] args) {
-        System.out.println(CONFIG.toHtml(4, config -> 
"sourceconnectorconfigs_" + config));
+        System.out.println(configDef().toHtml(4, config -> 
"sourceconnectorconfigs_" + config));
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 5894283c935..38dbeb87e1b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -111,7 +111,8 @@ public class WorkerConfig extends AbstractConfig {
     private static final String OFFSET_COMMIT_TIMEOUT_MS_DOC
             = "Maximum number of milliseconds to wait for records to flush and 
partition offset data to be"
             + " committed to offset storage before cancelling the process and 
restoring the offset "
-            + "data to be committed in a future attempt.";
+            + "data to be committed in a future attempt. This property has no 
effect for source connectors "
+            + "running with exactly-once support.";
     public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;
 
     public static final String LISTENERS_CONFIG = "listeners";
@@ -343,6 +344,15 @@ public class WorkerConfig extends AbstractConfig {
         }
     }
 
+    /**
+     * @return the {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG 
bootstrap servers} property
+     * used by the worker when instantiating Kafka clients for connectors and 
tasks (unless overridden)
+     * and its internal topics (if running in distributed mode)
+     */
+    public String bootstrapServers() {
+        return String.join(",", getList(BOOTSTRAP_SERVERS_CONFIG));
+    }
+
     public Integer getRebalanceTimeout() {
         return null;
     }
@@ -351,6 +361,54 @@ public class WorkerConfig extends AbstractConfig {
         return getBoolean(TOPIC_CREATION_ENABLE_CONFIG);
     }
 
+    /**
+     * Whether this worker is configured with exactly-once support for source 
connectors.
+     * The default implementation returns {@code false} and should be 
overridden by subclasses
+     * if the worker mode for the subclass provides exactly-once support for 
source connectors.
+     * @return whether exactly-once support is enabled for source connectors 
on this worker
+     */
+    public boolean exactlyOnceSourceEnabled() {
+        return false;
+    }
+
+    /**
+     * Get the internal topic used by this worker to store source connector 
offsets.
+     * The default implementation returns {@code null} and should be 
overridden by subclasses
+     * if the worker mode for the subclass uses an internal offsets topic.
+     * @return the name of the internal offsets topic, or {@code null} if the 
worker does not use
+     * an internal offsets topic
+     */
+    public String offsetsTopic() {
+        return null;
+    }
+
+    /**
+     * Determine whether this worker supports per-connector source offsets 
topics.
+     * The default implementation returns {@code false} and should be 
overridden by subclasses
+     * if the worker mode for the subclass supports per-connector offsets 
topics.
+     * @return whether the worker supports per-connector offsets topics
+     */
+    public boolean connectorOffsetsTopicsPermitted() {
+        return false;
+    }
+
+    /**
+     * @return the offset commit interval for tasks created by this worker
+     */
+    public long offsetCommitInterval() {
+        return getLong(OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+    }
+
+    /**
+     * Get the {@link CommonClientConfigs#GROUP_ID_CONFIG group ID} used by 
this worker to form a cluster.
+     * The default implementation returns {@code null} and should be 
overridden by subclasses
+     * if the worker mode for the subclass is capable of forming a cluster 
using Kafka's group coordination API.
+     * @return the group ID for the worker's cluster, or {@code null} if the 
worker is not capable of forming a cluster.
+     */
+    public String groupId() {
+        return null;
+    }
+
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, 
Object> parsedValues) {
         return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, 
parsedValues);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 23083f5e8e0..849a5969468 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
@@ -32,6 +33,7 @@ import java.security.InvalidParameterException;
 import java.security.NoSuchAlgorithmException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static org.apache.kafka.common.utils.Utils.enumOptions;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_VALIDATOR;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR;
 
@@ -194,6 +197,34 @@ public class DistributedConfig extends WorkerConfig {
     public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A 
list of permitted algorithms for verifying internal requests";
     public static final List<String> 
INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = 
Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
 
+    private enum ExactlyOnceSourceSupport {
+        DISABLED(false),
+        PREPARING(true),
+        ENABLED(true);
+
+        public final boolean usesTransactionalLeader;
+
+        ExactlyOnceSourceSupport(boolean usesTransactionalLeader) {
+            this.usesTransactionalLeader = usesTransactionalLeader;
+        }
+
+        public static ExactlyOnceSourceSupport fromProperty(String property) {
+            return 
ExactlyOnceSourceSupport.valueOf(property.toUpperCase(Locale.ROOT));
+        }
+
+        @Override
+        public String toString() {
+            return name().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = 
"exactly.once.source.support";
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to 
enable exactly-once support for source connectors in the cluster "
+            + "by using transactions to write source records and their source 
offsets, and by proactively fencing out old task generations before bringing up 
new ones. ";
+            // TODO: https://issues.apache.org/jira/browse/KAFKA-13709
+            //       + "See the exactly-once source support documentation at 
[add docs link here] for more information on this feature.";
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT = 
ExactlyOnceSourceSupport.DISABLED.toString();
+
     @SuppressWarnings("unchecked")
     private static final ConfigDef CONFIG = baseConfigDef()
             .define(GROUP_ID_CONFIG,
@@ -215,6 +246,12 @@ public class DistributedConfig extends WorkerConfig {
                     Math.toIntExact(TimeUnit.SECONDS.toMillis(3)),
                     ConfigDef.Importance.HIGH,
                     HEARTBEAT_INTERVAL_MS_DOC)
+            .define(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
+                    ConfigDef.Type.STRING,
+                    EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT,
+                    
ConfigDef.CaseInsensitiveValidString.in(enumOptions(ExactlyOnceSourceSupport.class)),
+                    ConfigDef.Importance.HIGH,
+                    EXACTLY_ONCE_SOURCE_SUPPORT_DOC)
             .define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
                     ConfigDef.Type.LONG,
                     TimeUnit.MINUTES.toMillis(5),
@@ -399,13 +436,57 @@ public class DistributedConfig extends WorkerConfig {
                     ConfigDef.Importance.LOW,
                     INTER_WORKER_VERIFICATION_ALGORITHMS_DOC);
 
+    private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;
+
     @Override
     public Integer getRebalanceTimeout() {
         return getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
     }
 
+    @Override
+    public boolean exactlyOnceSourceEnabled() {
+        return exactlyOnceSourceSupport == ExactlyOnceSourceSupport.ENABLED;
+    }
+
+    /**
+     * @return whether the Connect cluster's leader should use a transactional 
producer to perform writes to the config
+     * topic, which is useful for ensuring that zombie leaders are fenced out 
and unable to write to the topic after a
+     * new leader has been elected.
+     */
+    public boolean transactionalLeaderEnabled() {
+        return exactlyOnceSourceSupport.usesTransactionalLeader;
+    }
+
+    /**
+     * @return the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG transactional 
ID} to use for the worker's producer if
+     * using a transactional producer for writes to internal topics such as 
the config topic.
+     */
+    public String transactionalProducerId() {
+        return transactionalProducerId(groupId());
+    }
+
+    public static String transactionalProducerId(String groupId) {
+        return "connect-cluster-" + groupId;
+    }
+
+    @Override
+    public String offsetsTopic() {
+        return getString(OFFSET_STORAGE_TOPIC_CONFIG);
+    }
+
+    @Override
+    public boolean connectorOffsetsTopicsPermitted() {
+        return true;
+    }
+
+    @Override
+    public String groupId() {
+        return getString(GROUP_ID_CONFIG);
+    }
+
     public DistributedConfig(Map<String, String> props) {
         super(CONFIG, props);
+        exactlyOnceSourceSupport = 
ExactlyOnceSourceSupport.fromProperty(getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG));
         getInternalRequestKeyGenerator(); // Check here for a valid key size + 
key algorithm to fail fast if either are invalid
         validateKeyAlgorithmAndVerificationAlgorithms();
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 5b9e199e5a1..862f31c0770 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -53,6 +53,7 @@ import org.easymock.EasyMock;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.api.easymock.annotation.MockStrict;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -142,10 +143,10 @@ public class AbstractHerderTest {
 
     @MockStrict private Worker worker;
     @MockStrict private WorkerConfigTransformer transformer;
-    @MockStrict private Plugins plugins;
-    @MockStrict private ClassLoader classLoader;
     @MockStrict private ConfigBackingStore configStore;
     @MockStrict private StatusBackingStore statusStore;
+    @MockStrict private ClassLoader classLoader;
+    @Mock private Plugins plugins;
 
     @Test
     public void testConnectors() {
@@ -436,13 +437,18 @@ public class AbstractHerderTest {
         // We expect there to be errors due to the missing name and .... Note 
that these assertions depend heavily on
         // the config fields for SourceConnectorConfig, but we expect these to 
change rarely.
         assertEquals(SampleSourceConnector.class.getName(), result.name());
-        assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, 
ConnectorConfig.TRANSFORMS_GROUP,
-                ConnectorConfig.PREDICATES_GROUP, ConnectorConfig.ERROR_GROUP, 
SourceConnectorConfig.TOPIC_CREATION_GROUP), result.groups());
+        assertEquals(
+                Arrays.asList(
+                        ConnectorConfig.COMMON_GROUP, 
ConnectorConfig.TRANSFORMS_GROUP,
+                        ConnectorConfig.PREDICATES_GROUP, 
ConnectorConfig.ERROR_GROUP,
+                        SourceConnectorConfig.TOPIC_CREATION_GROUP, 
SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+                        SourceConnectorConfig.OFFSETS_TOPIC_GROUP),
+                result.groups());
         assertEquals(2, result.errorCount());
         Map<String, ConfigInfo> infos = result.values().stream()
                 .collect(Collectors.toMap(info -> info.configKey().name(), 
Function.identity()));
-        // Base connector config has 14 fields, connector's configs add 2
-        assertEquals(17, infos.size());
+        // Base connector config has 14 fields, connector's configs add 7
+        assertEquals(21, infos.size());
         // Missing name should generate an error
         assertEquals(ConnectorConfig.NAME_CONFIG,
                 infos.get(ConnectorConfig.NAME_CONFIG).configValue().name());
@@ -531,6 +537,8 @@ public class AbstractHerderTest {
                 ConnectorConfig.PREDICATES_GROUP,
                 ConnectorConfig.ERROR_GROUP,
                 SourceConnectorConfig.TOPIC_CREATION_GROUP,
+                SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+                SourceConnectorConfig.OFFSETS_TOPIC_GROUP,
                 "Transforms: xformA",
                 "Transforms: xformB"
         );
@@ -538,7 +546,7 @@ public class AbstractHerderTest {
         assertEquals(2, result.errorCount());
         Map<String, ConfigInfo> infos = result.values().stream()
                 .collect(Collectors.toMap(info -> info.configKey().name(), 
Function.identity()));
-        assertEquals(22, infos.size());
+        assertEquals(26, infos.size());
         // Should get 2 type fields from the transforms, first adds its own config since it has a valid class
         assertEquals("transforms.xformA.type",
                 infos.get("transforms.xformA.type").configValue().name());
@@ -590,6 +598,8 @@ public class AbstractHerderTest {
                 ConnectorConfig.PREDICATES_GROUP,
                 ConnectorConfig.ERROR_GROUP,
                 SourceConnectorConfig.TOPIC_CREATION_GROUP,
+                SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+                SourceConnectorConfig.OFFSETS_TOPIC_GROUP,
                 "Transforms: xformA",
                 "Predicates: predX",
                 "Predicates: predY"
@@ -598,7 +608,7 @@ public class AbstractHerderTest {
         assertEquals(2, result.errorCount());
         Map<String, ConfigInfo> infos = result.values().stream()
                 .collect(Collectors.toMap(info -> info.configKey().name(), 
Function.identity()));
-        assertEquals(24, infos.size());
+        assertEquals(28, infos.size());
         // Should get 2 type fields from the transforms, first adds its own config since it has a valid class
         assertEquals("transforms.xformA.type",
                 infos.get("transforms.xformA.type").configValue().name());
@@ -659,12 +669,14 @@ public class AbstractHerderTest {
             ConnectorConfig.TRANSFORMS_GROUP,
             ConnectorConfig.PREDICATES_GROUP,
             ConnectorConfig.ERROR_GROUP,
-            SourceConnectorConfig.TOPIC_CREATION_GROUP
+            SourceConnectorConfig.TOPIC_CREATION_GROUP,
+            SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+            SourceConnectorConfig.OFFSETS_TOPIC_GROUP
         );
         assertEquals(expectedGroups, result.groups());
         assertEquals(1, result.errorCount());
-        // Base connector config has 14 fields, connector's configs add 2, and 2 producer overrides
-        assertEquals(19, result.values().size());
+        // Base connector config has 14 fields, connector's configs add 7, and 2 producer overrides
+        assertEquals(23, result.values().size());
         assertTrue(result.values().stream().anyMatch(
             configInfo -> ackConfigKey.equals(configInfo.configValue().name()) 
&& !configInfo.configValue().errors().isEmpty()));
         assertTrue(result.values().stream().anyMatch(
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
index 21364ae8e17..12085b21d96 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
@@ -28,11 +28,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
+import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.GROUP_ID_CONFIG;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.Assert.assertTrue;
 
 public class DistributedConfigTest {
 
@@ -306,4 +309,42 @@ public class DistributedConfigTest {
                 () -> new DistributedConfig(configs));
         
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
+
+    @Test
+    public void shouldIdentifyNeedForTransactionalLeader() {
+        Map<String, String> workerProps = configs();
+
+        workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "disabled");
+        assertFalse(new 
DistributedConfig(workerProps).transactionalLeaderEnabled());
+
+        workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+        assertTrue(new 
DistributedConfig(workerProps).transactionalLeaderEnabled());
+
+        workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+        assertTrue(new 
DistributedConfig(workerProps).transactionalLeaderEnabled());
+    }
+
+    @Test
+    public void shouldConstructExpectedTransactionalId() {
+        Map<String, String> workerProps = configs();
+
+        workerProps.put(GROUP_ID_CONFIG, "why did i stay up all night writing 
unit tests");
+        assertEquals(
+                "connect-cluster-why did i stay up all night writing unit 
tests",
+                new DistributedConfig(workerProps).transactionalProducerId()
+        );
+
+        workerProps.put(GROUP_ID_CONFIG, "connect-cluster");
+        assertEquals(
+                "connect-cluster-connect-cluster",
+                new DistributedConfig(workerProps).transactionalProducerId()
+        );
+
+        workerProps.put(GROUP_ID_CONFIG, "\u2603");
+        assertEquals(
+                "connect-cluster-\u2603",
+                new DistributedConfig(workerProps).transactionalProducerId()
+        );
+    }
+
 }

Reply via email to