This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 8cfafba2794 KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector (#12366) 8cfafba2794 is described below commit 8cfafba2794562840b0f1c537e304f084b9359cf Author: Chris Egerton <chr...@aiven.io> AuthorDate: Mon Feb 13 10:09:14 2023 -0500 KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector (#12366) Reviewers: Mickael Maison <mickael.mai...@gmail.com> --- .../apache/kafka/common/config/AbstractConfig.java | 9 +- .../java/org/apache/kafka/common/utils/Utils.java | 38 +++++++ .../org/apache/kafka/common/utils/UtilsTest.java | 18 +++ connect/mirror/README.md | 42 +++++++ .../connect/mirror/MirrorConnectorConfig.java | 20 ++-- .../kafka/connect/mirror/MirrorMakerConfig.java | 8 +- .../connect/mirror/MirrorSourceConnector.java | 49 ++++++++ .../connect/mirror/MirrorSourceConnectorTest.java | 124 +++++++++++++++++++++ .../DedicatedMirrorIntegrationTest.java | 3 +- .../IdentityReplicationIntegrationTest.java | 4 +- .../MirrorConnectorsIntegrationBaseTest.java | 12 +- ...MirrorConnectorsIntegrationExactlyOnceTest.java | 52 +++++++++ .../runtime/distributed/DistributedConfig.java | 2 +- 13 files changed, 351 insertions(+), 30 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index e3fda4d9f54..e620f18f7d8 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -274,14 +274,7 @@ public class AbstractConfig { */ public Map<String, Object> originalsWithPrefix(String prefix, boolean strip) { Map<String, Object> result = new RecordingMap<>(prefix, false); - for (Map.Entry<String, ?> entry : originals.entrySet()) { - if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) { - if (strip) - 
result.put(entry.getKey().substring(prefix.length()), entry.getValue()); - else - result.put(entry.getKey(), entry.getValue()); - } - } + result.putAll(Utils.entriesWithPrefix(originals, prefix, strip)); return result; } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 4ac8d7d2fe1..0c3d6f15636 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1520,4 +1520,42 @@ public final class Utils { throw new IllegalArgumentException("Expected string to end with " + oldSuffix + " but string is " + str); return str.substring(0, str.length() - oldSuffix.length()) + newSuffix; } + + /** + * Find all key/value pairs whose keys begin with the given prefix, and remove that prefix from all + * resulting keys. + * @param map the map to filter key/value pairs from + * @param prefix the prefix to search keys for + * @return a {@link Map} containing a key/value pair for every key/value pair in the {@code map} + * parameter whose key begins with the given {@code prefix} and whose corresponding keys have + * the prefix stripped from them; may be empty, but never null + * @param <V> the type of values stored in the map + */ + public static <V> Map<String, V> entriesWithPrefix(Map<String, V> map, String prefix) { + return entriesWithPrefix(map, prefix, true); + } + + /** + * Find all key/value pairs whose keys begin with the given prefix, optionally removing that prefix + * from all resulting keys. 
+ * @param map the map to filter key/value pairs from + * @param prefix the prefix to search keys for + * @param strip whether the keys of the returned map should not include the prefix + * @return a {@link Map} containing a key/value pair for every key/value pair in the {@code map} + * parameter whose key begins with the given {@code prefix}; may be empty, but never null + * @param <V> the type of values stored in the map + */ + public static <V> Map<String, V> entriesWithPrefix(Map<String, V> map, String prefix, boolean strip) { + Map<String, V> result = new HashMap<>(); + for (Map.Entry<String, V> entry : map.entrySet()) { + if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) { + if (strip) + result.put(entry.getKey().substring(prefix.length()), entry.getValue()); + else + result.put(entry.getKey(), entry.getValue()); + } + } + return result; + } + } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index 2366da33662..bed5ed62dd6 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -44,6 +44,7 @@ import java.time.temporal.ChronoField; import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Properties; @@ -936,4 +937,21 @@ public class UtilsTest { assertEquals("foo.txt", Utils.replaceSuffix("foo", "", ".txt")); } + @Test + public void testEntriesWithPrefix() { + Map<String, Object> props = new HashMap<>(); + props.put("foo.bar", "abc"); + props.put("setting", "def"); + + // With stripping + Map<String, Object> expected = Collections.singletonMap("bar", "abc"); + Map<String, Object> actual = Utils.entriesWithPrefix(props, "foo."); + assertEquals(expected, actual); + + // Without stripping + expected = 
Collections.singletonMap("foo.bar", "abc"); + actual = Utils.entriesWithPrefix(props, "foo.", false); + assertEquals(expected, actual); + } + } diff --git a/connect/mirror/README.md b/connect/mirror/README.md index 3c8aebc635f..db016fd3742 100644 --- a/connect/mirror/README.md +++ b/connect/mirror/README.md @@ -194,6 +194,48 @@ it is important to keep configuration consistent across flows to the same target cluster. In most cases, your entire organization should use a single MM2 configuration file. +### Exactly-once +Exactly-once semantics are supported for dedicated MM2 clusters as of version 3.5.0. + +For new MM2 clusters, set the `exactly.once.source.support` property to `enabled` for +all targeted Kafka clusters that should be written to with exactly-once semantics. For example, +to enable exactly-once for writes to cluster `B`, add the following to your MM2 config file: + + B.exactly.once.source.support = enabled + +For existing MM2 clusters, a two-step upgrade is necessary. Instead of immediately +setting the `exactly.once.source.support` property to `enabled`, first set it to `preparing` on +all nodes in the cluster. Once this is complete, it can be set to `enabled` on all nodes in the +cluster, in a second round of restarts. + +In either case, it is also necessary to enable intra-cluster communication between your MM2 +nodes, as described in +[KIP-710](https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters). +To do this, the `dedicated.mode.enable.internal.rest` property must be set to `true` in your MM2 config. +In addition, many of the REST-related +[configuration properties available for Kafka Connect](https://kafka.apache.org/documentation/#connectconfigs) +can be specified in your MM2 config. 
For example, to enable intra-cluster communication in your MM2 +cluster with each node listening on port 8080 of its local machine, add this to your config file: + + dedicated.mode.enable.internal.rest = true + listeners = http://localhost:8080 + +**Note that, if intra-cluster communication is enabled in production environments, it is highly +recommended to secure the REST servers brought up by each MM2 node. See the configuration +properties for Kafka Connect for information on how this can be accomplished.** + +It is also recommended to filter out records from aborted transactions in replicated data +when running MM2. To do this, ensure that the consumer used to read from source clusters is +configured with `isolation.level` set to `read_committed`. If replicating data from cluster `A`, +this can be done for all replication flows that read from that cluster by adding the following +to your MM2 config: + + A.consumer.isolation.level = read_committed + +As a final note, under the hood, MM2 uses Kafka Connect source connectors to replicate data. +For more information on exactly-once support for these kinds of connectors, see the +[relevant docs page](https://kafka.apache.org/documentation/#connect_exactlyoncesource).
+ ## Remote topics MM2 employs a naming convention to ensure that records from different diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java index e65134d7416..6896cf74157 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java @@ -146,14 +146,18 @@ public abstract class MirrorConnectorConfig extends AbstractConfig { } Map<String, Object> sourceConsumerConfig() { - Map<String, Object> props = new HashMap<>(); - props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX)); - props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names()); - props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX)); - props.putAll(originalsWithPrefix(SOURCE_PREFIX + CONSUMER_CLIENT_PREFIX)); - props.put(ENABLE_AUTO_COMMIT_CONFIG, "false"); - props.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest"); - return props; + return sourceConsumerConfig(originals()); + } + + static Map<String, Object> sourceConsumerConfig(Map<String, ?> props) { + Map<String, Object> result = new HashMap<>(); + result.putAll(Utils.entriesWithPrefix(props, SOURCE_PREFIX)); + result.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names()); + result.putAll(Utils.entriesWithPrefix(props, CONSUMER_CLIENT_PREFIX)); + result.putAll(Utils.entriesWithPrefix(props, SOURCE_PREFIX + CONSUMER_CLIENT_PREFIX)); + result.put(ENABLE_AUTO_COMMIT_CONFIG, "false"); + result.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest"); + return result; } Map<String, Object> targetAdminConfig() { diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java index 85f4e9d79e4..3acc6bf7624 100644 --- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java @@ -300,15 +300,11 @@ public class MirrorMakerConfig extends AbstractConfig { } private Map<String, String> stringsWithPrefixStripped(String prefix) { - return originalsStrings().entrySet().stream() - .filter(x -> x.getKey().startsWith(prefix)) - .collect(Collectors.toMap(x -> x.getKey().substring(prefix.length()), Entry::getValue)); + return Utils.entriesWithPrefix(originalsStrings(), prefix); } private Map<String, String> stringsWithPrefix(String prefix) { - Map<String, String> strings = originalsStrings(); - strings.keySet().removeIf(x -> !x.startsWith(prefix)); - return strings; + return Utils.entriesWithPrefix(originalsStrings(), prefix, false); } static Map<String, String> clusterConfigsWithPrefix(String prefix, Map<String, String> props) { diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java index b49daf0ff76..0ecc69c0b37 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java @@ -16,10 +16,15 @@ */ package org.apache.kafka.connect.mirror; +import java.util.Locale; import java.util.Map.Entry; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.source.ExactlyOnceSupport; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigResource; @@ -47,6 +52,7 @@ import 
org.apache.kafka.clients.admin.CreateTopicsOptions; import java.util.Map; import java.util.List; import java.util.ArrayList; +import java.util.Objects; import java.util.Set; import java.util.HashSet; import java.util.Collection; @@ -69,6 +75,8 @@ public class MirrorSourceConnector extends SourceConnector { private static final ResourcePatternFilter ANY_TOPIC = new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.ANY); private static final AclBindingFilter ANY_TOPIC_ACL = new AclBindingFilter(ANY_TOPIC, AccessControlEntryFilter.ANY); + private static final String READ_COMMITTED = IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT); + private static final String EXACTLY_ONCE_SUPPORT_CONFIG = "exactly.once.support"; private Scheduler scheduler; private MirrorSourceConfig config; @@ -188,11 +196,52 @@ public class MirrorSourceConnector extends SourceConnector { return MirrorSourceConfig.CONNECTOR_CONFIG_DEF; } + @Override + public org.apache.kafka.common.config.Config validate(Map<String, String> props) { + List<ConfigValue> configValues = super.validate(props).configValues(); + if ("required".equals(props.get(EXACTLY_ONCE_SUPPORT_CONFIG))) { + if (!consumerUsesReadCommitted(props)) { + ConfigValue exactlyOnceSupport = configValues.stream() + .filter(cv -> EXACTLY_ONCE_SUPPORT_CONFIG.equals(cv.name())) + .findAny() + .orElseGet(() -> { + ConfigValue result = new ConfigValue(EXACTLY_ONCE_SUPPORT_CONFIG); + configValues.add(result); + return result; + }); + // The Connect framework will already generate an error for this property if we return ExactlyOnceSupport.UNSUPPORTED + // from our exactlyOnceSupport method, but it will be fairly generic + // We add a second error message here to give users more insight into why this specific connector can't support exactly-once + // guarantees with the given configuration + exactlyOnceSupport.addErrorMessage( + "MirrorSourceConnector can only provide exactly-once guarantees when its source consumer is 
configured with " + + ConsumerConfig.ISOLATION_LEVEL_CONFIG + " set to '" + READ_COMMITTED + "'; " + + "otherwise, records from aborted and uncommitted transactions will be replicated from the " + + "source cluster to the target cluster." + ); + } + } + return new org.apache.kafka.common.config.Config(configValues); + } + @Override public String version() { return AppInfoParser.getVersion(); } + @Override + public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) { + return consumerUsesReadCommitted(props) + ? ExactlyOnceSupport.SUPPORTED + : ExactlyOnceSupport.UNSUPPORTED; + } + + private boolean consumerUsesReadCommitted(Map<String, String> props) { + Object consumerIsolationLevel = MirrorSourceConfig.sourceConsumerConfig(props) + .get(ConsumerConfig.ISOLATION_LEVEL_CONFIG); + return Objects.equals(READ_COMMITTED, consumerIsolationLevel); + } + // visible for testing List<TopicPartition> findSourceTopicPartitions() throws InterruptedException, ExecutionException { diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java index 304b42d71c5..92e37e5fd15 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; @@ -29,8 +30,13 @@ import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.clients.admin.ConfigEntry; 
import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.connect.source.ExactlyOnceSupport; import org.junit.jupiter.api.Test; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; +import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX; +import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.SOURCE_PREFIX; +import static org.apache.kafka.connect.mirror.MirrorSourceConfig.OFFSET_LAG_MAX; import static org.apache.kafka.connect.mirror.MirrorSourceConfig.TASK_TOPIC_PARTITIONS; import static org.apache.kafka.connect.mirror.TestUtils.makeProps; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -55,6 +61,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; public class MirrorSourceConnectorTest { @@ -334,4 +342,120 @@ public class MirrorSourceConnectorTest { assertDoesNotThrow(() -> connector.isCycle(".b")); } + @Test + public void testExactlyOnceSupport() { + String readCommitted = "read_committed"; + String readUncommitted = "read_uncommitted"; + String readGarbage = "read_garbage"; + + // Connector is configured correctly, but exactly-once can't be supported + assertExactlyOnceSupport(null, null, false); + assertExactlyOnceSupport(readUncommitted, null, false); + assertExactlyOnceSupport(null, readUncommitted, false); + assertExactlyOnceSupport(readUncommitted, readUncommitted, false); + + // Connector is configured correctly, and exactly-once can be supported + assertExactlyOnceSupport(readCommitted, null, true); + assertExactlyOnceSupport(null, readCommitted, true); + assertExactlyOnceSupport(readUncommitted, readCommitted, true); + assertExactlyOnceSupport(readCommitted, readCommitted, true); + + // Connector is configured incorrectly, but is able to react gracefully + assertExactlyOnceSupport(readGarbage, null, false); + 
assertExactlyOnceSupport(null, readGarbage, false); + assertExactlyOnceSupport(readGarbage, readGarbage, false); + assertExactlyOnceSupport(readCommitted, readGarbage, false); + assertExactlyOnceSupport(readUncommitted, readGarbage, false); + assertExactlyOnceSupport(readGarbage, readUncommitted, false); + assertExactlyOnceSupport(readGarbage, readCommitted, true); + } + + private void assertExactlyOnceSupport(String defaultIsolationLevel, String sourceIsolationLevel, boolean expected) { + Map<String, String> props = makeProps(); + if (defaultIsolationLevel != null) { + props.put(CONSUMER_CLIENT_PREFIX + ISOLATION_LEVEL_CONFIG, defaultIsolationLevel); + } + if (sourceIsolationLevel != null) { + props.put(SOURCE_PREFIX + CONSUMER_CLIENT_PREFIX + ISOLATION_LEVEL_CONFIG, sourceIsolationLevel); + } + ExactlyOnceSupport expectedSupport = expected ? ExactlyOnceSupport.SUPPORTED : ExactlyOnceSupport.UNSUPPORTED; + ExactlyOnceSupport actualSupport = new MirrorSourceConnector().exactlyOnceSupport(props); + assertEquals(expectedSupport, actualSupport); + } + + @Test + public void testExactlyOnceSupportValidation() { + String exactlyOnceSupport = "exactly.once.support"; + + Map<String, String> props = makeProps(); + Optional<ConfigValue> configValue = validateProperty(exactlyOnceSupport, props); + assertEquals(Optional.empty(), configValue); + + props.put(exactlyOnceSupport, "requested"); + configValue = validateProperty(exactlyOnceSupport, props); + assertEquals(Optional.empty(), configValue); + + props.put(exactlyOnceSupport, "garbage"); + configValue = validateProperty(exactlyOnceSupport, props); + assertEquals(Optional.empty(), configValue); + + props.put(exactlyOnceSupport, "required"); + configValue = validateProperty(exactlyOnceSupport, props); + assertTrue(configValue.isPresent()); + List<String> errorMessages = configValue.get().errorMessages(); + assertEquals(1, errorMessages.size()); + String errorMessage = errorMessages.get(0); + assertTrue( + 
errorMessages.get(0).contains(ISOLATION_LEVEL_CONFIG), + "Error message \"" + errorMessage + "\" should have mentioned the 'isolation.level' consumer property" + ); + + props.put(CONSUMER_CLIENT_PREFIX + ISOLATION_LEVEL_CONFIG, "read_committed"); + configValue = validateProperty(exactlyOnceSupport, props); + assertEquals(Optional.empty(), configValue); + + // Make sure that an unrelated invalid property doesn't cause an exception to be thrown and is instead handled and reported gracefully + props.put(OFFSET_LAG_MAX, "bad"); + // Ensure that the issue with the invalid property is reported... + configValue = validateProperty(OFFSET_LAG_MAX, props); + assertTrue(configValue.isPresent()); + errorMessages = configValue.get().errorMessages(); + assertEquals(1, errorMessages.size()); + errorMessage = errorMessages.get(0); + assertTrue( + errorMessages.get(0).contains(OFFSET_LAG_MAX), + "Error message \"" + errorMessage + "\" should have mentioned the 'offset.lag.max' property" + ); + // ... and that it does not cause any issues with validation for exactly-once support... + configValue = validateProperty(exactlyOnceSupport, props); + assertEquals(Optional.empty(), configValue); + + // ... 
regardless of whether validation for exactly-once support does or does not find an error + props.remove(CONSUMER_CLIENT_PREFIX + ISOLATION_LEVEL_CONFIG); + configValue = validateProperty(exactlyOnceSupport, props); + assertTrue(configValue.isPresent()); + errorMessages = configValue.get().errorMessages(); + assertEquals(1, errorMessages.size()); + errorMessage = errorMessages.get(0); + assertTrue( + errorMessages.get(0).contains(ISOLATION_LEVEL_CONFIG), + "Error message \"" + errorMessage + "\" should have mentioned the 'isolation.level' consumer property" + ); + } + + private Optional<ConfigValue> validateProperty(String name, Map<String, String> props) { + List<ConfigValue> results = new MirrorSourceConnector().validate(props) + .configValues().stream() + .filter(cv -> name.equals(cv.name())) + .collect(Collectors.toList()); + + assertTrue(results.size() <= 1, "Connector produced multiple config values for '" + name + "' property"); + + if (results.isEmpty()) + return Optional.empty(); + + ConfigValue result = results.get(0); + assertNotNull(result, "Connector should not have record null config value for '" + name + "' property"); + return Optional.of(result); + } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java index f78de9bced6..e2db7b3865e 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java @@ -199,7 +199,8 @@ public class DedicatedMirrorIntegrationTest { // Enable exactly-once support to both validate that MirrorMaker can run with // that feature turned on, and to force cross-worker communication before // task startup - put(a + ".exactly.once.source.support", "enabled"); + put(b + 
".exactly.once.source.support", "enabled"); + put(a + ".consumer.isolation.level", "read_committed"); put(ab + ".enabled", "true"); put(ab + ".topics", "^" + testTopicPrefix + ".*"); // The name of the offset syncs topic will contain the name of the cluster in diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java index 1a961be564c..dd683f1acfe 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java @@ -225,7 +225,7 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat consumerProps, "test-topic-1"); waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("test-topic-1"), - consumerGroupName, NUM_RECORDS_PRODUCED); + consumerGroupName, NUM_RECORDS_PRODUCED, true); ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); @@ -254,7 +254,7 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat "group.id", consumerGroupName), "test-topic-1", "test-topic-2"); waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("test-topic-1", "test-topic-2"), - consumerGroupName, NUM_RECORDS_PRODUCED); + consumerGroupName, NUM_RECORDS_PRODUCED, true); records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); // similar reasoning as above, no more records to consume by the same consumer group at backup cluster diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index 5ca9b110707..27da7054b67 100644 --- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -118,6 +118,7 @@ public class MirrorConnectorsIntegrationBaseTest { protected Properties backupBrokerProps = new Properties(); protected Map<String, String> primaryWorkerProps = new HashMap<>(); protected Map<String, String> backupWorkerProps = new HashMap<>(); + protected boolean exactOffsetTranslation = true; @BeforeEach public void startClusters() throws Exception { @@ -450,7 +451,7 @@ public class MirrorConnectorsIntegrationBaseTest { consumerProps, "primary.test-topic-1"); waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("primary.test-topic-1"), - consumerGroupName, NUM_RECORDS_PRODUCED); + consumerGroupName, NUM_RECORDS_PRODUCED, exactOffsetTranslation); ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); @@ -479,7 +480,7 @@ public class MirrorConnectorsIntegrationBaseTest { "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2"); waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"), - consumerGroupName, NUM_RECORDS_PRODUCED); + consumerGroupName, NUM_RECORDS_PRODUCED, exactOffsetTranslation); records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); // similar reasoning as above, no more records to consume by the same consumer group at backup cluster @@ -708,7 +709,7 @@ public class MirrorConnectorsIntegrationBaseTest { * offsets are eventually synced to the expected offset numbers */ protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect, - Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords) + Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords, boolean 
exactOffsetTranslation) throws InterruptedException { try (Admin adminClient = connect.kafka().createAdminClient()) { List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size()); @@ -728,8 +729,11 @@ public class MirrorConnectorsIntegrationBaseTest { Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS); long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum(); + boolean totalOffsetsMatch = exactOffsetTranslation + ? totalOffsets == expectedTotalOffsets + : totalOffsets >= expectedTotalOffsets; // make sure the consumer group offsets are synced to expected number - return totalOffsets == expectedTotalOffsets && consumerGroupOffsetTotal > 0; + return totalOffsetsMatch && consumerGroupOffsetTotal > 0; }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time"); } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java new file mode 100644 index 00000000000..a50b21bd58b --- /dev/null +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; + +import java.util.Arrays; +import java.util.Properties; + +/** + * Tests MM2 replication with exactly-once support enabled on the Connect clusters. + */ +@Tag("integration") +public class MirrorConnectorsIntegrationExactlyOnceTest extends MirrorConnectorsIntegrationBaseTest { + + @BeforeEach + public void startClusters() throws Exception { + mm2Props.put( + PRIMARY_CLUSTER_ALIAS + "." + DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, + DistributedConfig.ExactlyOnceSourceSupport.ENABLED.toString() + ); + mm2Props.put( + BACKUP_CLUSTER_ALIAS + "." 
+ DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, + DistributedConfig.ExactlyOnceSourceSupport.ENABLED.toString() + ); + for (Properties brokerProps : Arrays.asList(primaryBrokerProps, backupBrokerProps)) { + brokerProps.put("transaction.state.log.replication.factor", "1"); + brokerProps.put("transaction.state.log.min.isr", "1"); + } + // Transaction marker records will cause translated offsets to not match + // between source and target + exactOffsetTranslation = false; + super.startClusters(); + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java index 9cacb6e1ee6..1c0ebea2866 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java @@ -212,7 +212,7 @@ public class DistributedConfig extends WorkerConfig { + "on other JVMs, no default is used and a value for this property must be manually specified in the worker config."; private Crypto crypto; - private enum ExactlyOnceSourceSupport { + public enum ExactlyOnceSourceSupport { DISABLED(false), PREPARING(true), ENABLED(true);