This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new ad0eb3beead KAFKA-20040: Fix race on consumer updating positions and 
unsubscribe when using committed offsets with topic ID (#21253)
ad0eb3beead is described below

commit ad0eb3beead56bbd44b21c170061da3b176d4685
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Jan 6 17:12:54 2026 -0500

    KAFKA-20040: Fix race on consumer updating positions and unsubscribe when 
using committed offsets with topic ID (#21253)
    
    When using topic IDs for fetching committed offsets (as of 4.2), there
    could be a race where a consumer requests offsets to update positions,
    but unsubscribes from it before receiving a response. In that case, the
    response to the OffsetFetch will contain a topic ID that the consumer
    cannot resolve from metadata, failing with Topic does not exist.
    
    This PR changes to keep the names used when building the request,  to
    used them when parsing the response and not fail if they don't exist  in
    the consumer metadata anymore (consumer unsubscribed).
    
    Note that this relates only to the case of fetching offsets to update
    positions. The manual apis to fetch offsets are not affected by this
    issue, nor by the change in this PR. Calls to consumer.committed
    explicitly add topics to metadata as transient topics, so they are not
    removed from metadata until the response is received and processed.
    
    Reviewers: David Jacot <[email protected]>
---
 .../consumer/internals/CommitRequestManager.java   | 27 ++++++++++++++--------
 .../internals/CommitRequestManagerTest.java        | 24 +++++++++++++------
 2 files changed, 35 insertions(+), 16 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 092b67fdef9..f7f9a4be719 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -970,6 +970,13 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
          */
         public final Set<TopicPartition> requestedPartitions;
 
+        /**
+         * Map of topic ID to topic names for the topics included in a request 
when using topic IDs.
+         * To be used when parsing the response, as topics may be removed from 
the consumer
+         * metadata before receiving a response.
+         */
+        public Map<Uuid, String> topicNamesCache;
+
         /**
          * Future with the result of the request. This can be reset using 
{@link #resetFuture()}
          * to get a new result when the request is retried.
@@ -985,6 +992,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                 retryBackoffMaxMs, memberInfo, deadlineTimer(time, 
deadlineMs));
             this.requestedPartitions = partitions;
             this.future = new CompletableFuture<>();
+            this.topicNamesCache = new HashMap<>();
         }
 
         public OffsetFetchRequestState(final Set<TopicPartition> partitions,
@@ -997,6 +1005,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                 retryBackoffMaxMs, jitter, memberInfo, deadlineTimer(time, 
deadlineMs));
             this.requestedPartitions = partitions;
             this.future = new CompletableFuture<>();
+            this.topicNamesCache = new HashMap<>();
         }
 
         public boolean sameRequest(final OffsetFetchRequestState request) {
@@ -1005,6 +1014,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
 
         public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
             Map<String, Uuid> topicIds = metadata.topicIds();
+            topicNamesCache.clear();
             boolean canUseTopicIds = true;
             List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = new 
ArrayList<>();
             Map<String, List<TopicPartition>> tps = 
requestedPartitions.stream().collect(Collectors.groupingBy(TopicPartition::topic));
@@ -1013,9 +1023,12 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                 Uuid topicId = topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
                 if (Uuid.ZERO_UUID.equals(topicId)) {
                     canUseTopicIds = false;
+                } else {
+                    // Save topicId-topicName ref to be used when parsing the 
response
+                    topicNamesCache.put(topicId, topic);
                 }
                 topics.add(new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
-                    .setName(entry.getKey())
+                    .setName(topic)
                     .setTopicId(topicId)
                     .setPartitionIndexes(entry.getValue().stream()
                         .map(TopicPartition::partition)
@@ -1124,26 +1137,22 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
 
             for (var topic : response.topics()) {
                 // If the topic id is used, the topic name is empty in the 
response.
-                String topicName = topic.name().isEmpty() ? 
metadata.topicNames().get(topic.topicId()) : topic.name();
+                String topicName = Uuid.ZERO_UUID.equals(topic.topicId()) ? 
topic.name() : topicNamesCache.get(topic.topicId());
                 for (var partition : topic.partitions()) {
                     var tp = new TopicPartition(
                         topicName,
                         partition.partitionIndex()
                     );
                     var error = Errors.forCode(partition.errorCode());
-                    if (error != Errors.NONE || topicName == null) {
-                        if (error != Errors.NONE) {
-                            log.debug("Failed to fetch offset for partition 
{}: {}", tp, error.message());
-                        } else { // unknown topic name
-                            log.debug("Failed to fetch offset, topic does not 
exist");
-                        }
+                    if (error != Errors.NONE) {
+                        log.debug("Failed to fetch offset for partition {}: 
{}", tp, error.message());
 
                         if (!failedRequestRegistered) {
                             onFailedAttempt(currentTimeMs);
                             failedRequestRegistered = true;
                         }
 
-                        if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION || 
error == Errors.UNKNOWN_TOPIC_ID || topicName == null) {
+                        if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION || 
error == Errors.UNKNOWN_TOPIC_ID) {
                             future.completeExceptionally(new 
KafkaException("Topic does not exist"));
                             return;
                         } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) 
{
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 9829cf8f548..540c2902384 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -88,7 +88,6 @@ import static 
org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -778,14 +777,17 @@ public class CommitRequestManagerTest {
         assertEmptyPendingRequests(commitRequestManager);
     }
 
+    // Ensure that OffsetFetch requests using topic ID do not fail when 
receiving topic IDs in response
+    // that are not in the client metadata anymore. This is the case when a 
consumer unsubscribes from a topic
+    // right after sending an OffsetFetch request. The response will contain a 
topic ID that does not exist in
+    // the consumer metadata anymore. The expectation is that the response 
parsing succeeds,
+    // ignoring the offsets for the topic that the consumer no longer needs.
     @Test
-    public void 
testOffsetFetchRequestShouldFailWithTopicIdWhenMetadataUnknownResponseTopicId() 
{
+    public void testFetchOffsetsWithTopicIdsDoesNotFailOnUnsubscribedTopics() {
         CommitRequestManager commitRequestManager = create(true, 100);
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
         Uuid topicId = Uuid.randomUuid();
         when(metadata.topicIds()).thenReturn(Map.of("t1", topicId));
-        // Mock the scenario where the topicID from the response is not in the 
metadata.
-        when(metadata.topicNames()).thenReturn(Map.of());
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(new TopicPartition("t1", 0));
 
@@ -798,10 +800,18 @@ public class CommitRequestManagerTest {
             topicId);
         futures.forEach(f -> {
             assertTrue(f.isDone());
-            assertTrue(f.isCompletedExceptionally());
-            ExecutionException exception = 
assertThrows(ExecutionException.class, f::get);
-            assertInstanceOf(KafkaException.class, exception.getCause());
+            assertFalse(f.isCompletedExceptionally());
+            try {
+                // The topic received in response should be included in the 
result even
+                // if it's not in the consumer metadata anymore.
+                assertTrue(f.get().containsKey(new TopicPartition("t1", 0)));
+            } catch (InterruptedException | ExecutionException e) {
+                fail();
+            }
         });
+        // Names should be retrieved from the internal cache when parsing the 
response (not from the consumer metadata)
+        verify(metadata, never()).topicNames();
+
         // expecting the buffers to be emptied after being completed 
successfully
         commitRequestManager.poll(0);
         assertEmptyPendingRequests(commitRequestManager);

Reply via email to