This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cdd19a5326a KAFKA-12635: Don't emit checkpoints for partitions without
offset-syncs (#11748)
cdd19a5326a is described below
commit cdd19a5326aae83d470eb8eab800b3e9ed21f013
Author: Mickael Maison <[email protected]>
AuthorDate: Mon May 16 17:44:14 2022 +0200
KAFKA-12635: Don't emit checkpoints for partitions without offset-syncs
(#11748)
Reviewers: Luke Chen <[email protected]>, Viktor Somogyi-Vass
<[email protected]>, Dániel Urbán <[email protected]>, Federico
Valeri <[email protected]>
---
.../kafka/connect/mirror/MirrorCheckpointTask.java | 25 +++--
.../kafka/connect/mirror/OffsetSyncStore.java | 25 +++--
.../connect/mirror/MirrorCheckpointTaskTest.java | 27 +++++-
.../kafka/connect/mirror/OffsetSyncStoreTest.java | 10 +-
.../IdentityReplicationIntegrationTest.java | 2 +-
.../MirrorConnectorsIntegrationBaseTest.java | 105 ++++++++++++++++-----
.../MirrorConnectorsIntegrationTest.java | 23 -----
7 files changed, 141 insertions(+), 76 deletions(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 30fb695d926..959961812ea 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -37,11 +37,14 @@ import java.util.Map.Entry;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
+import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.concurrent.ExecutionException;
import java.time.Duration;
+import java.util.stream.Stream;
/** Emits checkpoints for upstream consumer groups. */
public class MirrorCheckpointTask extends SourceTask {
@@ -169,6 +172,7 @@ public class MirrorCheckpointTask extends SourceTask {
return listConsumerGroupOffsets(group).entrySet().stream()
.filter(x -> shouldCheckpointTopic(x.getKey().topic()))
.map(x -> checkpoint(group, x.getKey(), x.getValue()))
+            .flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do not emit checkpoints for partitions that don't have offset-syncs
            .filter(x -> x.downstreamOffset() >= 0) // ignore offsets we cannot translate accurately
.collect(Collectors.toList());
}
@@ -182,12 +186,16 @@ public class MirrorCheckpointTask extends SourceTask {
return
sourceAdminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();
}
-    Checkpoint checkpoint(String group, TopicPartition topicPartition,
-                          OffsetAndMetadata offsetAndMetadata) {
+    Optional<Checkpoint> checkpoint(String group, TopicPartition topicPartition,
+                                    OffsetAndMetadata offsetAndMetadata) {
         long upstreamOffset = offsetAndMetadata.offset();
-        long downstreamOffset = offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
-        return new Checkpoint(group, renameTopicPartition(topicPartition),
-            upstreamOffset, downstreamOffset, offsetAndMetadata.metadata());
+        OptionalLong downstreamOffset = offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
+        if (downstreamOffset.isPresent()) {
+            return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
+                upstreamOffset, downstreamOffset.getAsLong(), offsetAndMetadata.metadata()));
+        } else {
+            return Optional.empty();
+        }
     }
SourceRecord checkpointRecord(Checkpoint checkpoint, long timestamp) {
@@ -232,11 +240,10 @@ public class MirrorCheckpointTask extends SourceTask {
ConsumerGroupState consumerGroupState =
consumerGroupDesc.state();
// sync offset to the target cluster only if the state of
current consumer group is:
// (1) idle: because the consumer at target is not actively
consuming the mirrored topic
-                // (2) dead: the new consumer that is recently created at source and never exist at target
-                if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
+                // (2) dead: the new consumer that is recently created at source and never existed at target
+                if (consumerGroupState == ConsumerGroupState.EMPTY) {
idleConsumerGroupsOffset.put(group,
targetAdminClient.listConsumerGroupOffsets(group)
-
.partitionsToOffsetAndMetadata().get().entrySet().stream().collect(
- Collectors.toMap(Entry::getKey, Entry::getValue)));
+ .partitionsToOffsetAndMetadata().get());
}
// new consumer upstream has state "DEAD" and will be
identified during the offset sync-up
} catch (InterruptedException | ExecutionException e) {
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
index 9152cd5aa0b..f9b6617c13d 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -27,6 +27,8 @@ import java.util.Map;
import java.util.HashMap;
import java.util.Collections;
import java.time.Duration;
+import java.util.Optional;
+import java.util.OptionalLong;
/** Used internally by MirrorMaker. Stores offset syncs and performs offset
translation. */
class OffsetSyncStore implements AutoCloseable {
@@ -47,14 +49,18 @@ class OffsetSyncStore implements AutoCloseable {
this.offsetSyncTopicPartition = offsetSyncTopicPartition;
}
-    long translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
-        OffsetSync offsetSync = latestOffsetSync(sourceTopicPartition);
-        if (offsetSync.upstreamOffset() > upstreamOffset) {
-            // Offset is too far in the past to translate accurately
-            return -1;
+    OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
+        Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
+        if (offsetSync.isPresent()) {
+            if (offsetSync.get().upstreamOffset() > upstreamOffset) {
+                // Offset is too far in the past to translate accurately
+                return OptionalLong.of(-1L);
+            }
+            long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
+            return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
+        } else {
+            return OptionalLong.empty();
         }
-        long upstreamStep = upstreamOffset - offsetSync.upstreamOffset();
-        return offsetSync.downstreamOffset() + upstreamStep;
     }
// poll and handle records
@@ -77,8 +83,7 @@ class OffsetSyncStore implements AutoCloseable {
offsetSyncs.put(sourceTopicPartition, offsetSync);
}
-    private OffsetSync latestOffsetSync(TopicPartition topicPartition) {
-        return offsetSyncs.computeIfAbsent(topicPartition, x -> new OffsetSync(topicPartition,
-                -1, -1));
+    private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition) {
+        return Optional.ofNullable(offsetSyncs.get(topicPartition));
     }
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index 7ef878ab2e8..54fe678e73a 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -21,6 +21,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Collections;
+import java.util.Optional;
+
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.connect.source.SourceRecord;
@@ -28,6 +30,8 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class MirrorCheckpointTaskTest {
@@ -53,8 +57,10 @@ public class MirrorCheckpointTaskTest {
new DefaultReplicationPolicy(), offsetSyncStore,
Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
- Checkpoint checkpoint1 = mirrorCheckpointTask.checkpoint("group9", new
TopicPartition("topic1", 2),
+ Optional<Checkpoint> optionalCheckpoint1 =
mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
new OffsetAndMetadata(10, null));
+ assertTrue(optionalCheckpoint1.isPresent());
+ Checkpoint checkpoint1 = optionalCheckpoint1.get();
SourceRecord sourceRecord1 =
mirrorCheckpointTask.checkpointRecord(checkpoint1, 123L);
assertEquals(new TopicPartition("source1.topic1", 2),
checkpoint1.topicPartition(),
"checkpoint group9 source1.topic1 failed");
@@ -68,8 +74,10 @@ public class MirrorCheckpointTaskTest {
"checkpoint group9 downstreamOffset failed");
assertEquals(123L, sourceRecord1.timestamp().longValue(),
"checkpoint group9 timestamp failed");
- Checkpoint checkpoint2 = mirrorCheckpointTask.checkpoint("group11",
new TopicPartition("target2.topic5", 6),
+ Optional<Checkpoint> optionalCheckpoint2 =
mirrorCheckpointTask.checkpoint("group11", new TopicPartition("target2.topic5",
6),
new OffsetAndMetadata(12, null));
+ assertTrue(optionalCheckpoint2.isPresent());
+ Checkpoint checkpoint2 = optionalCheckpoint2.get();
SourceRecord sourceRecord2 =
mirrorCheckpointTask.checkpointRecord(checkpoint2, 234L);
assertEquals(new TopicPartition("topic5", 6),
checkpoint2.topicPartition(),
"checkpoint group11 topic5 failed");
@@ -138,4 +146,19 @@ public class MirrorCheckpointTaskTest {
assertEquals(51, output.get(consumer2).get(t2p0).offset(),
"Consumer 2 " + topic2 + " failed");
}
+
+ @Test
+ public void testNoCheckpointForTopicWithoutOffsetSyncs() {
+ OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new
OffsetSyncStoreTest.FakeOffsetSyncStore();
+ MirrorCheckpointTask mirrorCheckpointTask = new
MirrorCheckpointTask("source1", "target2",
+ new DefaultReplicationPolicy(), offsetSyncStore,
Collections.emptyMap(), Collections.emptyMap());
+ offsetSyncStore.sync(new TopicPartition("topic1", 0), 3L, 4L);
+
+ Optional<Checkpoint> checkpoint1 =
mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 1),
+ new OffsetAndMetadata(10, null));
+ Optional<Checkpoint> checkpoint2 =
mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 0),
+ new OffsetAndMetadata(10, null));
+ assertFalse(checkpoint1.isPresent());
+ assertTrue(checkpoint2.isPresent());
+ }
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
index 9307c608865..9224a088081 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
@@ -47,26 +47,26 @@ public class OffsetSyncStoreTest {
FakeOffsetSyncStore store = new FakeOffsetSyncStore();
store.sync(tp, 100, 200);
- assertEquals(store.translateDownstream(tp, 150), 250,
+ assertEquals(250L, store.translateDownstream(tp, 150).getAsLong(),
"Failure in translating downstream offset 250");
// Translate exact offsets
store.sync(tp, 150, 251);
- assertEquals(store.translateDownstream(tp, 150), 251,
+ assertEquals(251L, store.translateDownstream(tp, 150).getAsLong(),
"Failure in translating exact downstream offset 251");
// Use old offset (5) prior to any sync -> can't translate
- assertEquals(-1, store.translateDownstream(tp, 5),
+ assertEquals(-1, store.translateDownstream(tp, 5).getAsLong(),
"Expected old offset to not translate");
// Downstream offsets reset
store.sync(tp, 200, 10);
- assertEquals(store.translateDownstream(tp, 200), 10,
+ assertEquals(10L, store.translateDownstream(tp, 200).getAsLong(),
"Failure in resetting translation of downstream offset");
// Upstream offsets reset
store.sync(tp, 20, 20);
- assertEquals(store.translateDownstream(tp, 20), 20,
+ assertEquals(20L, store.translateDownstream(tp, 20).getAsLong(),
"Failure in resetting translation of upstream offset");
}
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index 9e60e4880dc..56ae3f8ebf9 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -266,7 +266,7 @@ public class IdentityReplicationIntegrationTest extends
MirrorConnectorsIntegrat
* Returns expected topic name on target cluster.
*/
@Override
- String backupClusterTopicName(String topic) {
+ String remoteTopicName(String topic, String clusterAlias) {
return topic;
}
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index f325e15695b..dfafdcbd8c6 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -41,7 +41,6 @@ import org.apache.kafka.connect.mirror.ReplicationPolicy;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
-import static org.apache.kafka.test.TestUtils.waitForCondition;
import java.time.Duration;
import java.util.ArrayList;
@@ -55,11 +54,13 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.junit.jupiter.api.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -80,7 +81,7 @@ import static
org.apache.kafka.connect.mirror.TestUtils.generateRecords;
* between clusters during this failover and failback.
*/
@Tag("integration")
-public abstract class MirrorConnectorsIntegrationBaseTest {
+public class MirrorConnectorsIntegrationBaseTest {
private static final Logger log =
LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class);
protected static final int NUM_RECORDS_PER_PARTITION = 10;
@@ -93,11 +94,13 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
private static final int TOPIC_SYNC_DURATION_MS = 60_000;
private static final int REQUEST_TIMEOUT_DURATION_MS = 60_000;
private static final int NUM_WORKERS = 3;
- protected static final Duration CONSUMER_POLL_TIMEOUT_MS =
Duration.ofMillis(500);
+ protected static final Duration CONSUMER_POLL_TIMEOUT_MS =
Duration.ofMillis(500L);
protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
protected static final String BACKUP_CLUSTER_ALIAS = "backup";
- protected static final List<Class<? extends Connector>> CONNECTOR_LIST =
- Arrays.asList(MirrorSourceConnector.class,
MirrorCheckpointConnector.class, MirrorHeartbeatConnector.class);
+ protected static final List<Class<? extends Connector>> CONNECTOR_LIST =
Arrays.asList(
+ MirrorSourceConnector.class,
+ MirrorCheckpointConnector.class,
+ MirrorHeartbeatConnector.class);
private volatile boolean shuttingDown;
protected Map<String, String> mm2Props = new HashMap<>();
@@ -243,10 +246,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
produceMessages(primary, "test-topic-1");
produceMessages(backup, "test-topic-1");
String consumerGroupName = "consumer-group-testReplication";
- Map<String, Object> consumerProps = new HashMap<String, Object>() {{
- put("group.id", consumerGroupName);
- put("auto.offset.reset", "latest");
- }};
+ Map<String, Object> consumerProps =
Collections.singletonMap("group.id", consumerGroupName);
// warm up consumers before starting the connectors so we don't need
to wait for discovery
warmUpConsumer(consumerProps);
@@ -319,9 +319,6 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
waitForCondition(() ->
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new
TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not
translated downstream to primary cluster.");
- waitForCondition(() ->
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
- Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new
TopicPartition("test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not
translated upstream to primary cluster.");
-
Map<TopicPartition, OffsetAndMetadata> primaryOffsets =
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
Duration.ofMillis(CHECKPOINT_DURATION_MS));
@@ -329,17 +326,14 @@ public abstract class MirrorConnectorsIntegrationBaseTest
{
backupClient.close();
// Failback consumer group to primary cluster
- try (Consumer<byte[], byte[]> backupConsumer =
primary.kafka().createConsumer(Collections.singletonMap("group.id",
consumerGroupName))) {
- backupConsumer.assign(primaryOffsets.keySet());
- primaryOffsets.forEach(backupConsumer::seek);
- backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
- backupConsumer.commitAsync();
-
- assertTrue(backupConsumer.position(new
TopicPartition("test-topic-1", 0)) > 0, "Consumer failedback to zero upstream
offset.");
- assertTrue(backupConsumer.position(new
TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero
downstream offset.");
- assertTrue(backupConsumer.position(
- new TopicPartition("test-topic-1", 0)) <=
NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected upstream offset.");
- assertTrue(backupConsumer.position(
+ try (Consumer<byte[], byte[]> primaryConsumer =
primary.kafka().createConsumer(Collections.singletonMap("group.id",
consumerGroupName))) {
+ primaryConsumer.assign(primaryOffsets.keySet());
+ primaryOffsets.forEach(primaryConsumer::seek);
+ primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ primaryConsumer.commitAsync();
+
+ assertTrue(primaryConsumer.position(new
TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero
downstream offset.");
+ assertTrue(primaryConsumer.position(
new TopicPartition("backup.test-topic-1", 0)) <=
NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected downstream offset.");
}
@@ -526,6 +520,64 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
assertFalse(primaryTopics.contains("mm2-offset-syncs." +
BACKUP_CLUSTER_ALIAS + ".internal"));
}
+ @Test
+ public void testNoCheckpointsIfNoRecordsAreMirrored() throws
InterruptedException {
+ String consumerGroupName = "consumer-group-no-checkpoints";
+ Map<String, Object> consumerProps =
Collections.singletonMap("group.id", consumerGroupName);
+
+ // ensure there are some records in the topic on the source cluster
+ produceMessages(primary, "test-topic-1");
+
+ // warm up consumers before starting the connectors, so we don't need
to wait for discovery
+ warmUpConsumer(consumerProps);
+
+ // one way replication from primary to backup
+ mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS +
".enabled", "false");
+ mm2Config = new MirrorMakerConfig(mm2Props);
+ waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config,
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+ // make sure the topics are created in the backup cluster
+ waitForTopicCreated(backup, remoteTopicName("test-topic-1",
PRIMARY_CLUSTER_ALIAS));
+ waitForTopicCreated(backup,
remoteTopicName("test-topic-no-checkpoints", PRIMARY_CLUSTER_ALIAS));
+
+ // commit some offsets for both topics in the source cluster
+ TopicPartition tp1 = new TopicPartition("test-topic-1", 0);
+ TopicPartition tp2 = new TopicPartition("test-topic-no-checkpoints",
0);
+ try (Consumer<byte[], byte[]> consumer =
primary.kafka().createConsumer(consumerProps)) {
+ Collection<TopicPartition> tps = Arrays.asList(tp1, tp2);
+ Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
endOffsets.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> new OffsetAndMetadata(e.getValue())
+ ));
+ consumer.commitSync(offsetsToCommit);
+ }
+
+ // Only test-topic-1 should have translated offsets because we've not
yet mirrored any records for test-topic-no-checkpoints
+ MirrorClient backupClient = new
MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
+ waitForCondition(() -> {
+ Map<TopicPartition, OffsetAndMetadata> translatedOffsets =
backupClient.remoteConsumerOffsets(
+ consumerGroupName, PRIMARY_CLUSTER_ALIAS,
Duration.ofSeconds(30L));
+ return translatedOffsets.containsKey(remoteTopicPartition(tp1,
PRIMARY_CLUSTER_ALIAS)) &&
+ !translatedOffsets.containsKey(remoteTopicPartition(tp2,
PRIMARY_CLUSTER_ALIAS));
+ }, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to
backup cluster");
+
+ // Send some records to test-topic-no-checkpoints in the source cluster
+ produceMessages(primary, "test-topic-no-checkpoints");
+
+ waitForCondition(() -> {
+ Map<TopicPartition, OffsetAndMetadata> translatedOffsets =
backupClient.remoteConsumerOffsets(
+ consumerGroupName, PRIMARY_CLUSTER_ALIAS,
Duration.ofSeconds(30L));
+ return translatedOffsets.containsKey(remoteTopicPartition(tp1,
PRIMARY_CLUSTER_ALIAS)) &&
+ translatedOffsets.containsKey(remoteTopicPartition(tp2,
PRIMARY_CLUSTER_ALIAS));
+ }, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to
backup cluster");
+ }
+
+ private TopicPartition remoteTopicPartition(TopicPartition tp, String
alias) {
+ return new TopicPartition(remoteTopicName(tp.topic(), alias),
tp.partition());
+ }
+
/*
* Run tests for Exclude Filter for copying topic configurations
*/
@@ -536,7 +588,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
topicConfig.put("retention.bytes", "1000"); // should be included,
default value is -1
final String topic = "test-topic-with-config";
- final String backupTopic = backupClusterTopicName(topic);
+ final String backupTopic = remoteTopicName(topic,
PRIMARY_CLUSTER_ALIAS);
primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig);
waitForTopicCreated(backup, backupTopic);
@@ -560,8 +612,8 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
/*
* Returns expected topic name on target cluster.
*/
- String backupClusterTopicName(String topic) {
- return PRIMARY_CLUSTER_ALIAS + "." + topic;
+ String remoteTopicName(String topic, String clusterAlias) {
+ return clusterAlias + "." + topic;
}
/*
@@ -721,6 +773,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS, 1,
topicConfig, adminClientConfig);
primary.kafka().createTopic("backup.test-topic-1", 1, 1, emptyMap,
adminClientConfig);
primary.kafka().createTopic("heartbeats", 1, 1, emptyMap,
adminClientConfig);
+ primary.kafka().createTopic("test-topic-no-checkpoints", 1, 1,
emptyMap, adminClientConfig);
backup.kafka().createTopic("test-topic-1", NUM_PARTITIONS, 1,
emptyMap, adminClientConfig);
backup.kafka().createTopic("primary.test-topic-1", 1, 1, emptyMap,
adminClientConfig);
backup.kafka().createTopic("heartbeats", 1, 1, emptyMap,
adminClientConfig);
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTest.java
deleted file mode 100644
index ed82aa97ed4..00000000000
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTest.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.mirror.integration;
-
-import org.junit.jupiter.api.Tag;
-
-@Tag("integration")
-public class MirrorConnectorsIntegrationTest extends
MirrorConnectorsIntegrationBaseTest {
-}