This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cdd19a5326a KAFKA-12635: Don't emit checkpoints for partitions without 
offset-syncs (#11748)
cdd19a5326a is described below

commit cdd19a5326aae83d470eb8eab800b3e9ed21f013
Author: Mickael Maison <[email protected]>
AuthorDate: Mon May 16 17:44:14 2022 +0200

    KAFKA-12635: Don't emit checkpoints for partitions without offset-syncs 
(#11748)
    
    
    Reviewers: Luke Chen <[email protected]>,  Viktor Somogyi-Vass 
<[email protected]>, Dániel Urbán <[email protected]>, Federico 
Valeri <[email protected]>
---
 .../kafka/connect/mirror/MirrorCheckpointTask.java |  25 +++--
 .../kafka/connect/mirror/OffsetSyncStore.java      |  25 +++--
 .../connect/mirror/MirrorCheckpointTaskTest.java   |  27 +++++-
 .../kafka/connect/mirror/OffsetSyncStoreTest.java  |  10 +-
 .../IdentityReplicationIntegrationTest.java        |   2 +-
 .../MirrorConnectorsIntegrationBaseTest.java       | 105 ++++++++++++++++-----
 .../MirrorConnectorsIntegrationTest.java           |  23 -----
 7 files changed, 141 insertions(+), 76 deletions(-)

diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 30fb695d926..959961812ea 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -37,11 +37,14 @@ import java.util.Map.Entry;
 import java.util.Map;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.Collections;
 import java.util.stream.Collectors;
 import java.util.concurrent.ExecutionException;
 import java.time.Duration;
+import java.util.stream.Stream;
 
 /** Emits checkpoints for upstream consumer groups. */
 public class MirrorCheckpointTask extends SourceTask {
@@ -169,6 +172,7 @@ public class MirrorCheckpointTask extends SourceTask {
         return listConsumerGroupOffsets(group).entrySet().stream()
             .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
             .map(x -> checkpoint(group, x.getKey(), x.getValue()))
+            .flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do 
not emit checkpoints for partitions that don't have offset-syncs
             .filter(x -> x.downstreamOffset() >= 0)  // ignore offsets we 
cannot translate accurately
             .collect(Collectors.toList());
     }
@@ -182,12 +186,16 @@ public class MirrorCheckpointTask extends SourceTask {
         return 
sourceAdminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();
     }
 
-    Checkpoint checkpoint(String group, TopicPartition topicPartition,
-            OffsetAndMetadata offsetAndMetadata) {
+    Optional<Checkpoint> checkpoint(String group, TopicPartition 
topicPartition,
+                                    OffsetAndMetadata offsetAndMetadata) {
         long upstreamOffset = offsetAndMetadata.offset();
-        long downstreamOffset = 
offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
-        return new Checkpoint(group, renameTopicPartition(topicPartition),
-            upstreamOffset, downstreamOffset, offsetAndMetadata.metadata());
+        OptionalLong downstreamOffset = 
offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
+        if (downstreamOffset.isPresent()) {
+            return Optional.of(new Checkpoint(group, 
renameTopicPartition(topicPartition),
+                    upstreamOffset, downstreamOffset.getAsLong(), 
offsetAndMetadata.metadata()));
+        } else {
+            return Optional.empty();
+        }
     }
 
     SourceRecord checkpointRecord(Checkpoint checkpoint, long timestamp) {
@@ -232,11 +240,10 @@ public class MirrorCheckpointTask extends SourceTask {
                 ConsumerGroupState consumerGroupState = 
consumerGroupDesc.state();
                 // sync offset to the target cluster only if the state of 
current consumer group is:
                 // (1) idle: because the consumer at target is not actively 
consuming the mirrored topic
-                // (2) dead: the new consumer that is recently created at 
source and never exist at target
-                if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
+                // (2) dead: the new consumer that is recently created at 
source and never existed at target
+                if (consumerGroupState == ConsumerGroupState.EMPTY) {
                     idleConsumerGroupsOffset.put(group, 
targetAdminClient.listConsumerGroupOffsets(group)
-                        
.partitionsToOffsetAndMetadata().get().entrySet().stream().collect(
-                            Collectors.toMap(Entry::getKey, Entry::getValue)));
+                        .partitionsToOffsetAndMetadata().get());
                 }
                 // new consumer upstream has state "DEAD" and will be 
identified during the offset sync-up
             } catch (InterruptedException | ExecutionException e) {
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
index 9152cd5aa0b..f9b6617c13d 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.HashMap;
 import java.util.Collections;
 import java.time.Duration;
+import java.util.Optional;
+import java.util.OptionalLong;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset 
translation. */
 class OffsetSyncStore implements AutoCloseable {
@@ -47,14 +49,18 @@ class OffsetSyncStore implements AutoCloseable {
         this.offsetSyncTopicPartition = offsetSyncTopicPartition;
     }
 
-    long translateDownstream(TopicPartition sourceTopicPartition, long 
upstreamOffset) {
-        OffsetSync offsetSync = latestOffsetSync(sourceTopicPartition);
-        if (offsetSync.upstreamOffset() > upstreamOffset) {
-            // Offset is too far in the past to translate accurately
-            return -1;
+    OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long 
upstreamOffset) {
+        Optional<OffsetSync> offsetSync = 
latestOffsetSync(sourceTopicPartition);
+        if (offsetSync.isPresent()) {
+            if (offsetSync.get().upstreamOffset() > upstreamOffset) {
+                // Offset is too far in the past to translate accurately
+                return OptionalLong.of(-1L);
+            }
+            long upstreamStep = upstreamOffset - 
offsetSync.get().upstreamOffset();
+            return OptionalLong.of(offsetSync.get().downstreamOffset() + 
upstreamStep);
+        } else {
+            return OptionalLong.empty();
         }
-        long upstreamStep = upstreamOffset - offsetSync.upstreamOffset();
-        return offsetSync.downstreamOffset() + upstreamStep;
     }
 
     // poll and handle records
@@ -77,8 +83,7 @@ class OffsetSyncStore implements AutoCloseable {
         offsetSyncs.put(sourceTopicPartition, offsetSync);
     }
 
-    private OffsetSync latestOffsetSync(TopicPartition topicPartition) {
-        return offsetSyncs.computeIfAbsent(topicPartition, x -> new 
OffsetSync(topicPartition,
-            -1, -1));
+    private Optional<OffsetSync> latestOffsetSync(TopicPartition 
topicPartition) {
+        return Optional.ofNullable(offsetSyncs.get(topicPartition));
     }
 }
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index 7ef878ab2e8..54fe678e73a 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -21,6 +21,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Collections;
+import java.util.Optional;
+
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -28,6 +30,8 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class MirrorCheckpointTaskTest {
 
@@ -53,8 +57,10 @@ public class MirrorCheckpointTaskTest {
             new DefaultReplicationPolicy(), offsetSyncStore, 
Collections.emptyMap(), Collections.emptyMap());
         offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
         offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
-        Checkpoint checkpoint1 = mirrorCheckpointTask.checkpoint("group9", new 
TopicPartition("topic1", 2),
+        Optional<Checkpoint> optionalCheckpoint1 = 
mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
             new OffsetAndMetadata(10, null));
+        assertTrue(optionalCheckpoint1.isPresent());
+        Checkpoint checkpoint1 = optionalCheckpoint1.get();
         SourceRecord sourceRecord1 = 
mirrorCheckpointTask.checkpointRecord(checkpoint1, 123L);
         assertEquals(new TopicPartition("source1.topic1", 2), 
checkpoint1.topicPartition(),
                 "checkpoint group9 source1.topic1 failed");
@@ -68,8 +74,10 @@ public class MirrorCheckpointTaskTest {
                 "checkpoint group9 downstreamOffset failed");
         assertEquals(123L, sourceRecord1.timestamp().longValue(),
                 "checkpoint group9 timestamp failed");
-        Checkpoint checkpoint2 = mirrorCheckpointTask.checkpoint("group11", 
new TopicPartition("target2.topic5", 6),
+        Optional<Checkpoint> optionalCheckpoint2 = 
mirrorCheckpointTask.checkpoint("group11", new TopicPartition("target2.topic5", 
6),
             new OffsetAndMetadata(12, null));
+        assertTrue(optionalCheckpoint2.isPresent());
+        Checkpoint checkpoint2 = optionalCheckpoint2.get();
         SourceRecord sourceRecord2 = 
mirrorCheckpointTask.checkpointRecord(checkpoint2, 234L);
         assertEquals(new TopicPartition("topic5", 6), 
checkpoint2.topicPartition(),
                 "checkpoint group11 topic5 failed");
@@ -138,4 +146,19 @@ public class MirrorCheckpointTaskTest {
         assertEquals(51, output.get(consumer2).get(t2p0).offset(),
                 "Consumer 2 " + topic2 + " failed");
     }
+
+    @Test
+    public void testNoCheckpointForTopicWithoutOffsetSyncs() {
+        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new 
OffsetSyncStoreTest.FakeOffsetSyncStore();
+        MirrorCheckpointTask mirrorCheckpointTask = new 
MirrorCheckpointTask("source1", "target2",
+                new DefaultReplicationPolicy(), offsetSyncStore, 
Collections.emptyMap(), Collections.emptyMap());
+        offsetSyncStore.sync(new TopicPartition("topic1", 0), 3L, 4L);
+
+        Optional<Checkpoint> checkpoint1 = 
mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 1),
+                new OffsetAndMetadata(10, null));
+        Optional<Checkpoint> checkpoint2 = 
mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 0),
+                new OffsetAndMetadata(10, null));
+        assertFalse(checkpoint1.isPresent());
+        assertTrue(checkpoint2.isPresent());
+    }
 }
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
index 9307c608865..9224a088081 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
@@ -47,26 +47,26 @@ public class OffsetSyncStoreTest {
         FakeOffsetSyncStore store = new FakeOffsetSyncStore();
 
         store.sync(tp, 100, 200);
-        assertEquals(store.translateDownstream(tp, 150), 250,
+        assertEquals(250L, store.translateDownstream(tp, 150).getAsLong(),
                 "Failure in translating downstream offset 250");
 
         // Translate exact offsets
         store.sync(tp, 150, 251);
-        assertEquals(store.translateDownstream(tp, 150), 251,
+        assertEquals(251L, store.translateDownstream(tp, 150).getAsLong(),
                 "Failure in translating exact downstream offset 251");
 
         // Use old offset (5) prior to any sync -> can't translate
-        assertEquals(-1, store.translateDownstream(tp, 5),
+        assertEquals(-1, store.translateDownstream(tp, 5).getAsLong(),
                 "Expected old offset to not translate");
 
         // Downstream offsets reset
         store.sync(tp, 200, 10);
-        assertEquals(store.translateDownstream(tp, 200), 10,
+        assertEquals(10L, store.translateDownstream(tp, 200).getAsLong(),
                 "Failure in resetting translation of downstream offset");
 
         // Upstream offsets reset
         store.sync(tp, 20, 20);
-        assertEquals(store.translateDownstream(tp, 20), 20,
+        assertEquals(20L, store.translateDownstream(tp, 20).getAsLong(),
                 "Failure in resetting translation of upstream offset");
     }
 }
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index 9e60e4880dc..56ae3f8ebf9 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -266,7 +266,7 @@ public class IdentityReplicationIntegrationTest extends 
MirrorConnectorsIntegrat
      * Returns expected topic name on target cluster.
      */
     @Override
-    String backupClusterTopicName(String topic) {
+    String remoteTopicName(String topic, String clusterAlias) {
         return topic;
     }
 }
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index f325e15695b..dfafdcbd8c6 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -41,7 +41,6 @@ import org.apache.kafka.connect.mirror.ReplicationPolicy;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
 import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
-import static org.apache.kafka.test.TestUtils.waitForCondition;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -55,11 +54,13 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.junit.jupiter.api.Tag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -80,7 +81,7 @@ import static 
org.apache.kafka.connect.mirror.TestUtils.generateRecords;
  * between clusters during this failover and failback.
  */
 @Tag("integration")
-public abstract class MirrorConnectorsIntegrationBaseTest {
+public class MirrorConnectorsIntegrationBaseTest {
     private static final Logger log = 
LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class);
     
     protected static final int NUM_RECORDS_PER_PARTITION = 10;
@@ -93,11 +94,13 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
     private static final int TOPIC_SYNC_DURATION_MS = 60_000;
     private static final int REQUEST_TIMEOUT_DURATION_MS = 60_000;
     private static final int NUM_WORKERS = 3;
-    protected static final Duration CONSUMER_POLL_TIMEOUT_MS = 
Duration.ofMillis(500);
+    protected static final Duration CONSUMER_POLL_TIMEOUT_MS = 
Duration.ofMillis(500L);
     protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
     protected static final String BACKUP_CLUSTER_ALIAS = "backup";
-    protected static final List<Class<? extends Connector>> CONNECTOR_LIST =
-            Arrays.asList(MirrorSourceConnector.class, 
MirrorCheckpointConnector.class, MirrorHeartbeatConnector.class);
+    protected static final List<Class<? extends Connector>> CONNECTOR_LIST = 
Arrays.asList(
+            MirrorSourceConnector.class,
+            MirrorCheckpointConnector.class,
+            MirrorHeartbeatConnector.class);
 
     private volatile boolean shuttingDown;
     protected Map<String, String> mm2Props = new HashMap<>();
@@ -243,10 +246,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
         produceMessages(primary, "test-topic-1");
         produceMessages(backup, "test-topic-1");
         String consumerGroupName = "consumer-group-testReplication";
-        Map<String, Object> consumerProps = new HashMap<String, Object>() {{
-                put("group.id", consumerGroupName);
-                put("auto.offset.reset", "latest");
-            }};
+        Map<String, Object> consumerProps = 
Collections.singletonMap("group.id", consumerGroupName);
         // warm up consumers before starting the connectors so we don't need 
to wait for discovery
         warmUpConsumer(consumerProps);
         
@@ -319,9 +319,6 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
         waitForCondition(() -> 
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
             Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not 
translated downstream to primary cluster.");
 
-        waitForCondition(() -> 
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-            Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not 
translated upstream to primary cluster.");
-
         Map<TopicPartition, OffsetAndMetadata> primaryOffsets = 
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
                 Duration.ofMillis(CHECKPOINT_DURATION_MS));
  
@@ -329,17 +326,14 @@ public abstract class MirrorConnectorsIntegrationBaseTest 
{
         backupClient.close();
         
         // Failback consumer group to primary cluster
-        try (Consumer<byte[], byte[]> backupConsumer = 
primary.kafka().createConsumer(Collections.singletonMap("group.id", 
consumerGroupName))) {
-            backupConsumer.assign(primaryOffsets.keySet());
-            primaryOffsets.forEach(backupConsumer::seek);
-            backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            backupConsumer.commitAsync();
-        
-            assertTrue(backupConsumer.position(new 
TopicPartition("test-topic-1", 0)) > 0, "Consumer failedback to zero upstream 
offset.");
-            assertTrue(backupConsumer.position(new 
TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero 
downstream offset.");
-            assertTrue(backupConsumer.position(
-                new TopicPartition("test-topic-1", 0)) <= 
NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected upstream offset.");
-            assertTrue(backupConsumer.position(
+        try (Consumer<byte[], byte[]> primaryConsumer = 
primary.kafka().createConsumer(Collections.singletonMap("group.id", 
consumerGroupName))) {
+            primaryConsumer.assign(primaryOffsets.keySet());
+            primaryOffsets.forEach(primaryConsumer::seek);
+            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            primaryConsumer.commitAsync();
+
+            assertTrue(primaryConsumer.position(new 
TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero 
downstream offset.");
+            assertTrue(primaryConsumer.position(
                 new TopicPartition("backup.test-topic-1", 0)) <= 
NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected downstream offset.");
         }
       
@@ -526,6 +520,64 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
         assertFalse(primaryTopics.contains("mm2-offset-syncs." + 
BACKUP_CLUSTER_ALIAS + ".internal"));
     }
 
+    @Test
+    public void testNoCheckpointsIfNoRecordsAreMirrored() throws 
InterruptedException {
+        String consumerGroupName = "consumer-group-no-checkpoints";
+        Map<String, Object> consumerProps = 
Collections.singletonMap("group.id", consumerGroupName);
+
+        // ensure there are some records in the topic on the source cluster
+        produceMessages(primary, "test-topic-1");
+
+        // warm up consumers before starting the connectors, so we don't need 
to wait for discovery
+        warmUpConsumer(consumerProps);
+
+        // one way replication from primary to backup
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + 
".enabled", "false");
+        mm2Config = new MirrorMakerConfig(mm2Props);
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // make sure the topics  are created in the backup cluster
+        waitForTopicCreated(backup, remoteTopicName("test-topic-1", 
PRIMARY_CLUSTER_ALIAS));
+        waitForTopicCreated(backup, 
remoteTopicName("test-topic-no-checkpoints", PRIMARY_CLUSTER_ALIAS));
+
+        // commit some offsets for both topics in the source cluster
+        TopicPartition tp1 = new TopicPartition("test-topic-1", 0);
+        TopicPartition tp2 = new TopicPartition("test-topic-no-checkpoints", 
0);
+        try (Consumer<byte[], byte[]> consumer = 
primary.kafka().createConsumer(consumerProps)) {
+            Collection<TopicPartition> tps = Arrays.asList(tp1, tp2);
+            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
+            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
endOffsets.entrySet().stream()
+                            .collect(Collectors.toMap(
+                                    Map.Entry::getKey,
+                                    e -> new OffsetAndMetadata(e.getValue())
+                            ));
+            consumer.commitSync(offsetsToCommit);
+        }
+
+        // Only test-topic-1 should have translated offsets because we've not 
yet mirrored any records for test-topic-no-checkpoints
+        MirrorClient backupClient = new 
MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
+        waitForCondition(() -> {
+            Map<TopicPartition, OffsetAndMetadata> translatedOffsets = 
backupClient.remoteConsumerOffsets(
+                    consumerGroupName, PRIMARY_CLUSTER_ALIAS, 
Duration.ofSeconds(30L));
+            return translatedOffsets.containsKey(remoteTopicPartition(tp1, 
PRIMARY_CLUSTER_ALIAS)) &&
+                   !translatedOffsets.containsKey(remoteTopicPartition(tp2, 
PRIMARY_CLUSTER_ALIAS));
+        }, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to 
backup cluster");
+
+        // Send some records to test-topic-no-checkpoints in the source cluster
+        produceMessages(primary, "test-topic-no-checkpoints");
+
+        waitForCondition(() -> {
+            Map<TopicPartition, OffsetAndMetadata> translatedOffsets = 
backupClient.remoteConsumerOffsets(
+                    consumerGroupName, PRIMARY_CLUSTER_ALIAS, 
Duration.ofSeconds(30L));
+            return translatedOffsets.containsKey(remoteTopicPartition(tp1, 
PRIMARY_CLUSTER_ALIAS)) &&
+                   translatedOffsets.containsKey(remoteTopicPartition(tp2, 
PRIMARY_CLUSTER_ALIAS));
+        }, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to 
backup cluster");
+    }
+
+    private TopicPartition remoteTopicPartition(TopicPartition tp, String 
alias) {
+        return new TopicPartition(remoteTopicName(tp.topic(), alias), 
tp.partition());
+    }
+
     /*
      * Run tests for Exclude Filter for copying topic configurations
      */
@@ -536,7 +588,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
         topicConfig.put("retention.bytes", "1000"); // should be included, 
default value is -1
 
         final String topic = "test-topic-with-config";
-        final String backupTopic = backupClusterTopicName(topic);
+        final String backupTopic = remoteTopicName(topic, 
PRIMARY_CLUSTER_ALIAS);
 
         primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig);
         waitForTopicCreated(backup, backupTopic);
@@ -560,8 +612,8 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
     /*
      * Returns expected topic name on target cluster.
      */
-    String backupClusterTopicName(String topic) {
-        return PRIMARY_CLUSTER_ALIAS + "." + topic;
+    String remoteTopicName(String topic, String clusterAlias) {
+        return clusterAlias + "." + topic;
     }
 
     /*
@@ -721,6 +773,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
         primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS, 1, 
topicConfig, adminClientConfig);
         primary.kafka().createTopic("backup.test-topic-1", 1, 1, emptyMap, 
adminClientConfig);
         primary.kafka().createTopic("heartbeats", 1, 1, emptyMap, 
adminClientConfig);
+        primary.kafka().createTopic("test-topic-no-checkpoints", 1, 1, 
emptyMap, adminClientConfig);
         backup.kafka().createTopic("test-topic-1", NUM_PARTITIONS, 1, 
emptyMap, adminClientConfig);
         backup.kafka().createTopic("primary.test-topic-1", 1, 1, emptyMap, 
adminClientConfig);
         backup.kafka().createTopic("heartbeats", 1, 1, emptyMap, 
adminClientConfig);
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTest.java
deleted file mode 100644
index ed82aa97ed4..00000000000
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTest.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.mirror.integration;
-
-import org.junit.jupiter.api.Tag;
-
-@Tag("integration")
-public class MirrorConnectorsIntegrationTest extends 
MirrorConnectorsIntegrationBaseTest {
-}

Reply via email to