This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a54a34a11c1 KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag, syncing stale offsets, and flaky integration tests (#13178)
a54a34a11c1 is described below
commit a54a34a11c1c867ff62a7234334cad5139547fd7
Author: Greg Harris <[email protected]>
AuthorDate: Fri Feb 17 14:25:17 2023 -0800
KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag, syncing stale offsets, and flaky integration tests (#13178)
KAFKA-12468: Fix negative lag on down consumer groups synced by MirrorMaker 2
KAFKA-13659: Stop syncing consumer groups with stale offsets in MirrorMaker 2
KAFKA-12566: Fix flaky MirrorMaker 2 integration tests
Reviewers: Chris Egerton <[email protected]>
---
.../connect/mirror/MirrorCheckpointConfig.java | 6 +
.../kafka/connect/mirror/MirrorCheckpointTask.java | 17 +-
.../kafka/connect/mirror/OffsetSyncStore.java | 119 +++++++---
.../connect/mirror/MirrorCheckpointTaskTest.java | 32 ++-
.../kafka/connect/mirror/OffsetSyncStoreTest.java | 86 +++++--
.../IdentityReplicationIntegrationTest.java | 47 ++--
.../MirrorConnectorsIntegrationBaseTest.java | 263 ++++++++++++++++-----
...MirrorConnectorsIntegrationExactlyOnceTest.java | 3 -
...irrorConnectorsIntegrationTransactionsTest.java | 66 ++++++
.../apache/kafka/connect/util/KafkaBasedLog.java | 24 +-
.../org/apache/kafka/connect/util/TopicAdmin.java | 3 +-
11 files changed, 507 insertions(+), 159 deletions(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
index e21d22af1d4..122d8ad1e7f 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
@@ -150,6 +150,12 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig {
: targetConsumerConfig();
}
+ Map<String, Object> offsetSyncsTopicAdminConfig() {
+ return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation())
+ ? sourceAdminConfig()
+ : targetAdminConfig();
+ }
+
Duration consumerPollTimeout() {
return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
}
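The new accessor mirrors the consumer-config accessor directly above it: whichever cluster hosts the offset-syncs topic supplies the admin client configs. A minimal standalone sketch of the selection (assuming, per MM2's defaults, that the source alias default resolves to "source"; the class and method names below are illustrative, not part of the patch):

    import java.util.Map;

    class AdminConfigSelectionSketch {
        // Pick the admin configs for whichever cluster hosts the offset-syncs topic.
        static Map<String, Object> adminConfigFor(String offsetSyncsTopicLocation,
                Map<String, Object> sourceAdminConfig, Map<String, Object> targetAdminConfig) {
            return "source".equals(offsetSyncsTopicLocation) ? sourceAdminConfig : targetAdminConfig;
        }
    }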
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 95b7b5a2bda..9f5a4b00e69 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -102,10 +102,13 @@ public class MirrorCheckpointTask extends SourceTask {
idleConsumerGroupsOffset = new HashMap<>();
checkpointsPerConsumerGroup = new HashMap<>();
        scheduler = new Scheduler(MirrorCheckpointTask.class, config.adminTimeout());
-        scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
-                "refreshing idle consumers group offsets at target cluster");
-        scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
-                "sync idle consumer group offset from source to target");
+        scheduler.execute(() -> {
+            offsetSyncStore.start();
+            scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
+                    "refreshing idle consumers group offsets at target cluster");
+            scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
+                    "sync idle consumer group offset from source to target");
+        }, "starting offset sync store");
}
@Override
@@ -136,7 +139,11 @@ public class MirrorCheckpointTask extends SourceTask {
try {
long deadline = System.currentTimeMillis() + interval.toMillis();
while (!stopping && System.currentTimeMillis() < deadline) {
- offsetSyncStore.update(pollTimeout);
+ Thread.sleep(pollTimeout.toMillis());
+ }
+ if (stopping) {
+ // we are stopping, return early.
+ return null;
}
List<SourceRecord> records = new ArrayList<>();
for (String group : consumerGroups) {
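The scheduling change above keeps the task's start() callback fast: reading the offset syncs topic to the end can block for an arbitrarily long time, so it now runs on the scheduler, the repeating jobs are registered only once the store has caught up, and poll() merely sleeps instead of driving the consumer. A minimal sketch of the same pattern using standard executors (the patch's Scheduler is MirrorMaker-internal; all names below are illustrative):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class DeferredStartSketch {
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        void start(Runnable blockingStoreStart, Runnable refreshJob, Runnable syncJob, long intervalMs) {
            // Run the blocking catch-up off the caller's thread, then begin the
            // periodic jobs only after the store is known to be up to date.
            scheduler.execute(() -> {
                blockingStoreStart.run();
                scheduler.scheduleAtFixedRate(refreshJob, 0, intervalMs, TimeUnit.MILLISECONDS);
                scheduler.scheduleAtFixedRate(syncJob, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
            });
        }
    }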
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
index 1cfdb1d265c..0169446aa04 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -16,65 +16,124 @@
*/
package org.apache.kafka.connect.mirror;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
-import java.util.Map;
-import java.util.HashMap;
import java.util.Collections;
-import java.time.Duration;
+import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
+    private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+    private final TopicAdmin admin;
+    protected volatile boolean readToEnd = false;
    OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-                new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        Consumer<byte[], byte[]> consumer = null;
+        TopicAdmin admin = null;
+        KafkaBasedLog<byte[], byte[]> store;
+        try {
+            consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());
+            admin = new TopicAdmin(
+                    config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+            store = createBackingStore(config, consumer, admin);
+        } catch (Throwable t) {
+            Utils.closeQuietly(consumer, "consumer for offset syncs");
+            Utils.closeQuietly(admin, "admin client for offset syncs");
+            throw t;
+        }
+        this.admin = admin;
+        this.backingStore = store;
    }
- // for testing
-    OffsetSyncStore(KafkaConsumer<byte[], byte[]> consumer, TopicPartition offsetSyncTopicPartition) {
-        this.consumer = consumer;
-        this.offsetSyncTopicPartition = offsetSyncTopicPartition;
+    private KafkaBasedLog<byte[], byte[]> createBackingStore(MirrorCheckpointConfig config, Consumer<byte[], byte[]> consumer, TopicAdmin admin) {
+ return new KafkaBasedLog<byte[], byte[]>(
+ config.offsetSyncsTopic(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ () -> admin,
+ (error, record) -> this.handleRecord(record),
+ Time.SYSTEM,
+ ignored -> {
+ }
+ ) {
+ @Override
+ protected Producer<byte[], byte[]> createProducer() {
+ return null;
+ }
+
+ @Override
+ protected Consumer<byte[], byte[]> createConsumer() {
+ return consumer;
+ }
+
+ @Override
+ protected boolean readPartition(TopicPartition topicPartition) {
+ return topicPartition.partition() == 0;
+ }
+ };
+ }
+
+ OffsetSyncStore() {
+ this.admin = null;
+ this.backingStore = null;
+ }
+
+ /**
+     * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage.
+ */
+ public void start() {
+ backingStore.start();
+ readToEnd = true;
}
    OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
+        if (!readToEnd) {
+            // If we have not read to the end of the syncs topic at least once, decline to translate any offsets.
+            // This prevents emitting stale offsets while initially reading the offset syncs topic.
+            return OptionalLong.empty();
+        }
        Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
if (offsetSync.isPresent()) {
if (offsetSync.get().upstreamOffset() > upstreamOffset) {
// Offset is too far in the past to translate accurately
return OptionalLong.of(-1L);
}
-            long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
+            // If the consumer group is ahead of the offset sync, we can translate the upstream offset only 1
+            // downstream offset past the offset sync itself. This is because we know that future records must appear
+            // ahead of the offset sync, but we cannot estimate how many offsets from the upstream topic
+            // will be written vs dropped. If we overestimate, then we may skip the correct offset and have data loss.
+            // This also handles consumer groups at the end of a topic whose offsets point past the last valid record.
+            // This may cause re-reading of records depending on the age of the offset sync.
+            // s=offset sync pair, ?=record may or may not be replicated, g=consumer group offset, r=re-read record
+            // source |-s?????r???g-|
+            //          |  ______/
+            //          | /
+            //          vv
+            // target |-sg----r-----|
+            long upstreamStep = upstreamOffset == offsetSync.get().upstreamOffset() ? 0 : 1;
            return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
} else {
return OptionalLong.empty();
}
}
- // poll and handle records
- synchronized void update(Duration pollTimeout) {
- try {
- consumer.poll(pollTimeout).forEach(this::handleRecord);
- } catch (WakeupException e) {
- // swallow
- }
- }
-
- public synchronized void close() {
- consumer.wakeup();
- Utils.closeQuietly(consumer, "offset sync store consumer");
+ @Override
+ public void close() {
+        Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "backing store for offset syncs");
+ Utils.closeQuietly(admin, "admin client for offset syncs");
}
protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
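Taken together, the readToEnd gate and the comment block above are the substance of the fix: nothing is translated until the store has replayed the whole syncs topic (KAFKA-13659), and a checkpoint can advance at most one offset past the latest sync instead of dead-reckoning an arbitrary distance (KAFKA-12468). A standalone sketch of the resulting rule (an illustrative class, matching the expectations in OffsetSyncStoreTest below, not the committed code):

    import java.util.OptionalLong;

    class TranslationRuleSketch {
        static OptionalLong translate(boolean readToEnd, long syncUpstream, long syncDownstream, long groupOffset) {
            if (!readToEnd) {
                return OptionalLong.empty();   // still replaying the syncs topic: decline to translate
            }
            if (syncUpstream > groupOffset) {
                return OptionalLong.of(-1L);   // group offset predates the sync: too old to translate
            }
            long step = groupOffset == syncUpstream ? 0 : 1;
            return OptionalLong.of(syncDownstream + step);
        }

        public static void main(String[] args) {
            // Sync pair upstream=100 -> downstream=200, group committed at 150:
            // the old dead-reckoning gave 200 + 50 = 250, which can land beyond the
            // downstream log end and produce negative lag; the new rule gives 201.
            System.out.println(translate(true, 100, 200, 150)); // OptionalLong[201]
        }
    }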
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index 20735cd2334..500cb6c131a 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -52,11 +52,16 @@ public class MirrorCheckpointTaskTest {
@Test
public void testCheckpoint() {
+ long t1UpstreamOffset = 3L;
+ long t1DownstreamOffset = 4L;
+ long t2UpstreamOffset = 7L;
+ long t2DownstreamOffset = 8L;
        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        offsetSyncStore.start();
        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
                new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
-        offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
-        offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
+        offsetSyncStore.sync(new TopicPartition("topic1", 2), t1UpstreamOffset, t1DownstreamOffset);
+        offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), t2UpstreamOffset, t2DownstreamOffset);
        Optional<Checkpoint> optionalCheckpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
                new OffsetAndMetadata(10, null));
assertTrue(optionalCheckpoint1.isPresent());
@@ -70,7 +75,7 @@ public class MirrorCheckpointTaskTest {
"checkpoint group9 sourcePartition failed");
assertEquals(10, checkpoint1.upstreamOffset(),
"checkpoint group9 upstreamOffset failed");
- assertEquals(11, checkpoint1.downstreamOffset(),
+ assertEquals(t1DownstreamOffset + 1, checkpoint1.downstreamOffset(),
"checkpoint group9 downstreamOffset failed");
assertEquals(123L, sourceRecord1.timestamp().longValue(),
"checkpoint group9 timestamp failed");
@@ -87,10 +92,27 @@ public class MirrorCheckpointTaskTest {
"checkpoint group11 sourcePartition failed");
assertEquals(12, checkpoint2.upstreamOffset(),
"checkpoint group11 upstreamOffset failed");
- assertEquals(13, checkpoint2.downstreamOffset(),
+ assertEquals(t2DownstreamOffset + 1, checkpoint2.downstreamOffset(),
"checkpoint group11 downstreamOffset failed");
assertEquals(234L, sourceRecord2.timestamp().longValue(),
"checkpoint group11 timestamp failed");
+        Optional<Checkpoint> optionalCheckpoint3 = mirrorCheckpointTask.checkpoint("group13", new TopicPartition("target2.topic5", 6),
+                new OffsetAndMetadata(7, null));
+        assertTrue(optionalCheckpoint3.isPresent());
+        Checkpoint checkpoint3 = optionalCheckpoint3.get();
+        SourceRecord sourceRecord3 = mirrorCheckpointTask.checkpointRecord(checkpoint3, 234L);
+        assertEquals(new TopicPartition("topic5", 6), checkpoint3.topicPartition(),
+                "checkpoint group13 topic5 failed");
+        assertEquals("group13", checkpoint3.consumerGroupId(),
+                "checkpoint group13 consumerGroupId failed");
+        assertEquals("group13", Checkpoint.unwrapGroup(sourceRecord3.sourcePartition()),
+                "checkpoint group13 sourcePartition failed");
+ assertEquals(t2UpstreamOffset, checkpoint3.upstreamOffset(),
+ "checkpoint group13 upstreamOffset failed");
+ assertEquals(t2DownstreamOffset, checkpoint3.downstreamOffset(),
+ "checkpoint group13 downstreamOffset failed");
+ assertEquals(234L, sourceRecord3.timestamp().longValue(),
+ "checkpoint group13 timestamp failed");
}
@Test
@@ -150,6 +172,7 @@ public class MirrorCheckpointTaskTest {
@Test
public void testNoCheckpointForTopicWithoutOffsetSyncs() {
        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        offsetSyncStore.start();
        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
                new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 0), 3L, 4L);
@@ -165,6 +188,7 @@ public class MirrorCheckpointTaskTest {
@Test
public void testNoCheckpointForTopicWithNullOffsetAndMetadata() {
        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        offsetSyncStore.start();
        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
                new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L);
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
index 9224a088081..163e5b72250 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
+import java.util.OptionalLong;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
public class OffsetSyncStoreTest {
@@ -30,7 +32,13 @@ public class OffsetSyncStoreTest {
static class FakeOffsetSyncStore extends OffsetSyncStore {
FakeOffsetSyncStore() {
- super(null, null);
+ super();
+ }
+
+ @Override
+ public void start() {
+ // do not call super to avoid NPE without a KafkaBasedLog.
+ readToEnd = true;
}
        void sync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
@@ -44,29 +52,57 @@ public class OffsetSyncStoreTest {
@Test
public void testOffsetTranslation() {
- FakeOffsetSyncStore store = new FakeOffsetSyncStore();
-
- store.sync(tp, 100, 200);
- assertEquals(250L, store.translateDownstream(tp, 150).getAsLong(),
- "Failure in translating downstream offset 250");
-
- // Translate exact offsets
- store.sync(tp, 150, 251);
- assertEquals(251L, store.translateDownstream(tp, 150).getAsLong(),
- "Failure in translating exact downstream offset 251");
-
- // Use old offset (5) prior to any sync -> can't translate
- assertEquals(-1, store.translateDownstream(tp, 5).getAsLong(),
- "Expected old offset to not translate");
-
- // Downstream offsets reset
- store.sync(tp, 200, 10);
- assertEquals(10L, store.translateDownstream(tp, 200).getAsLong(),
- "Failure in resetting translation of downstream offset");
-
- // Upstream offsets reset
- store.sync(tp, 20, 20);
- assertEquals(20L, store.translateDownstream(tp, 20).getAsLong(),
- "Failure in resetting translation of upstream offset");
+ try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+ store.start();
+
+ // Emit synced downstream offset without dead-reckoning
+ store.sync(tp, 100, 200);
+            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+
+            // Translate exact offsets
+            store.sync(tp, 150, 251);
+            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+
+            // Use old offset (5) prior to any sync -> can't translate
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+
+            // Downstream offsets reset
+            store.sync(tp, 200, 10);
+            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+
+            // Upstream offsets reset
+            store.sync(tp, 20, 20);
+            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+ }
+ }
+
+ @Test
+ public void testNoTranslationIfStoreNotStarted() {
+ try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+ // no offsets exist and store is not started
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+
+            // read a sync during startup
+            store.sync(tp, 100, 200);
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+
+            // After the store is started all offsets are visible
+            store.start();
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+ }
+ }
+
+ @Test
+ public void testNoTranslationIfNoOffsetSync() {
+ try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+ store.start();
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+ }
}
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index dd683f1acfe..17aa9ebc142 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.mirror.MirrorClient;
import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
import org.apache.kafka.connect.mirror.MirrorMakerConfig;
-import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -107,11 +106,8 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
        assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
                "Checkpoints were not emitted downstream to backup cluster.");
-        Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
-                Duration.ofMillis(CHECKPOINT_DURATION_MS));
-
-        assertTrue(backupOffsets.containsKey(
-                new TopicPartition("test-topic-1", 0)), "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
+        Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
+                backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "test-topic-1");
        // Failover consumer group to backup cluster.
        try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
@@ -221,18 +217,19 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
topicShouldNotBeCreated(primary, "backup.test-topic-1");
waitForTopicCreated(backup, "test-topic-1");
        // create a consumer at backup cluster with same consumer group Id to consume 1 topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
-                consumerProps, "test-topic-1");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
+                consumerProps, "test-topic-1")) {
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("test-topic-1"),
-                consumerGroupName, NUM_RECORDS_PRODUCED, true);
+            waitForConsumerGroupFullSync(backup, Collections.singletonList("test-topic-1"),
+                    consumerGroupName, NUM_RECORDS_PRODUCED);
-        ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        // the size of consumer record should be zero, because the offsets of the same consumer group
-        // have been automatically synchronized from primary to backup by the background job, so no
-        // more records to consume from the replicated topic by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
+            // the size of consumer record should be zero, because the offsets of the same consumer group
+            // have been automatically synchronized from primary to backup by the background job, so no
+            // more records to consume from the replicated topic by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+        }
// now create a new topic in primary cluster
primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
@@ -244,22 +241,24 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
// create a consumer at primary cluster to consume the new topic
        try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", "consumer-group-1"), "test-topic-2")) {
+                "group.id", consumerGroupName), "test-topic-2")) {
            // we need to wait for consuming all the records for MM2 replicating the expected offsets
            waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
        }
        // create a consumer at backup cluster with same consumer group Id to consume old and new topic
-        backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", consumerGroupName), "test-topic-1", "test-topic-2");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+                "group.id", consumerGroupName), "test-topic-1", "test-topic-2")) {
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("test-topic-1", "test-topic-2"),
-                consumerGroupName, NUM_RECORDS_PRODUCED, true);
+            waitForConsumerGroupFullSync(backup, Arrays.asList("test-topic-1", "test-topic-2"),
+                    consumerGroupName, NUM_RECORDS_PRODUCED);
+
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+        }
-        records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
-        backupConsumer.close();
+        assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
}
/*
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 27da7054b67..82772ceb06e 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -20,6 +20,8 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -37,7 +39,6 @@ import org.apache.kafka.connect.mirror.MirrorSourceConnector;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.mirror.Checkpoint;
import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
-import org.apache.kafka.connect.mirror.ReplicationPolicy;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
@@ -54,6 +55,8 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Tag;
@@ -93,6 +96,7 @@ public class MirrorConnectorsIntegrationBaseTest {
private static final int OFFSET_SYNC_DURATION_MS = 30_000;
private static final int TOPIC_SYNC_DURATION_MS = 60_000;
private static final int REQUEST_TIMEOUT_DURATION_MS = 60_000;
+ private static final int CHECKPOINT_INTERVAL_DURATION_MS = 1_000;
private static final int NUM_WORKERS = 3;
    protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500L);
protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
@@ -118,8 +122,7 @@ public class MirrorConnectorsIntegrationBaseTest {
protected Properties backupBrokerProps = new Properties();
protected Map<String, String> primaryWorkerProps = new HashMap<>();
protected Map<String, String> backupWorkerProps = new HashMap<>();
- protected boolean exactOffsetTranslation = true;
-
+
@BeforeEach
public void startClusters() throws Exception {
startClusters(new HashMap<String, String>() {{
@@ -303,13 +306,8 @@ public class MirrorConnectorsIntegrationBaseTest {
        assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
                "Checkpoints were not emitted downstream to backup cluster.");
-        Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
-                Duration.ofMillis(CHECKPOINT_DURATION_MS));
-
-        for (int i = 0; i < NUM_PARTITIONS; i++) {
-            assertTrue(backupOffsets.containsKey(new TopicPartition("primary.test-topic-1", i)),
-                    "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
-        }
+        Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
+                backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "primary.test-topic-1");
        // Failover consumer group to backup cluster.
        try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
@@ -325,11 +323,10 @@ public class MirrorConnectorsIntegrationBaseTest {
"Checkpoints were not emitted upstream to primary cluster.");
}
-        waitForCondition(() -> primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-                Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not translated downstream to primary cluster.");
+        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = waitForCheckpointOnAllPartitions(
+                primaryClient, consumerGroupName, BACKUP_CLUSTER_ALIAS, "backup.test-topic-1");
-        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-                Duration.ofMillis(CHECKPOINT_DURATION_MS));
+ assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
primaryClient.close();
backupClient.close();
@@ -416,7 +413,7 @@ public class MirrorConnectorsIntegrationBaseTest {
            assertEquals(0, offset.offset(), "Offset of last partition is not zero");
}
}
-
+
@Test
    public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
produceMessages(primary, "test-topic-1");
@@ -447,18 +444,19 @@ public class MirrorConnectorsIntegrationBaseTest {
topicShouldNotBeCreated(primary, "backup.test-topic-1");
waitForTopicCreated(backup, "primary.test-topic-1");
        // create a consumer at backup cluster with same consumer group Id to consume 1 topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
-                consumerProps, "primary.test-topic-1");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
+                consumerProps, "primary.test-topic-1")) {
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("primary.test-topic-1"),
-                consumerGroupName, NUM_RECORDS_PRODUCED, exactOffsetTranslation);
+            waitForConsumerGroupFullSync(backup, Collections.singletonList("primary.test-topic-1"),
+                    consumerGroupName, NUM_RECORDS_PRODUCED);
-        ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        // the size of consumer record should be zero, because the offsets of the same consumer group
-        // have been automatically synchronized from primary to backup by the background job, so no
-        // more records to consume from the replicated topic by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
+            // the size of consumer record should be zero, because the offsets of the same consumer group
+            // have been automatically synchronized from primary to backup by the background job, so no
+            // more records to consume from the replicated topic by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+ }
// now create a new topic in primary cluster
primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
@@ -470,22 +468,24 @@ public class MirrorConnectorsIntegrationBaseTest {
// create a consumer at primary cluster to consume the new topic
        try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
- "group.id", "consumer-group-1"), "test-topic-2")) {
+ "group.id", consumerGroupName), "test-topic-2")) {
            // we need to wait for consuming all the records for MM2 replicating the expected offsets
waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
}
        // create a consumer at backup cluster with same consumer group Id to consume old and new topic
-        backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+                "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2")) {
+
+            waitForConsumerGroupFullSync(backup, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
+                    consumerGroupName, NUM_RECORDS_PRODUCED);
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
-                consumerGroupName, NUM_RECORDS_PRODUCED, exactOffsetTranslation);
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+        }
-        records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
-        backupConsumer.close();
+        assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
}
@Test
@@ -502,10 +502,14 @@ public class MirrorConnectorsIntegrationBaseTest {
// Ensure the offset syncs topic is created in the target cluster
        waitForTopicCreated(backup, "mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal");
+ String consumerGroupName = "consumer-group-syncs-on-target";
+        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+
produceMessages(primary, "test-topic-1");
-        ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-        String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
+        warmUpConsumer(consumerProps);
+
+        String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
        // Check offsets are pushed to the checkpoint topic
        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
@@ -523,6 +527,8 @@ public class MirrorConnectorsIntegrationBaseTest {
"Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS +
".test-topic-1"
);
+ assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS +
".checkpoints.internal");
+
        // Ensure no offset-syncs topics have been created on the primary cluster
        Set<String> primaryTopics = primary.kafka().createAdminClient().listTopics().names().get();
        assertFalse(primaryTopics.contains("mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal"));
@@ -575,6 +581,17 @@ public class MirrorConnectorsIntegrationBaseTest {
// Send some records to test-topic-no-checkpoints in the source cluster
produceMessages(primary, "test-topic-no-checkpoints");
+        try (Consumer<byte[], byte[]> consumer = primary.kafka().createConsumer(consumerProps)) {
+            Collection<TopicPartition> tps = Arrays.asList(tp1, tp2);
+            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
+            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = endOffsets.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> new OffsetAndMetadata(e.getValue())
+ ));
+ consumer.commitSync(offsetsToCommit);
+ }
+
waitForCondition(() -> {
            Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets(
                    consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
@@ -583,6 +600,33 @@ public class MirrorConnectorsIntegrationBaseTest {
        }, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to backup cluster");
}
+ @Test
+ public void testRestartReplication() throws InterruptedException {
+ String consumerGroupName = "consumer-group-restart";
+        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+        String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+ warmUpConsumer(consumerProps);
+ mm2Props.put("sync.group.offsets.enabled", "true");
+ mm2Props.put("sync.group.offsets.interval.seconds", "1");
+ mm2Config = new MirrorMakerConfig(mm2Props);
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+ produceMessages(primary, "test-topic-1");
+        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+ waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
+ }
+        waitForConsumerGroupFullSync(backup, Collections.singletonList(remoteTopic), consumerGroupName, NUM_RECORDS_PRODUCED);
+ restartMirrorMakerConnectors(backup, CONNECTOR_LIST);
+ assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
+ Thread.sleep(5000);
+ produceMessages(primary, "test-topic-1");
+        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+ waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
+ }
+        waitForConsumerGroupFullSync(backup, Collections.singletonList(remoteTopic), consumerGroupName, 2 * NUM_RECORDS_PRODUCED);
+ assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
+ }
+
+
    private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) {
        return new TopicPartition(remoteTopicName(tp.topic(), alias), tp.partition());
}
@@ -645,6 +689,12 @@ public class MirrorConnectorsIntegrationBaseTest {
}
}
+    protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses) {
+ for (Class<? extends Connector> connector : connectorClasses) {
+            connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false);
+ }
+ }
+
/*
* wait for the topic created on the cluster
*/
@@ -690,7 +740,7 @@ public class MirrorConnectorsIntegrationBaseTest {
    protected void produceMessages(EmbeddedConnectCluster cluster, String topicName) {
Map<String, String> recordSent = generateRecords(NUM_RECORDS_PRODUCED);
for (Map.Entry<String, String> entry : recordSent.entrySet()) {
-            cluster.kafka().produce(topicName, entry.getKey(), entry.getValue());
+            produce(cluster.kafka(), topicName, null, entry.getKey(), entry.getValue());
}
}
@@ -701,16 +751,70 @@ public class MirrorConnectorsIntegrationBaseTest {
int cnt = 0;
for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
for (int p = 0; p < numPartitions; p++)
- cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++);
}
-
+
+    /**
+     * Produce a test record to a Kafka cluster.
+     * This method allows subclasses to configure and use their own Kafka Producer instead of using the built-in default.
+     * @param cluster Kafka cluster that should receive the record
+     * @param topic Topic to send the record to, non-null
+     * @param partition Partition to send the record to, may be null.
+     * @param key Kafka key for the record
+     * @param value Kafka value for the record
+     */
+    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) {
+        cluster.produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
+            MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
+    ) throws InterruptedException {
+        AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
+ waitForCondition(
+ () -> {
+                Map<TopicPartition, OffsetAndMetadata> offsets = client.remoteConsumerOffsets(
+                        consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000));
+                for (int i = 0; i < NUM_PARTITIONS; i++) {
+                    if (!offsets.containsKey(new TopicPartition(topicName, i))) {
+                        log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i);
+ return false;
+ }
+ }
+ ret.set(offsets);
+ return true;
+ },
+ CHECKPOINT_DURATION_MS,
+ String.format(
+                        "Offsets for consumer group %s not translated from %s for topic %s",
+ consumerGroupName,
+ remoteClusterAlias,
+ topicName
+ )
+ );
+ return ret.get();
+ }
+
/*
     * given consumer group, topics and expected number of records, make sure the consumer group
* offsets are eventually synced to the expected offset numbers
*/
-    protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
-            Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation)
-            throws InterruptedException {
+    protected static <T> void waitForConsumerGroupFullSync(
+            EmbeddedConnectCluster connect, List<String> topics, String consumerGroupId, int numRecords
+    ) throws InterruptedException {
+ int expectedRecords = numRecords * topics.size();
+ Map<String, Object> consumerProps = new HashMap<>();
+ consumerProps.put("isolation.level", "read_committed");
+ consumerProps.put("auto.offset.reset", "earliest");
+ Map<TopicPartition, Long> lastOffset = new HashMap<>();
+        try (Consumer<byte[], byte[]> consumer = connect.kafka().createConsumerAndSubscribeTo(consumerProps, topics.toArray(new String[0]))) {
+            final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+            waitForCondition(() -> {
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+                records.forEach(record -> lastOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset()));
+                return expectedRecords == totalConsumedRecords.addAndGet(records.count());
+            }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all records in time");
+ }
try (Admin adminClient = connect.kafka().createAdminClient()) {
            List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
            for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) {
@@ -718,26 +822,54 @@ public class MirrorConnectorsIntegrationBaseTest {
tps.add(new TopicPartition(topic, partitionIndex));
}
}
- long expectedTotalOffsets = numRecords * topics.size();
waitForCondition(() -> {
                Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
-                        adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
-                long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream()
-                        .mapToLong(OffsetAndMetadata::offset).sum();
-
-                Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS);
-                long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
-
- boolean totalOffsetsMatch = exactOffsetTranslation
- ? totalOffsets == expectedTotalOffsets
- : totalOffsets >= expectedTotalOffsets;
-                // make sure the consumer group offsets are synced to expected number
-                return totalOffsetsMatch && consumerGroupOffsetTotal > 0;
+                        adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
+                Map<TopicPartition, OffsetSpec> endOffsetRequest = tps.stream()
+                        .collect(Collectors.toMap(Function.identity(), ignored -> OffsetSpec.latest()));
+                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
+                        adminClient.listOffsets(endOffsetRequest).all().get();
+
+ for (TopicPartition tp : tps) {
+ assertTrue(consumerGroupOffsets.containsKey(tp),
+                            "TopicPartition " + tp + " does not have translated offsets");
+                    assertTrue(consumerGroupOffsets.get(tp).offset() > lastOffset.get(tp),
+                            "TopicPartition " + tp + " does not have fully-translated offsets");
+                    assertTrue(consumerGroupOffsets.get(tp).offset() <= endOffsets.get(tp).offset(),
+                            "TopicPartition " + tp + " has downstream offsets beyond the log end, this would lead to negative lag metrics");
+ }
+ return true;
            }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
}
}
+    protected static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster, String checkpointTopic) {
+        TopicPartition checkpointTopicPartition = new TopicPartition(checkpointTopic, 0);
+        try (Consumer<byte[], byte[]> backupConsumer = cluster.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+                "auto.offset.reset", "earliest"), checkpointTopic)) {
+            Map<String, Map<TopicPartition, Checkpoint>> checkpointsByGroup = new HashMap<>();
+            long deadline = System.currentTimeMillis() + CHECKPOINT_DURATION_MS;
+            do {
+                ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
+                for (ConsumerRecord<byte[], byte[]> record : records) {
+                    Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
+                    Map<TopicPartition, Checkpoint> lastCheckpoints = checkpointsByGroup.computeIfAbsent(
+                            checkpoint.consumerGroupId(),
+                            ignored -> new HashMap<>());
+                    Checkpoint lastCheckpoint = lastCheckpoints.getOrDefault(checkpoint.topicPartition(), checkpoint);
+                    assertTrue(checkpoint.downstreamOffset() >= lastCheckpoint.downstreamOffset(),
+                            "Checkpoint was non-monotonic for "
+                                    + checkpoint.consumerGroupId()
+                                    + ": "
+                                    + checkpoint.topicPartition());
+                    lastCheckpoints.put(checkpoint.topicPartition(), checkpoint);
+                }
+            } while (backupConsumer.currentLag(checkpointTopicPartition).orElse(1) > 0 && System.currentTimeMillis() < deadline);
+            assertEquals(0, backupConsumer.currentLag(checkpointTopicPartition).orElse(1), "Unable to read all checkpoints within " + CHECKPOINT_DURATION_MS + "ms");
+ }
+ }
+
/*
* make sure the consumer to consume expected number of records
*/
@@ -760,7 +892,7 @@ public class MirrorConnectorsIntegrationBaseTest {
mm2Props.put("max.tasks", "10");
mm2Props.put("groups", "consumer-group-.*");
mm2Props.put("sync.topic.acls.enabled", "false");
- mm2Props.put("emit.checkpoints.interval.seconds", "1");
+        mm2Props.put("emit.checkpoints.interval.seconds", String.valueOf(CHECKPOINT_INTERVAL_DURATION_MS / 1000));
mm2Props.put("emit.heartbeats.interval.seconds", "1");
mm2Props.put("refresh.topics.interval.seconds", "1");
mm2Props.put("refresh.groups.interval.seconds", "1");
@@ -771,7 +903,10 @@ public class MirrorConnectorsIntegrationBaseTest {
mm2Props.put("offset.storage.replication.factor", "1");
mm2Props.put("status.storage.replication.factor", "1");
mm2Props.put("replication.factor", "1");
-
+        // Sync offsets as soon as possible to ensure the final record in a finite test has its offset translated.
+        mm2Props.put("offset.lag.max", "0");
+        mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".offset.flush.interval.ms", "5000");
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + ".offset.flush.interval.ms", "5000");
return mm2Props;
}
@@ -802,14 +937,14 @@ public class MirrorConnectorsIntegrationBaseTest {
     * Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
*/
protected void warmUpConsumer(Map<String, Object> consumerProps) {
-        Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
-        dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        dummyConsumer.commitSync();
-        dummyConsumer.close();
-        dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
-        dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        dummyConsumer.commitSync();
-        dummyConsumer.close();
+        try (Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            dummyConsumer.commitSync();
+        }
+        try (Consumer<byte[], byte[]> dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            dummyConsumer.commitSync();
+        }
}
/*
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
index a50b21bd58b..0081dcb3688 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
@@ -43,9 +43,6 @@ public class MirrorConnectorsIntegrationExactlyOnceTest extends MirrorConnectors
brokerProps.put("transaction.state.log.replication.factor", "1");
brokerProps.put("transaction.state.log.min.isr", "1");
}
-        // Transaction marker records will cause translated offsets to not match
- // between source and target
- exactOffsetTranslation = false;
super.startClusters();
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
new file mode 100644
index 00000000000..6ac09ef32bb
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Integration test for MirrorMaker2 in which source records are emitted with a transactional producer,
+ * which interleaves transaction commit messages into the source topic which are not propagated downstream.
+ */
+public class MirrorConnectorsIntegrationTransactionsTest extends MirrorConnectorsIntegrationBaseTest {
+
+ private Map<String, Object> producerProps = new HashMap<>();
+
+ @BeforeEach
+ @Override
+ public void startClusters() throws Exception {
+        primaryBrokerProps.put("transaction.state.log.replication.factor", "1");
+ backupBrokerProps.put("transaction.state.log.replication.factor", "1");
+ primaryBrokerProps.put("transaction.state.log.min.isr", "1");
+ backupBrokerProps.put("transaction.state.log.min.isr", "1");
+ producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "embedded-kafka-0");
+ super.startClusters();
+ }
+
+ /**
+     * Produce records with a short-lived transactional producer to interleave transaction markers in the topic.
+ */
+ @Override
+    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) {
+        ProducerRecord<byte[], byte[]> msg = new ProducerRecord<>(topic, partition, key == null ? null : key.getBytes(), value == null ? null : value.getBytes());
+        try (Producer<byte[], byte[]> producer = cluster.createProducer(producerProps)) {
+ producer.initTransactions();
+ producer.beginTransaction();
+ producer.send(msg).get(120000, TimeUnit.MILLISECONDS);
+ producer.commitTransaction();
+ } catch (Exception e) {
+ throw new KafkaException("Could not produce message: " + msg, e);
+ }
+ }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 50c0e6936a1..e6d820539ea 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -257,8 +257,16 @@ public class KafkaBasedLog<K, V> {
" allotted period. This could indicate a connectivity
issue, unavailable topic partitions, or if" +
" this is your first use of the topic it may have taken
too long to create.");
- for (PartitionInfo partition : partitionInfos)
- partitions.add(new TopicPartition(partition.topic(),
partition.partition()));
+ for (PartitionInfo partition : partitionInfos) {
+            TopicPartition topicPartition = new TopicPartition(partition.topic(), partition.partition());
+ if (readPartition(topicPartition)) {
+ partitions.add(topicPartition);
+ }
+ }
+ if (partitions.isEmpty()) {
+            throw new ConnectException("Some partitions for " + topic + " exist, but no partitions matched the " +
+ "required filter.");
+ }
partitionCount = partitions.size();
consumer.assign(partitions);
@@ -392,6 +400,18 @@ public class KafkaBasedLog<K, V> {
return new KafkaConsumer<>(consumerConfigs);
}
+ /**
+     * Signals whether a topic partition should be read by this log. Invoked on {@link #start() startup} once
+     * for every partition found in the log's backing topic.
+     * <p>This method can be overridden by subclasses when only a subset of the assigned partitions
+     * should be read into memory. By default, all partitions are read.
+     * @param topicPartition A topic partition which could be read by this log.
+     * @return true if the partition should be read by this log, false if its contents should be ignored.
+ */
+ protected boolean readPartition(TopicPartition topicPartition) {
+ return true;
+ }
+
private void poll(long timeoutMs) {
try {
            ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(timeoutMs));
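The readPartition hook above is what lets OffsetSyncStore reuse KafkaBasedLog while materializing only partition 0 of the syncs topic. A sketch of a read-only subclass using the same constructor shape as the patch (the callback and initializer are no-op placeholders; a real subclass would pass working consumer configs or override createConsumer, as the patch does):

    import java.util.Collections;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.connect.util.KafkaBasedLog;
    import org.apache.kafka.connect.util.TopicAdmin;

    class PartitionZeroLogSketch {
        static KafkaBasedLog<byte[], byte[]> create(String topic, TopicAdmin admin) {
            return new KafkaBasedLog<byte[], byte[]>(
                    topic,
                    Collections.emptyMap(),      // producer configs (unused, see createProducer)
                    Collections.emptyMap(),      // consumer configs (placeholder for this sketch)
                    () -> admin,
                    (error, record) -> { },      // consumed-record callback placeholder
                    Time.SYSTEM,
                    ignored -> { }) {            // topic initializer placeholder
                @Override
                protected Producer<byte[], byte[]> createProducer() {
                    return null;                 // read-only log, as in the patch
                }

                @Override
                protected boolean readPartition(TopicPartition topicPartition) {
                    return topicPartition.partition() == 0; // ignore all other partitions
                }
            };
        }
    }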
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index f9defc77ca2..691852430a0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -282,8 +282,7 @@ public class TopicAdmin implements AutoCloseable {
        this(adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), Admin.create(adminConfig));
}
- // visible for testing
- TopicAdmin(Object bootstrapServers, Admin adminClient) {
+ public TopicAdmin(Object bootstrapServers, Admin adminClient) {
this(bootstrapServers, adminClient, true);
}