This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new da596f6e2e6 KAFKA-15283:[1/N] Client support for OffsetCommit with 
topic ID (#19577)
da596f6e2e6 is described below

commit da596f6e2e6decc1ad7fb4c378b26f59e89b6b65
Author: Lan Ding <[email protected]>
AuthorDate: Thu Nov 13 00:03:49 2025 +0800

    KAFKA-15283:[1/N] Client support for OffsetCommit with topic ID (#19577)
    
    Introduce topic ID in the existing OffsetCommit API. This patch contains
    only the client-side changes.
    
    Reviewers: David Jacot <[email protected]>, Lianet Magrans
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../consumer/internals/CommitRequestManager.java   | 31 ++++++--
 .../kafka/clients/producer/internals/Sender.java   |  2 +-
 .../internals/CommitRequestManagerTest.java        | 85 +++++++++++++++++++++-
 3 files changed, 110 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 6aae084fd47..3a66c9fe137 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -24,6 +24,7 @@ import 
org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import 
org.apache.kafka.clients.consumer.internals.metrics.OffsetCommitMetricsManager;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.RetriableException;
@@ -708,15 +709,22 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         }
 
         public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
+            Map<String, Uuid> topicIds = metadata.topicIds();
+            boolean canUseTopicIds = true;
             Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> 
requestTopicDataMap = new HashMap<>();
             for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : 
offsets.entrySet()) {
                 TopicPartition topicPartition = entry.getKey();
                 OffsetAndMetadata offsetAndMetadata = entry.getValue();
+                Uuid topicId = topicIds.getOrDefault(topicPartition.topic(), 
Uuid.ZERO_UUID);
+                if (topicId.equals(Uuid.ZERO_UUID)) {
+                    canUseTopicIds = false;
+                }
 
                 OffsetCommitRequestData.OffsetCommitRequestTopic topic = 
requestTopicDataMap
                     .getOrDefault(topicPartition.topic(),
                         new OffsetCommitRequestData.OffsetCommitRequestTopic()
                             .setName(topicPartition.topic())
+                            .setTopicId(topicId)
                     );
 
                 topic.partitions().add(new 
OffsetCommitRequestData.OffsetCommitRequestPartition()
@@ -740,7 +748,9 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                 lastEpochSentOnCommit = Optional.empty();
             }
 
-            OffsetCommitRequest.Builder builder = 
OffsetCommitRequest.Builder.forTopicNames(data);
+            OffsetCommitRequest.Builder builder = canUseTopicIds
+                    ? OffsetCommitRequest.Builder.forTopicIdsOrNames(data, 
true)
+                    : OffsetCommitRequest.Builder.forTopicNames(data);
 
             return buildRequestWithResponseHandling(builder);
         }
@@ -752,6 +762,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
          *   - fail the future with a non-recoverable KafkaException for all 
unexpected errors (even if retriable)
          */
         @Override
+        @SuppressWarnings("NPathComplexity")
         public void onResponse(final ClientResponse response) {
             metricsManager.recordRequestLatency(response.requestLatencyMs());
             long currentTimeMs = response.receivedTimeMs();
@@ -760,13 +771,22 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             boolean failedRequestRegistered = false;
             for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : 
commitResponse.data().topics()) {
                 for (OffsetCommitResponseData.OffsetCommitResponsePartition 
partition : topic.partitions()) {
-                    TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+                    // Version 10 drops topic name, and supports topic id.
+                    // We need to find offsetAndMetadata based on topic id and 
partition index only as
+                    // topic name in the response will be empty.
+                    // For older versions, topic id is zero, and we will find 
the offsetAndMetadata based on the topic name.
+                    TopicPartition tp = 
(!Uuid.ZERO_UUID.equals(topic.topicId()) && 
metadata.topicNames().containsKey(topic.topicId())) ?
+                        new 
TopicPartition(metadata.topicNames().get(topic.topicId()), 
partition.partitionIndex()) :
+                        new TopicPartition(topic.name(), 
partition.partitionIndex());
 
                     Errors error = Errors.forCode(partition.errorCode());
                     if (error == Errors.NONE) {
                         OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
-                        long offset = offsetAndMetadata.offset();
-                        log.debug("OffsetCommit completed successfully for 
offset {} partition {}", offset, tp);
+                        if (offsetAndMetadata == null) {
+                            log.debug("Can't find metadata for partition {}", 
tp);
+                        } else {
+                            log.debug("OffsetCommit completed successfully for 
offset {} partition {}", offsetAndMetadata.offset(), tp);
+                        }
                         continue;
                     }
 
@@ -789,7 +809,8 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                         future.completeExceptionally(error.exception());
                         return;
                     } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS ||
-                        error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                        error == Errors.UNKNOWN_TOPIC_OR_PARTITION ||
+                        error == Errors.UNKNOWN_TOPIC_ID) {
                         // just retry
                         future.completeExceptionally(error.exception());
                         return;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 64e8646d6f1..983c5c9ba48 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -607,7 +607,7 @@ public class Sender implements Runnable {
                             p.errorMessage(),
                             p.currentLeader());
 
-                    // Version 13 drop topic name and add support to topic id.
+                    // Version 13 drops topic name, and supports topic id.
                     // We need to find batch based on topic id and partition 
index only as
                     // topic name in the response will be empty.
                     // For older versions, topic id is zero, and we will find 
the batch based on the topic name.
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index afbb81eb53f..accaace1355 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -85,6 +85,7 @@ import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -469,6 +470,62 @@ public class CommitRequestManagerTest {
         assertFutureThrows(CommitFailedException.class, commitResult);
     }
 
+    @Test
+    public void testCommitSyncShouldSucceedWithTopicId() {
+        subscriptionState = mock(SubscriptionState.class);
+        TopicPartition tp = new TopicPartition("topic", 1);
+        Uuid topicId = Uuid.randomUuid();
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+        when(metadata.topicIds()).thenReturn(Map.of("topic", topicId));
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, 
Optional.of(1), "");
+        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, 
offsetAndMetadata);
+
+        CommitRequestManager commitRequestManager = create(false, 100);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitSync(
+            offsets, time.milliseconds() + defaultApiTimeoutMs);
+        assertEquals(1, 
commitRequestManager.unsentOffsetCommitRequests().size());
+        List<NetworkClientDelegate.FutureCompletionHandler> pollResults = 
assertPoll(true, 1, commitRequestManager, true);
+        pollResults.forEach(v -> 
v.onComplete(mockOffsetCommitResponseWithTopicId(
+            topicId,
+            1,
+            (short) 10,
+            Errors.NONE)));
+
+        verify(subscriptionState, never()).allConsumed();
+        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
assertDoesNotThrow(() -> future.get());
+        assertTrue(future.isDone());
+        assertEquals(offsets, commitOffsets);
+    }
+
+    @Test
+    public void testCommitSyncShouldSucceedWithUnknownOffsetAndMetadata() {
+        subscriptionState = mock(SubscriptionState.class);
+        Uuid topicId = Uuid.randomUuid();
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+        when(metadata.topicIds()).thenReturn(Map.of("topic", topicId));
+        Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(
+            new TopicPartition("foo", 1),
+            new OffsetAndMetadata(0));
+
+        CommitRequestManager commitRequestManager = create(false, 100);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitSync(
+            offsets, time.milliseconds() + defaultApiTimeoutMs);
+        assertEquals(1, 
commitRequestManager.unsentOffsetCommitRequests().size());
+        List<NetworkClientDelegate.FutureCompletionHandler> pollResults = 
assertPoll(1, commitRequestManager);
+        pollResults.forEach(v -> 
v.onComplete(mockOffsetCommitResponseWithTopicId(
+            topicId,
+            1,
+            (short) 10,
+            Errors.NONE)));
+
+        verify(subscriptionState, never()).allConsumed();
+        verify(metadata, never()).updateLastSeenEpochIfNewer(any(), anyInt());
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
assertDoesNotThrow(() -> future.get());
+        assertTrue(future.isDone());
+        assertEquals(offsets, commitOffsets);
+    }
+
     /**
      * This is the case of the async auto commit request triggered on the 
interval. The request
      * internally fails with the fatal stale epoch error, and the expectation 
is that it just
@@ -1270,6 +1327,7 @@ public class CommitRequestManagerTest {
         long commitReceivedTimeMs = time.milliseconds();
         res.unsentRequests.get(0).future().complete(mockOffsetCommitResponse(
                 topic,
+                Uuid.ZERO_UUID,
                 partition,
                 (short) 1,
                 commitCreationTimeMs,
@@ -1339,6 +1397,7 @@ public class CommitRequestManagerTest {
             Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, 
TimeoutException.class),
             Arguments.of(Errors.REQUEST_TIMED_OUT, TimeoutException.class),
             Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
TimeoutException.class),
+            Arguments.of(Errors.UNKNOWN_TOPIC_ID, TimeoutException.class),
 
             // Non-retriable errors should result in their specific exceptions
             Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, 
GroupAuthorizationException.class),
@@ -1584,13 +1643,21 @@ public class CommitRequestManagerTest {
     private List<NetworkClientDelegate.FutureCompletionHandler> assertPoll(
         final int numRes,
         final CommitRequestManager manager) {
-        return assertPoll(true, numRes, manager);
+        return assertPoll(true, numRes, manager, false);
     }
 
     private List<NetworkClientDelegate.FutureCompletionHandler> assertPoll(
         final boolean coordinatorDiscovered,
         final int numRes,
         final CommitRequestManager manager) {
+        return assertPoll(coordinatorDiscovered, numRes, manager, false);
+    }
+
+    private List<NetworkClientDelegate.FutureCompletionHandler> assertPoll(
+        final boolean coordinatorDiscovered,
+        final int numRes,
+        final CommitRequestManager manager,
+        final boolean shouldUseTopicIds) {
         if (coordinatorDiscovered) {
             
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
         } else {
@@ -1598,6 +1665,12 @@ public class CommitRequestManagerTest {
         }
         NetworkClientDelegate.PollResult res = 
manager.poll(time.milliseconds());
         assertEquals(numRes, res.unsentRequests.size());
+        if (shouldUseTopicIds) {
+            res.unsentRequests.stream()
+                .peek(req -> 
assertTrue(req.requestBuilder().latestAllowedVersion() > 9))
+                .flatMap(request -> ((OffsetCommitRequestData) 
request.requestBuilder().build().data()).topics().stream())
+                .forEach(topic -> assertNotEquals(Uuid.ZERO_UUID, 
topic.topicId()));
+        }
 
         return 
res.unsentRequests.stream().map(NetworkClientDelegate.UnsentRequest::handler).collect(Collectors.toList());
     }
@@ -1667,15 +1740,22 @@ public class CommitRequestManagerTest {
         );
     }
 
+    private ClientResponse mockOffsetCommitResponseWithTopicId(Uuid topicId,
+                                                              int partition,
+                                                              short 
apiKeyVersion,
+                                                              Errors error) {
+        return mockOffsetCommitResponse("", topicId, partition, apiKeyVersion, 
time.milliseconds(), time.milliseconds(), error);
+    }
 
     private ClientResponse mockOffsetCommitResponse(String topic,
                                                    int partition,
                                                    short apiKeyVersion,
                                                    Errors error) {
-        return mockOffsetCommitResponse(topic, partition, apiKeyVersion, 
time.milliseconds(), time.milliseconds(), error);
+        return mockOffsetCommitResponse(topic, Uuid.ZERO_UUID, partition, 
apiKeyVersion, time.milliseconds(), time.milliseconds(), error);
     }
 
     private ClientResponse mockOffsetCommitResponse(String topic,
+                                                   Uuid topicId,
                                                    int partition,
                                                    short apiKeyVersion,
                                                    long createdTimeMs,
@@ -1685,6 +1765,7 @@ public class CommitRequestManagerTest {
             .setTopics(Collections.singletonList(
                 new OffsetCommitResponseData.OffsetCommitResponseTopic()
                     .setName(topic)
+                    .setTopicId(topicId)
                     .setPartitions(Collections.singletonList(
                         new 
OffsetCommitResponseData.OffsetCommitResponsePartition()
                             .setErrorCode(error.code())

Reply via email to