This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 69a2817a0592f02f219909856316e26a4efd21f0
Author: Greg Harris <[email protected]>
AuthorDate: Fri Feb 17 14:25:17 2023 -0800

    KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag, syncing stale offsets, and flaky integration tests (#13178)

    KAFKA-12468: Fix negative lag on down consumer groups synced by MirrorMaker 2
    KAFKA-13659: Stop syncing consumer groups with stale offsets in MirrorMaker 2
    KAFKA-12566: Fix flaky MirrorMaker 2 integration tests

    Reviewers: Chris Egerton <[email protected]>
---
 .../kafka/connect/mirror/MirrorCheckpointTask.java |  17 +-
 .../kafka/connect/mirror/OffsetSyncStore.java      | 116 ++++++---
 .../connect/mirror/MirrorCheckpointTaskTest.java   |  32 ++-
 .../kafka/connect/mirror/OffsetSyncStoreTest.java  |  86 +++++--
 .../IdentityReplicationIntegrationTest.java        |  45 ++--
 .../MirrorConnectorsIntegrationBaseTest.java       | 259 ++++++++++++++++-----
 ...irrorConnectorsIntegrationTransactionsTest.java |  66 ++++++
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  24 +-
 .../org/apache/kafka/connect/util/TopicAdmin.java  |   3 +-
 9 files changed, 497 insertions(+), 151 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 29483c126b3..02dcdf0f43e 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -101,10 +101,13 @@ public class MirrorCheckpointTask extends SourceTask {
         idleConsumerGroupsOffset = new HashMap<>();
         checkpointsPerConsumerGroup = new HashMap<>();
         scheduler = new Scheduler(MirrorCheckpointTask.class, config.adminTimeout());
-        scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
-            "refreshing idle consumers group offsets at target cluster");
-        scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
-            "sync idle consumer group offset from source to target");
+        scheduler.execute(() -> {
+            offsetSyncStore.start();
+            scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
+                "refreshing idle consumers group offsets at target cluster");
+            scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
+                "sync idle consumer group offset from source to target");
+        }, "starting offset sync store");
     }
 
     @Override
@@ -135,7 +138,11 @@ public class MirrorCheckpointTask extends SourceTask {
         try {
             long deadline = System.currentTimeMillis() + interval.toMillis();
             while (!stopping && System.currentTimeMillis() < deadline) {
-                offsetSyncStore.update(pollTimeout);
+                Thread.sleep(pollTimeout.toMillis());
+            }
+            if (stopping) {
+                // we are stopping, return early.
+                return null;
             }
             List<SourceRecord> records = new ArrayList<>();
             for (String group : consumerGroups) {
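
For context on the KAFKA-13659 part of this fix: the checkpoint task now defers all
blocking work to the scheduler thread, and the poll loop merely sleeps, because
consumption of the offset syncs topic moves into a KafkaBasedLog thread inside
OffsetSyncStore (next diff). Until that store has read the syncs topic to the end
once, it refuses to translate offsets, so no checkpoints can be produced from stale
syncs. A condensed, illustrative sketch of the new lifecycle (not the committed code):

    // On task start (non-blocking for the Connect framework):
    scheduler.execute(() -> {
        offsetSyncStore.start();      // blocks until the syncs topic is fully read
        // ... periodic group-offset jobs are scheduled only after this point ...
    }, "starting offset sync store");

    // In poll(): nothing to consume here anymore; just wait out the interval.
    while (!stopping && System.currentTimeMillis() < deadline) {
        Thread.sleep(pollTimeout.toMillis());
    }
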
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
index f9b6617c13d..52bad401e8a 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -16,65 +16,121 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
-import java.util.Map;
-import java.util.HashMap;
 import java.util.Collections;
-import java.time.Duration;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
+    private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+    private final TopicAdmin admin;
+    protected volatile boolean readToEnd = false;
 
     OffsetSyncStore(MirrorConnectorConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        Consumer<byte[], byte[]> consumer = null;
+        TopicAdmin admin = null;
+        KafkaBasedLog<byte[], byte[]> store;
+        try {
+            consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());
+            admin = new TopicAdmin(config.offsetSyncsTopicAdminConfig());
+            store = createBackingStore(config, consumer, admin);
+        } catch (Throwable t) {
+            Utils.closeQuietly(consumer, "consumer for offset syncs");
+            Utils.closeQuietly(admin, "admin client for offset syncs");
+            throw t;
+        }
+        this.admin = admin;
+        this.backingStore = store;
     }
 
-    // for testing
-    OffsetSyncStore(KafkaConsumer<byte[], byte[]> consumer, TopicPartition offsetSyncTopicPartition) {
-        this.consumer = consumer;
-        this.offsetSyncTopicPartition = offsetSyncTopicPartition;
+    private KafkaBasedLog<byte[], byte[]> createBackingStore(MirrorConnectorConfig config, Consumer<byte[], byte[]> consumer, TopicAdmin admin) {
+        return new KafkaBasedLog<byte[], byte[]>(
+                config.offsetSyncsTopic(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                () -> admin,
+                (error, record) -> this.handleRecord(record),
+                Time.SYSTEM,
+                ignored -> {
+                }
+        ) {
+            @Override
+            protected Producer<byte[], byte[]> createProducer() {
+                return null;
+            }
+
+            @Override
+            protected Consumer<byte[], byte[]> createConsumer() {
+                return consumer;
+            }
+
+            @Override
+            protected boolean readPartition(TopicPartition topicPartition) {
+                return topicPartition.partition() == 0;
+            }
+        };
+    }
+
+    OffsetSyncStore() {
+        this.admin = null;
+        this.backingStore = null;
+    }
+
+    /**
+     * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage.
+     */
+    public void start() {
+        backingStore.start();
+        readToEnd = true;
     }
 
     OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
+        if (!readToEnd) {
+            // If we have not read to the end of the syncs topic at least once, decline to translate any offsets.
+            // This prevents emitting stale offsets while initially reading the offset syncs topic.
+            return OptionalLong.empty();
+        }
         Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
         if (offsetSync.isPresent()) {
             if (offsetSync.get().upstreamOffset() > upstreamOffset) {
                 // Offset is too far in the past to translate accurately
                 return OptionalLong.of(-1L);
             }
-            long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
+            // If the consumer group is ahead of the offset sync, we can translate the upstream offset only 1
+            // downstream offset past the offset sync itself. This is because we know that future records must appear
+            // ahead of the offset sync, but we cannot estimate how many offsets from the upstream topic
+            // will be written vs dropped. If we overestimate, then we may skip the correct offset and have data loss.
+            // This also handles consumer groups at the end of a topic whose offsets point past the last valid record.
+            // This may cause re-reading of records depending on the age of the offset sync.
+            // s=offset sync pair, ?=record may or may not be replicated, g=consumer group offset, r=re-read record
+            // source |-s?????r???g-|
+            //          |  ______/
+            //          | /
+            //          vv
+            // target |-sg----r-----|
            long upstreamStep = upstreamOffset == offsetSync.get().upstreamOffset() ? 0 : 1;
             return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
         } else {
             return OptionalLong.empty();
         }
     }
 
-    // poll and handle records
-    synchronized void update(Duration pollTimeout) {
-        try {
-            consumer.poll(pollTimeout).forEach(this::handleRecord);
-        } catch (WakeupException e) {
-            // swallow
-        }
-    }
-
-    public synchronized void close() {
-        consumer.wakeup();
-        Utils.closeQuietly(consumer, "offset sync store consumer");
+    @Override
+    public void close() {
+        Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "backing store for offset syncs");
+        Utils.closeQuietly(admin, "admin client for offset syncs");
     }
 
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index 20735cd2334..500cb6c131a 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -52,11 +52,16 @@ public class MirrorCheckpointTaskTest {
 
     @Test
     public void testCheckpoint() {
+        long t1UpstreamOffset = 3L;
+        long t1DownstreamOffset = 4L;
+        long t2UpstreamOffset = 7L;
+        long t2DownstreamOffset = 8L;
         OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        offsetSyncStore.start();
         MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
             new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
-        offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
-        offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
+        offsetSyncStore.sync(new TopicPartition("topic1", 2), t1UpstreamOffset, t1DownstreamOffset);
+        offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), t2UpstreamOffset, t2DownstreamOffset);
         Optional<Checkpoint> optionalCheckpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
             new OffsetAndMetadata(10, null));
         assertTrue(optionalCheckpoint1.isPresent());
@@ -70,7 +75,7 @@ public class MirrorCheckpointTaskTest {
             "checkpoint group9 sourcePartition failed");
         assertEquals(10, checkpoint1.upstreamOffset(),
             "checkpoint group9 upstreamOffset failed");
-        assertEquals(11, checkpoint1.downstreamOffset(),
+        assertEquals(t1DownstreamOffset + 1, checkpoint1.downstreamOffset(),
             "checkpoint group9 downstreamOffset failed");
         assertEquals(123L, sourceRecord1.timestamp().longValue(),
             "checkpoint group9 timestamp failed");
@@ -87,10 +92,27 @@ public class MirrorCheckpointTaskTest {
             "checkpoint group11 sourcePartition failed");
         assertEquals(12, checkpoint2.upstreamOffset(),
             "checkpoint group11 upstreamOffset failed");
-        assertEquals(13, checkpoint2.downstreamOffset(),
+        assertEquals(t2DownstreamOffset + 1, checkpoint2.downstreamOffset(),
             "checkpoint group11 downstreamOffset failed");
         assertEquals(234L, sourceRecord2.timestamp().longValue(),
             "checkpoint group11 timestamp failed");
+        Optional<Checkpoint> optionalCheckpoint3 = mirrorCheckpointTask.checkpoint("group13", new TopicPartition("target2.topic5", 6),
+                new OffsetAndMetadata(7, null));
+        assertTrue(optionalCheckpoint3.isPresent());
+        Checkpoint checkpoint3 = optionalCheckpoint3.get();
+        SourceRecord sourceRecord3 = mirrorCheckpointTask.checkpointRecord(checkpoint3, 234L);
+        assertEquals(new TopicPartition("topic5", 6), checkpoint3.topicPartition(),
+                "checkpoint group13 topic5 failed");
+        assertEquals("group13", checkpoint3.consumerGroupId(),
+                "checkpoint group13 consumerGroupId failed");
+        assertEquals("group13", Checkpoint.unwrapGroup(sourceRecord3.sourcePartition()),
+                "checkpoint group13 sourcePartition failed");
+        assertEquals(t2UpstreamOffset, checkpoint3.upstreamOffset(),
+                "checkpoint group13 upstreamOffset failed");
+        assertEquals(t2DownstreamOffset, checkpoint3.downstreamOffset(),
+                "checkpoint group13 downstreamOffset failed");
+        assertEquals(234L, sourceRecord3.timestamp().longValue(),
+                "checkpoint group13 timestamp failed");
     }
 
     @Test
@@ -150,6 +172,7 @@ public class MirrorCheckpointTaskTest {
     @Test
     public void testNoCheckpointForTopicWithoutOffsetSyncs() {
         OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        offsetSyncStore.start();
         MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
                 new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
         offsetSyncStore.sync(new TopicPartition("topic1", 0), 3L, 4L);
@@ -165,6 +188,7 @@ public class MirrorCheckpointTaskTest {
     @Test
     public void testNoCheckpointForTopicWithNullOffsetAndMetadata() {
         OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        offsetSyncStore.start();
         MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
             new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
         offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L);
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
index 9224a088081..163e5b72250 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.TopicPartition;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.OptionalLong;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class OffsetSyncStoreTest {
@@ -30,7 +32,13 @@ public class OffsetSyncStoreTest {
 
     static class FakeOffsetSyncStore extends OffsetSyncStore {
         FakeOffsetSyncStore() {
-            super(null, null);
+            super();
+        }
+
+        @Override
+        public void start() {
+            // do not call super to avoid NPE without a KafkaBasedLog.
+            readToEnd = true;
         }
 
         void sync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
@@ -44,29 +52,57 @@ public class OffsetSyncStoreTest {
 
     @Test
     public void testOffsetTranslation() {
-        FakeOffsetSyncStore store = new FakeOffsetSyncStore();
-
-        store.sync(tp, 100, 200);
-        assertEquals(250L, store.translateDownstream(tp, 150).getAsLong(),
-                "Failure in translating downstream offset 250");
-
-        // Translate exact offsets
-        store.sync(tp, 150, 251);
-        assertEquals(251L, store.translateDownstream(tp, 150).getAsLong(),
-                "Failure in translating exact downstream offset 251");
-
-        // Use old offset (5) prior to any sync -> can't translate
-        assertEquals(-1, store.translateDownstream(tp, 5).getAsLong(),
-                "Expected old offset to not translate");
-
-        // Downstream offsets reset
-        store.sync(tp, 200, 10);
-        assertEquals(10L, store.translateDownstream(tp, 200).getAsLong(),
-                "Failure in resetting translation of downstream offset");
-
-        // Upstream offsets reset
-        store.sync(tp, 20, 20);
-        assertEquals(20L, store.translateDownstream(tp, 20).getAsLong(),
-                "Failure in resetting translation of upstream offset");
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            store.start();
+
+            // Emit synced downstream offset without dead-reckoning
+            store.sync(tp, 100, 200);
+            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+
+            // Translate exact offsets
+            store.sync(tp, 150, 251);
+            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+
+            // Use old offset (5) prior to any sync -> can't translate
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+
+            // Downstream offsets reset
+            store.sync(tp, 200, 10);
+            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+
+            // Upstream offsets reset
+            store.sync(tp, 20, 20);
+            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+        }
+    }
+
+    @Test
+    public void testNoTranslationIfStoreNotStarted() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            // no offsets exist and store is not started
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+
+            // read a sync during startup
+            store.sync(tp, 100, 200);
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+
+            // After the store is started all offsets are visible
+            store.start();
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+        }
+    }
+
+    @Test
+    public void testNoTranslationIfNoOffsetSync() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            store.start();
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+        }
     }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index 56ae3f8ebf9..e8f01e2bc12 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.mirror.MirrorClient;
 import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
 import org.apache.kafka.connect.mirror.MirrorMakerConfig;
 
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -107,11 +106,8 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
         assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
             "Checkpoints were not emitted downstream to backup cluster.");
 
-        Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
-            Duration.ofMillis(CHECKPOINT_DURATION_MS));
-
-        assertTrue(backupOffsets.containsKey(
-            new TopicPartition("test-topic-1", 0)), "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
+        Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
+            backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "test-topic-1");
 
         // Failover consumer group to backup cluster.
         try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
@@ -221,18 +217,19 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
         waitForTopicCreated(primary, "backup.test-topic-1");
         waitForTopicCreated(backup, "test-topic-1");
         // create a consumer at backup cluster with same consumer group Id to consume 1 topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
-            consumerProps, "test-topic-1");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
+                consumerProps, "test-topic-1")) {
 
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("test-topic-1"),
-            consumerGroupName, NUM_RECORDS_PRODUCED);
+            waitForConsumerGroupFullSync(backup, Collections.singletonList("test-topic-1"),
+                consumerGroupName, NUM_RECORDS_PRODUCED);
 
-        ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
 
-        // the size of consumer record should be zero, because the offsets of the same consumer group
-        // have been automatically synchronized from primary to backup by the background job, so no
-        // more records to consume from the replicated topic by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
+            // the size of consumer record should be zero, because the offsets of the same consumer group
+            // have been automatically synchronized from primary to backup by the background job, so no
+            // more records to consume from the replicated topic by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+        }
 
         // now create a new topic in primary cluster
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
@@ -244,22 +241,24 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
 
         // create a consumer at primary cluster to consume the new topic
         try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", "consumer-group-1"), "test-topic-2")) {
+                "group.id", consumerGroupName), "test-topic-2")) {
             // we need to wait for consuming all the records for MM2 replicating the expected offsets
             waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
         }
 
         // create a consumer at backup cluster with same consumer group Id to consume old and new topic
-        backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", consumerGroupName), "test-topic-1", "test-topic-2");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+                "group.id", consumerGroupName), "test-topic-1", "test-topic-2")) {
 
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("test-topic-1", "test-topic-2"),
-            consumerGroupName, NUM_RECORDS_PRODUCED);
+            waitForConsumerGroupFullSync(backup, Arrays.asList("test-topic-1", "test-topic-2"),
+                consumerGroupName, NUM_RECORDS_PRODUCED);
+
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+        }
 
-        records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
-        backupConsumer.close();
+        assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
     }
 
     /*
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 3d448e9c150..4c8340236e4 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -20,6 +20,8 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -37,7 +39,6 @@ import org.apache.kafka.connect.mirror.MirrorSourceConnector;
 import org.apache.kafka.connect.mirror.SourceAndTarget;
 import org.apache.kafka.connect.mirror.Checkpoint;
 import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
-import org.apache.kafka.connect.mirror.ReplicationPolicy;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
 import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
@@ -54,6 +55,8 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.junit.jupiter.api.Tag;
@@ -93,6 +96,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     private static final int OFFSET_SYNC_DURATION_MS = 30_000;
     private static final int TOPIC_SYNC_DURATION_MS = 60_000;
     private static final int REQUEST_TIMEOUT_DURATION_MS = 60_000;
+    private static final int CHECKPOINT_INTERVAL_DURATION_MS = 1_000;
     private static final int NUM_WORKERS = 3;
     protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500L);
     protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
@@ -115,7 +119,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     protected Properties backupBrokerProps = new Properties();
     protected Map<String, String> primaryWorkerProps = new HashMap<>();
     protected Map<String, String> backupWorkerProps = new HashMap<>();
-    
+
     @BeforeEach
     public void startClusters() throws Exception {
         startClusters(new HashMap<String, String>() {{
@@ -296,13 +300,8 @@ public class MirrorConnectorsIntegrationBaseTest {
         assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
             "Checkpoints were not emitted downstream to backup cluster.");
 
-        Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
-            Duration.ofMillis(CHECKPOINT_DURATION_MS));
-
-        for (int i = 0; i < NUM_PARTITIONS; i++) {
-            assertTrue(backupOffsets.containsKey(new TopicPartition("primary.test-topic-1", i)),
-                "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
-        }
+        Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
+            backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "primary.test-topic-1");
 
         // Failover consumer group to backup cluster.
         try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
@@ -318,11 +317,10 @@ public class MirrorConnectorsIntegrationBaseTest {
             "Checkpoints were not emitted upstream to primary cluster.");
         }
 
-        waitForCondition(() -> primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-            Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not translated downstream to primary cluster.");
+        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = waitForCheckpointOnAllPartitions(
+            primaryClient, consumerGroupName, BACKUP_CLUSTER_ALIAS, "backup.test-topic-1");
 
-        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-            Duration.ofMillis(CHECKPOINT_DURATION_MS));
+        assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
 
         primaryClient.close();
         backupClient.close();
@@ -409,7 +407,7 @@ public class MirrorConnectorsIntegrationBaseTest {
             assertEquals(0, offset.offset(), "Offset of last partition is not zero");
         }
     }
-    
+
     @Test
     public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
         produceMessages(primary, "test-topic-1");
@@ -440,18 +438,19 @@ public class MirrorConnectorsIntegrationBaseTest {
         waitForTopicCreated(primary, "backup.test-topic-1");
         waitForTopicCreated(backup, "primary.test-topic-1");
         // create a consumer at backup cluster with same consumer group Id to consume 1 topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
-            consumerProps, "primary.test-topic-1");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
+                consumerProps, "primary.test-topic-1")) {
 
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("primary.test-topic-1"),
-            consumerGroupName, NUM_RECORDS_PRODUCED);
+            waitForConsumerGroupFullSync(backup, Collections.singletonList("primary.test-topic-1"),
+                consumerGroupName, NUM_RECORDS_PRODUCED);
 
-        ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
 
-        // the size of consumer record should be zero, because the offsets of the same consumer group
-        // have been automatically synchronized from primary to backup by the background job, so no
-        // more records to consume from the replicated topic by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
+            // the size of consumer record should be zero, because the offsets of the same consumer group
+            // have been automatically synchronized from primary to backup by the background job, so no
+            // more records to consume from the replicated topic by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+        }
 
         // now create a new topic in primary cluster
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
@@ -463,22 +462,24 @@ public class MirrorConnectorsIntegrationBaseTest {
 
         // create a consumer at primary cluster to consume the new topic
         try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", "consumer-group-1"), "test-topic-2")) {
+                "group.id", consumerGroupName), "test-topic-2")) {
             // we need to wait for consuming all the records for MM2 replicating the expected offsets
             waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
         }
 
         // create a consumer at backup cluster with same consumer group Id to consume old and new topic
-        backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+                "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2")) {
+
+            waitForConsumerGroupFullSync(backup, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
+                consumerGroupName, NUM_RECORDS_PRODUCED);
 
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
-            consumerGroupName, NUM_RECORDS_PRODUCED);
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+        }
 
-        records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
-        backupConsumer.close();
+        assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
     }
 
     @Test
@@ -495,10 +496,14 @@ public class MirrorConnectorsIntegrationBaseTest {
 
         // Ensure the offset syncs topic is created in the target cluster
         waitForTopicCreated(backup.kafka(), "mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal");
 
+        String consumerGroupName = "consumer-group-syncs-on-target";
+        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+
         produceMessages(primary, "test-topic-1");
 
-        ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-        String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
+        warmUpConsumer(consumerProps);
+
+        String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
 
         // Check offsets are pushed to the checkpoint topic
         Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
@@ -516,6 +521,8 @@ public class MirrorConnectorsIntegrationBaseTest {
             "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1"
         );
 
+        assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
+
         // Ensure no offset-syncs topics have been created on the primary cluster
         Set<String> primaryTopics = primary.kafka().createAdminClient().listTopics().names().get();
         assertFalse(primaryTopics.contains("mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal"));
@@ -568,6 +575,17 @@ public class MirrorConnectorsIntegrationBaseTest {
         // Send some records to test-topic-no-checkpoints in the source cluster
         produceMessages(primary, "test-topic-no-checkpoints");
 
+        try (Consumer<byte[], byte[]> consumer = primary.kafka().createConsumer(consumerProps)) {
+            Collection<TopicPartition> tps = Arrays.asList(tp1, tp2);
+            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
+            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = endOffsets.entrySet().stream()
+                    .collect(Collectors.toMap(
+                            Map.Entry::getKey,
+                            e -> new OffsetAndMetadata(e.getValue())
+                    ));
+            consumer.commitSync(offsetsToCommit);
+        }
+
         waitForCondition(() -> {
             Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets(
                     consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
@@ -576,6 +594,33 @@ public class MirrorConnectorsIntegrationBaseTest {
         }, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to backup cluster");
     }
 
+    @Test
+    public void testRestartReplication() throws InterruptedException {
+        String consumerGroupName = "consumer-group-restart";
+        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+        String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        warmUpConsumer(consumerProps);
+        mm2Props.put("sync.group.offsets.enabled", "true");
+        mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        mm2Config = new MirrorMakerConfig(mm2Props);
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+        produceMessages(primary, "test-topic-1");
+        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
+        }
+        waitForConsumerGroupFullSync(backup, Collections.singletonList(remoteTopic), consumerGroupName, NUM_RECORDS_PRODUCED);
+        restartMirrorMakerConnectors(backup, CONNECTOR_LIST);
+        assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
+        Thread.sleep(5000);
+        produceMessages(primary, "test-topic-1");
+        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
+        }
+        waitForConsumerGroupFullSync(backup, Collections.singletonList(remoteTopic), consumerGroupName, 2 * NUM_RECORDS_PRODUCED);
+        assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
+    }
+
     private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) {
         return new TopicPartition(remoteTopicName(tp.topic(), alias), tp.partition());
     }
@@ -638,6 +683,12 @@ public class MirrorConnectorsIntegrationBaseTest {
         }
     }
 
+    protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses) {
+        for (Class<? extends Connector> connector : connectorClasses) {
+            connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false);
+        }
+    }
+
     /*
      * wait for the topic created on the cluster
      */
@@ -680,7 +731,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     protected void produceMessages(EmbeddedConnectCluster cluster, String topicName) {
         Map<String, String> recordSent = generateRecords(NUM_RECORDS_PRODUCED);
         for (Map.Entry<String, String> entry : recordSent.entrySet()) {
-            cluster.kafka().produce(topicName, entry.getKey(), entry.getValue());
+            produce(cluster.kafka(), topicName, null, entry.getKey(), entry.getValue());
         }
     }
 
@@ -691,16 +742,70 @@ public class MirrorConnectorsIntegrationBaseTest {
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
-                cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++);
     }
-    
+
+    /**
+     * Produce a test record to a Kafka cluster.
+     * This method allows subclasses to configure and use their own Kafka Producer instead of using the built-in default.
+     * @param cluster Kafka cluster that should receive the record
+     * @param topic Topic to send the record to, non-null
+     * @param partition Partition to send the record to, maybe null.
+     * @param key Kafka key for the record
+     * @param value Kafka value for the record
+     */
+    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) {
+        cluster.produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
+            MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
+    ) throws InterruptedException {
+        AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
+        waitForCondition(
+                () -> {
+                    Map<TopicPartition, OffsetAndMetadata> offsets = client.remoteConsumerOffsets(
+                            consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000));
+                    for (int i = 0; i < NUM_PARTITIONS; i++) {
+                        if (!offsets.containsKey(new TopicPartition(topicName, i))) {
+                            log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i);
+                            return false;
+                        }
+                    }
+                    ret.set(offsets);
+                    return true;
+                },
+                CHECKPOINT_DURATION_MS,
+                String.format(
+                        "Offsets for consumer group %s not translated from %s for topic %s",
+                        consumerGroupName,
+                        remoteClusterAlias,
+                        topicName
+                )
+        );
+        return ret.get();
+    }
+
     /*
      * given consumer group, topics and expected number of records, make sure the consumer group
      * offsets are eventually synced to the expected offset numbers
      */
-    protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
-            Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords)
-            throws InterruptedException {
+    protected static <T> void waitForConsumerGroupFullSync(
+            EmbeddedConnectCluster connect, List<String> topics, String consumerGroupId, int numRecords
+    ) throws InterruptedException {
+        int expectedRecords = numRecords * topics.size();
+        Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.put("isolation.level", "read_committed");
+        consumerProps.put("auto.offset.reset", "earliest");
+        Map<TopicPartition, Long> lastOffset = new HashMap<>();
+        try (Consumer<byte[], byte[]> consumer = connect.kafka().createConsumerAndSubscribeTo(consumerProps, topics.toArray(new String[0]))) {
+            final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+            waitForCondition(() -> {
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+                records.forEach(record -> lastOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset()));
+                return expectedRecords == totalConsumedRecords.addAndGet(records.count());
+            }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all records in time");
+        }
         try (Admin adminClient = connect.kafka().createAdminClient()) {
             List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
             for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) {
@@ -708,23 +813,54 @@ public class MirrorConnectorsIntegrationBaseTest {
                     tps.add(new TopicPartition(topic, partitionIndex));
                 }
             }
-            long expectedTotalOffsets = numRecords * topics.size();
 
             waitForCondition(() -> {
                 Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
-                    adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
-                long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream()
-                    .mapToLong(OffsetAndMetadata::offset).sum();
-
-                Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS);
-                long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
-
-                // make sure the consumer group offsets are synced to expected number
-                return totalOffsets == expectedTotalOffsets && consumerGroupOffsetTotal > 0;
+                        adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
+                Map<TopicPartition, OffsetSpec> endOffsetRequest = tps.stream()
+                        .collect(Collectors.toMap(Function.identity(), ignored -> OffsetSpec.latest()));
+                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
+                        adminClient.listOffsets(endOffsetRequest).all().get();
+
+                for (TopicPartition tp : tps) {
+                    assertTrue(consumerGroupOffsets.containsKey(tp),
+                            "TopicPartition " + tp + " does not have translated offsets");
+                    assertTrue(consumerGroupOffsets.get(tp).offset() > lastOffset.get(tp),
+                            "TopicPartition " + tp + " does not have fully-translated offsets");
+                    assertTrue(consumerGroupOffsets.get(tp).offset() <= endOffsets.get(tp).offset(),
+                            "TopicPartition " + tp + " has downstream offsets beyond the log end, this would lead to negative lag metrics");
+                }
+                return true;
             }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
         }
     }
 
+    protected static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster, String checkpointTopic) {
+        TopicPartition checkpointTopicPartition = new TopicPartition(checkpointTopic, 0);
+        try (Consumer<byte[], byte[]> backupConsumer = cluster.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+                "auto.offset.reset", "earliest"), checkpointTopic)) {
+            Map<String, Map<TopicPartition, Checkpoint>> checkpointsByGroup = new HashMap<>();
+            long deadline = System.currentTimeMillis() + CHECKPOINT_DURATION_MS;
+            do {
+                ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
+                for (ConsumerRecord<byte[], byte[]> record : records) {
+                    Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
+                    Map<TopicPartition, Checkpoint> lastCheckpoints = checkpointsByGroup.computeIfAbsent(
+                            checkpoint.consumerGroupId(),
+                            ignored -> new HashMap<>());
+                    Checkpoint lastCheckpoint = lastCheckpoints.getOrDefault(checkpoint.topicPartition(), checkpoint);
+                    assertTrue(checkpoint.downstreamOffset() >= lastCheckpoint.downstreamOffset(),
+                            "Checkpoint was non-monotonic for "
+                                    + checkpoint.consumerGroupId()
+                                    + ": "
+                                    + checkpoint.topicPartition());
+                    lastCheckpoints.put(checkpoint.topicPartition(), checkpoint);
+                }
+            } while (backupConsumer.currentLag(checkpointTopicPartition).orElse(1) > 0 && System.currentTimeMillis() < deadline);
+            assertEquals(0, backupConsumer.currentLag(checkpointTopicPartition).orElse(1), "Unable to read all checkpoints within " + CHECKPOINT_DURATION_MS + "ms");
+        }
+    }
+
     /*
      * make sure the consumer to consume expected number of records
      */
@@ -747,7 +883,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         mm2Props.put("max.tasks", "10");
         mm2Props.put("groups", "consumer-group-.*");
         mm2Props.put("sync.topic.acls.enabled", "false");
-        mm2Props.put("emit.checkpoints.interval.seconds", "1");
+        mm2Props.put("emit.checkpoints.interval.seconds", String.valueOf(CHECKPOINT_INTERVAL_DURATION_MS / 1000));
         mm2Props.put("emit.heartbeats.interval.seconds", "1");
         mm2Props.put("refresh.topics.interval.seconds", "1");
         mm2Props.put("refresh.groups.interval.seconds", "1");
@@ -758,7 +894,10 @@ public class MirrorConnectorsIntegrationBaseTest {
         mm2Props.put("offset.storage.replication.factor", "1");
         mm2Props.put("status.storage.replication.factor", "1");
         mm2Props.put("replication.factor", "1");
-        
+        // Sync offsets as soon as possible to ensure the final record in a finite test has its offset translated.
+        mm2Props.put("offset.lag.max", "0");
+        mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".offset.flush.interval.ms", "5000");
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + ".offset.flush.interval.ms", "5000");
         return mm2Props;
     }
 
@@ -785,14 +924,14 @@ public class MirrorConnectorsIntegrationBaseTest {
      * Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
      */
     protected void warmUpConsumer(Map<String, Object> consumerProps) {
-        Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
-        dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        dummyConsumer.commitSync();
-        dummyConsumer.close();
-        dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
-        dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        dummyConsumer.commitSync();
-        dummyConsumer.close();
+        try (Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            dummyConsumer.commitSync();
+        }
+        try (Consumer<byte[], byte[]> dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            dummyConsumer.commitSync();
+        }
     }
 
     /*
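
The invariant that waitForConsumerGroupFullSync asserts can be stated compactly: a
translated group offset must never exceed the downstream log end offset, or the
downstream group reports negative lag. A minimal standalone check using the same Admin
APIs (assuming an existing Admin client, group id, and partition list):

    Map<TopicPartition, OffsetAndMetadata> groupOffsets =
            admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> logEnds =
            admin.listOffsets(partitions.stream()
                    .collect(Collectors.toMap(Function.identity(), p -> OffsetSpec.latest())))
                    .all().get();
    for (TopicPartition tp : partitions) {
        // A group offset beyond the log end would surface as negative lag downstream.
        assertTrue(groupOffsets.get(tp).offset() <= logEnds.get(tp).offset());
    }
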
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
new file mode 100644
index 00000000000..6ac09ef32bb
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Integration test for MirrorMaker2 in which source records are emitted with a transactional producer,
+ * which interleaves transaction commit messages into the source topic which are not propagated downstream.
+ */
+public class MirrorConnectorsIntegrationTransactionsTest extends MirrorConnectorsIntegrationBaseTest {
+
+    private Map<String, Object> producerProps = new HashMap<>();
+
+    @BeforeEach
+    @Override
+    public void startClusters() throws Exception {
+        primaryBrokerProps.put("transaction.state.log.replication.factor", "1");
+        backupBrokerProps.put("transaction.state.log.replication.factor", "1");
+        primaryBrokerProps.put("transaction.state.log.min.isr", "1");
+        backupBrokerProps.put("transaction.state.log.min.isr", "1");
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "embedded-kafka-0");
+        super.startClusters();
+    }
+
+    /**
+     * Produce records with a short-lived transactional producer to interleave transaction markers in the topic.
+     */
+    @Override
+    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) {
+        ProducerRecord<byte[], byte[]> msg = new ProducerRecord<>(topic, partition, key == null ? null : key.getBytes(), value == null ? null : value.getBytes());
+        try (Producer<byte[], byte[]> producer = cluster.createProducer(producerProps)) {
+            producer.initTransactions();
+            producer.beginTransaction();
+            producer.send(msg).get(120000, TimeUnit.MILLISECONDS);
+            producer.commitTransaction();
+        } catch (Exception e) {
+            throw new KafkaException("Could not produce message: " + msg, e);
+        }
+    }
+}
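
Transaction markers occupy offsets in the log but are never handed to consumers, which
is why a replicated topic can legitimately have fewer consumable records than upstream
offsets; this test exists to make sure offset translation tolerates those gaps. It is
also why waitForConsumerGroupFullSync reads with read_committed. An illustrative
consumer setup for this kind of verification (property values as in the helper above):

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.put("isolation.level", "read_committed");  // skip markers and aborted data
    consumerProps.put("auto.offset.reset", "earliest");
    // Offsets observed by this consumer will skip over the slots where
    // transaction commit/abort markers sit in the log.
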
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 899b42dd877..431ae871ce9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -257,8 +257,16 @@ public class KafkaBasedLog<K, V> {
                     " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
                     " this is your first use of the topic it may have taken too long to create.");
 
-        for (PartitionInfo partition : partitionInfos)
-            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+        for (PartitionInfo partition : partitionInfos) {
+            TopicPartition topicPartition = new TopicPartition(partition.topic(), partition.partition());
+            if (readPartition(topicPartition)) {
+                partitions.add(topicPartition);
+            }
+        }
+        if (partitions.isEmpty()) {
+            throw new ConnectException("Some partitions for " + topic + " exist, but no partitions matched the " +
+                    "required filter.");
+        }
 
         partitionCount = partitions.size();
         consumer.assign(partitions);
@@ -392,6 +400,18 @@ public class KafkaBasedLog<K, V> {
         return new KafkaConsumer<>(consumerConfigs);
     }
 
+    /**
+     * Signals whether a topic partition should be read by this log. Invoked on {@link #start() startup} once
+     * for every partition found in the log's backing topic.
+     * <p>This method can be overridden by subclasses when only a subset of the assigned partitions
+     * should be read into memory. By default, all partitions are read.
+     * @param topicPartition A topic partition which could be read by this log.
+     * @return true if the partition should be read by this log, false if its contents should be ignored.
+     */
+    protected boolean readPartition(TopicPartition topicPartition) {
+        return true;
+    }
+
     private void poll(long timeoutMs) {
         try {
             ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(timeoutMs));
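
The readPartition hook keeps KafkaBasedLog general while letting OffsetSyncStore read
only partition 0 of the syncs topic, as its createBackingStore override shows earlier
in this patch. A minimal anonymous-subclass sketch (constructor arguments reduced to
placeholder names):

    KafkaBasedLog<byte[], byte[]> log = new KafkaBasedLog<byte[], byte[]>(
            topic, producerConfigs, consumerConfigs, () -> topicAdmin, callback, Time.SYSTEM, initializer) {
        @Override
        protected boolean readPartition(TopicPartition topicPartition) {
            return topicPartition.partition() == 0;  // materialize only partition 0
        }
    };
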
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index f9defc77ca2..691852430a0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -282,8 +282,7 @@ public class TopicAdmin implements AutoCloseable {
         this(adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), Admin.create(adminConfig));
     }
 
-    // visible for testing
-    TopicAdmin(Object bootstrapServers, Admin adminClient) {
+    public TopicAdmin(Object bootstrapServers, Admin adminClient) {
         this(bootstrapServers, adminClient, true);
     }
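
Widening this constructor to public lets callers outside the connect.util package wrap
an Admin client they already manage in a TopicAdmin, as the MirrorMaker offset sync
store needs. Illustrative usage (config map assumed to exist):

    Admin adminClient = Admin.create(adminConfig);
    TopicAdmin topicAdmin = new TopicAdmin(
            adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), adminClient);
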
