This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fdeabe552f1 KAFKA-20229: Batch offset translation in RemoteClusterUtils (#21591)
fdeabe552f1 is described below

commit fdeabe552f19cdc4d52d824a2d4594828cd5cf63
Author: Mickael Maison <[email protected]>
AuthorDate: Fri Feb 27 12:20:43 2026 +0100

    KAFKA-20229: Batch offset translation in RemoteClusterUtils (#21591)
    
    Implements [KIP-1239](https://cwiki.apache.org/confluence/x/h4HMFw)
    
    Reviewers: Viktor Somogyi-Vass <[email protected]>
---
 .../apache/kafka/connect/mirror/MirrorClient.java  |  55 ++++++++---
 .../kafka/connect/mirror/RemoteClusterUtils.java   |  17 ++++
 .../kafka/connect/mirror/MirrorClientTest.java     | 103 +++++++++++++++++++++
 3 files changed, 162 insertions(+), 13 deletions(-)

diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
index 06dec5b25ba..7275ce72020 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
@@ -39,6 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 /**
@@ -70,6 +71,11 @@ public class MirrorClient implements AutoCloseable {
         this.consumerConfig = consumerConfig;
     }
 
+    // Creates the consumer used to read the checkpoints topic. A new KafkaConsumer with
+    // byte-array deserializers is built from consumerConfig on every call; the caller
+    // closes it. Package-private (rather than private) so tests can override it and
+    // supply a mock consumer.
+    Consumer<byte[], byte[]> consumer() {
+        return new KafkaConsumer<>(consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer());
+    }
+
     /**
      * Closes internal clients.
      */
@@ -147,24 +153,32 @@ public class MirrorClient implements AutoCloseable {
     }
 
     /**
-     * Translates a remote consumer group's offsets into corresponding local 
offsets. Topics are automatically
+     * Translates remote consumer groups' offsets into corresponding local 
offsets. Topics are automatically
      * renamed according to the ReplicationPolicy.
-     * @param consumerGroupId The group ID of remote consumer group
+     * @param consumerGroupPattern The regex pattern specifying the consumer 
groups to translate offsets for
      * @param remoteClusterAlias The alias of remote cluster
      * @param timeout The maximum time to block when consuming from the 
checkpoints topic
+     * @throws IllegalArgumentException If any of the arguments are null
      */
-    public Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets(String 
consumerGroupId,
-            String remoteClusterAlias, Duration timeout) {
+    public Map<String, Map<TopicPartition, OffsetAndMetadata>> 
remoteConsumerOffsets(Pattern consumerGroupPattern,
+             String remoteClusterAlias, Duration timeout) {
+        if (consumerGroupPattern == null) {
+            throw new IllegalArgumentException("`consumerGroupPattern` must 
not be null");
+        }
+        if (remoteClusterAlias == null) {
+            throw new IllegalArgumentException("`remoteClusterAlias` must not 
be null");
+        }
+        if (timeout == null) {
+            throw new IllegalArgumentException("`timeout` must not be null");
+        }
         long deadline = System.currentTimeMillis() + timeout.toMillis();
-        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = new 
HashMap<>();
 
-        try (KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerConfig,
-                new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+        try (Consumer<byte[], byte[]> consumer = consumer()) {
             // checkpoint topics are not "remote topics", as they are not 
replicated. So we don't need
             // to use ReplicationPolicy to create the checkpoint topic here.
             String checkpointTopic = 
replicationPolicy.checkpointsTopic(remoteClusterAlias);
-            List<TopicPartition> checkpointAssignment =
-                List.of(new TopicPartition(checkpointTopic, 0));
+            List<TopicPartition> checkpointAssignment = List.of(new 
TopicPartition(checkpointTopic, 0));
             consumer.assign(checkpointAssignment);
             consumer.seekToBeginning(checkpointAssignment);
             while (System.currentTimeMillis() < deadline && 
!endOfStream(consumer, checkpointAssignment)) {
@@ -172,20 +186,35 @@ public class MirrorClient implements AutoCloseable {
                 for (ConsumerRecord<byte[], byte[]> record : records) {
                     try {
                         Checkpoint checkpoint = 
Checkpoint.deserializeRecord(record);
-                        if 
(checkpoint.consumerGroupId().equals(consumerGroupId)) {
-                            offsets.put(checkpoint.topicPartition(), 
checkpoint.offsetAndMetadata());
+                        String consumerGroupId = checkpoint.consumerGroupId();
+                        if 
(consumerGroupPattern.matcher(consumerGroupId).matches()) {
+                            offsets.computeIfAbsent(consumerGroupId, k -> new 
HashMap<>())
+                                    .put(checkpoint.topicPartition(), 
checkpoint.offsetAndMetadata());
                         }
                     } catch (SchemaException e) {
                         log.info("Could not deserialize record. Skipping.", e);
                     }
                 }
             }
-            log.info("Consumed {} checkpoint records for {} from {}.", 
offsets.size(),
-                consumerGroupId, checkpointTopic);
+            log.info("Consumed {} checkpoint records from {}.", 
offsets.size(), checkpointTopic);
         }
         return offsets;
     }
 
+    /**
+     * Translates a remote consumer group's offsets into corresponding local offsets. Topics are automatically
+     * renamed according to the ReplicationPolicy.
+     * @param consumerGroupId The group ID of remote consumer group
+     * @param remoteClusterAlias The alias of remote cluster
+     * @param timeout The maximum time to block when consuming from the checkpoints topic
+     * @return The translated offsets of the group keyed by topic partition; empty if no checkpoint matches the group
+     * @throws IllegalArgumentException If any of the arguments are null
+     */
+    public Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets(String consumerGroupId,
+            String remoteClusterAlias, Duration timeout) {
+        // Reject null up front: Pattern.quote(null) would otherwise fail with a bare NullPointerException,
+        // whereas the pattern-based overload reports null arguments as IllegalArgumentException.
+        if (consumerGroupId == null) {
+            throw new IllegalArgumentException("`consumerGroupId` must not be null");
+        }
+        // Quote the group ID so that regex metacharacters in it are matched literally.
+        Pattern consumerGroupPattern = Pattern.compile(Pattern.quote(consumerGroupId));
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets =
+                remoteConsumerOffsets(consumerGroupPattern, remoteClusterAlias, timeout);
+        return offsets.getOrDefault(consumerGroupId, new HashMap<>());
+    }
+
     Set<String> listTopics() throws InterruptedException {
         try {
             return adminClient.listTopics().names().get();
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
index a3eb778b7ee..470d027dff5 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
@@ -26,6 +26,7 @@ import java.time.Duration;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
 
 
 /**
@@ -100,4 +101,20 @@ public final class RemoteClusterUtils {
             return client.remoteConsumerOffsets(consumerGroupId, 
remoteClusterAlias, timeout);
         }
     }
+
+    /**
+     * Translates remote consumer groups' offsets into corresponding local offsets. Topics are automatically
+     * renamed according to the configured {@link ReplicationPolicy}.
+     * <p>A {@link MirrorClient} is created from {@code properties} for the duration of the call and closed
+     * before returning.
+     * @param properties Map of properties to instantiate a {@link MirrorClient}
+     * @param remoteClusterAlias The alias of the remote cluster
+     * @param consumerGroupPattern The regex pattern specifying the consumer groups to translate offsets for
+     * @param timeout The maximum time to block when consuming from the checkpoints topic
+     * @return The translated offsets of each matching consumer group, keyed by group ID and then by topic partition
+     * @throws IllegalArgumentException If {@code remoteClusterAlias}, {@code consumerGroupPattern} or
+     *         {@code timeout} is null
+     */
+    public static Map<String, Map<TopicPartition, OffsetAndMetadata>> 
translateOffsets(Map<String, Object> properties,
+            String remoteClusterAlias, Pattern consumerGroupPattern, Duration 
timeout) {
+        try (MirrorClient client = new MirrorClient(properties)) {
+            return client.remoteConsumerOffsets(consumerGroupPattern, 
remoteClusterAlias, timeout);
+        }
+    }
 }
diff --git a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
index 5e99f6cd74e..f7b6bbb08c3 100644
--- a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
+++ b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
@@ -16,24 +16,36 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
 import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.TopicPartition;
 
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class MirrorClientTest {
 
+    private static final String SOURCE = "source";
+
     private static class FakeMirrorClient extends MirrorClient {
 
         List<String> topics;
+        public MockConsumer<byte[], byte[]> consumer;
 
         FakeMirrorClient(List<String> topics) {
             this(new DefaultReplicationPolicy(), topics);
@@ -52,6 +64,15 @@ public class MirrorClientTest {
         protected Set<String> listTopics() {
             return new HashSet<>(topics);
         }
+
+        // Hands out the mock consumer when a test has injected one; otherwise defers to MirrorClient.
+        @Override
+        Consumer<byte[], byte[]> consumer() {
+            return consumer != null ? consumer : super.consumer();
+        }
     }
 
     @Test
@@ -208,9 +229,91 @@ public class MirrorClientTest {
             .topicSource("backup.heartbeats"));
     }
 
+    // Each argument of the pattern-based remoteConsumerOffsets overload must be rejected
+    // with IllegalArgumentException when it is null.
+    @Test
+    public void testRemoteConsumerOffsetsIllegalArgs() {
+        FakeMirrorClient client = new FakeMirrorClient();
+        // Null pattern. The (Pattern) cast selects the Pattern overload; a bare null would be
+        // ambiguous with the String overload.
+        assertThrows(IllegalArgumentException.class, () -> 
client.remoteConsumerOffsets((Pattern) null, "", Duration.ofSeconds(1L)));
+        // Null remote cluster alias.
+        assertThrows(IllegalArgumentException.class, () -> 
client.remoteConsumerOffsets(Pattern.compile(""), null, 
Duration.ofSeconds(1L)));
+        // Null timeout.
+        assertThrows(IllegalArgumentException.class, () -> 
client.remoteConsumerOffsets(Pattern.compile(""), "", null));
+    }
+
+    // Exercises both remoteConsumerOffsets overloads (pattern-based and single group ID)
+    // against a MockConsumer pre-loaded with four checkpoints for two groups.
+    @Test
+    public void testRemoteConsumerOffsets() {
+        String grp0 = "mygroup0";
+        String grp1 = "mygroup1";
+        FakeMirrorClient client = new FakeMirrorClient();
+        String checkpointTopic = 
client.replicationPolicy().checkpointsTopic(SOURCE);
+        TopicPartition checkpointTp = new TopicPartition(checkpointTopic, 0);
+
+        TopicPartition t0p0 = new TopicPartition("topic0", 0);
+        TopicPartition t0p1 = new TopicPartition("topic0", 1);
+
+        // cp0 and cp1 share the same group and partition; the expectations below only
+        // contain cp1, i.e. the later checkpoint replaces the earlier one.
+        Checkpoint cp0 = new Checkpoint(grp0, t0p0, 1L, 1L, "cp0");
+        Checkpoint cp1 = new Checkpoint(grp0, t0p0, 2L, 2L, "cp1");
+        Checkpoint cp2 = new Checkpoint(grp0, t0p1, 3L, 3L, "cp2");
+        Checkpoint cp3 = new Checkpoint(grp1, t0p1, 4L, 4L, "cp3");
+
+        // Batch translation matches only mygroup0
+        // (the pattern is applied with Matcher.matches(), so it must match the whole group ID).
+        // A fresh consumer is built before every call because remoteConsumerOffsets closes
+        // the consumer it obtains.
+        client.consumer = buildConsumer(checkpointTp, cp0, cp1, cp2, cp3);
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = 
client.remoteConsumerOffsets(
+                Pattern.compile(grp0), SOURCE, Duration.ofSeconds(10L));
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedOffsets = 
Map.of(
+                grp0, Map.of(
+                        t0p0, cp1.offsetAndMetadata(),
+                        t0p1, cp2.offsetAndMetadata()
+                )
+        );
+        assertEquals(expectedOffsets, offsets);
+
+        // Batch translation matches all groups
+        client.consumer = buildConsumer(checkpointTp, cp0, cp1, cp2, cp3);
+        offsets = client.remoteConsumerOffsets(Pattern.compile(".*"), SOURCE, 
Duration.ofSeconds(10L));
+        expectedOffsets = Map.of(
+                grp0, Map.of(
+                        t0p0, cp1.offsetAndMetadata(),
+                        t0p1, cp2.offsetAndMetadata()
+                ),
+                grp1, Map.of(
+                        t0p1, cp3.offsetAndMetadata()
+                )
+        );
+        assertEquals(expectedOffsets, offsets);
+
+        // Batch translation matches nothing
+        client.consumer = buildConsumer(checkpointTp, cp0, cp1, cp2, cp3);
+        offsets = 
client.remoteConsumerOffsets(Pattern.compile("unknown-group"), SOURCE, 
Duration.ofSeconds(10L));
+        assertTrue(offsets.isEmpty());
+
+        // Translation for mygroup0
+        // (single-group overload: returns that group's partition map directly).
+        client.consumer = buildConsumer(checkpointTp, cp0, cp1, cp2, cp3);
+        Map<TopicPartition, OffsetAndMetadata> offsets2 = 
client.remoteConsumerOffsets(grp0, SOURCE, Duration.ofSeconds(10L));
+        Map<TopicPartition, OffsetAndMetadata> expectedOffsets2 = Map.of(
+                t0p0, cp1.offsetAndMetadata(),
+                t0p1, cp2.offsetAndMetadata()
+        );
+        assertEquals(expectedOffsets2, offsets2);
+
+        // Translation for unknown group
+        client.consumer = buildConsumer(checkpointTp, cp0, cp1, cp2, cp3);
+        offsets2 = client.remoteConsumerOffsets("unknown-group", SOURCE, 
Duration.ofSeconds(10L));
+        assertTrue(offsets2.isEmpty());
+    }
+
     private ReplicationPolicy identityReplicationPolicy(String source) {
         IdentityReplicationPolicy policy = new IdentityReplicationPolicy();
         
policy.configure(Map.of(IdentityReplicationPolicy.SOURCE_CLUSTER_ALIAS_CONFIG, 
source));
         return policy;
     }
+
+    /**
+     * Builds a {@link MockConsumer} assigned to {@code checkpointTp} and pre-loaded with the given checkpoints,
+     * one record per checkpoint at offsets 0..n-1 in the order given.
+     */
+    private MockConsumer<byte[], byte[]> buildConsumer(TopicPartition checkpointTp, Checkpoint... checkpoints) {
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.NONE.name());
+        consumer.updateBeginningOffsets(Map.of(checkpointTp, 0L));
+        consumer.assign(Set.of(checkpointTp));
+        for (int i = 0; i < checkpoints.length; i++) {
+            Checkpoint checkpoint = checkpoints[i];
+            // Use the partition of checkpointTp rather than a literal 0 so the record always
+            // lands on the partition that was assigned above.
+            consumer.addRecord(new ConsumerRecord<>(checkpointTp.topic(), checkpointTp.partition(),
+                    i, checkpoint.recordKey(), checkpoint.recordValue()));
+        }
+        // The end offset of a partition is the offset of the last record plus one, which here is
+        // the number of records. Using length - 1 would make a single-record fixture report end
+        // offset 0, i.e. look empty.
+        consumer.updateEndOffsets(Map.of(checkpointTp, (long) checkpoints.length));
+        return consumer;
+    }
 }

Reply via email to