This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new ad0eb3beead KAFKA-20040: Fix race on consumer updating positions and
unsubscribe when using committed offsets with topic ID (#21253)
ad0eb3beead is described below
commit ad0eb3beead56bbd44b21c170061da3b176d4685
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Jan 6 17:12:54 2026 -0500
KAFKA-20040: Fix race on consumer updating positions and unsubscribe when
using committed offsets with topic ID (#21253)
When using topic IDs for fetching committed offsets (as of 4.2), there
could be a race where a consumer requests offsets to update positions,
but unsubscribes from the topic before receiving a response. In that case,
the response to the OffsetFetch will contain a topic ID that the consumer
cannot resolve from metadata, failing with "Topic does not exist".
This PR keeps the topic names used when building the request and
uses them when parsing the response, so that parsing does not fail if
they are no longer in the consumer metadata (consumer unsubscribed).
Note that this relates only to the case of fetching offsets to update
positions. The manual APIs to fetch offsets are not affected by this
issue, nor by the change in this PR. Calls to consumer.committed
explicitly add topics to metadata as transient topics, so they are not
removed from metadata until the response is received and processed.
Reviewers: David Jacot <[email protected]>
---
.../consumer/internals/CommitRequestManager.java | 27 ++++++++++++++--------
.../internals/CommitRequestManagerTest.java | 24 +++++++++++++------
2 files changed, 35 insertions(+), 16 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 092b67fdef9..f7f9a4be719 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -970,6 +970,13 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
*/
public final Set<TopicPartition> requestedPartitions;
+ /**
+ * Map of topic ID to topic names for the topics included in a request
when using topic IDs.
+ * To be used when parsing the response, as topics may be removed from
the consumer
+ * metadata before receiving a response.
+ */
+ public Map<Uuid, String> topicNamesCache;
+
/**
* Future with the result of the request. This can be reset using
{@link #resetFuture()}
* to get a new result when the request is retried.
@@ -985,6 +992,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
retryBackoffMaxMs, memberInfo, deadlineTimer(time,
deadlineMs));
this.requestedPartitions = partitions;
this.future = new CompletableFuture<>();
+ this.topicNamesCache = new HashMap<>();
}
public OffsetFetchRequestState(final Set<TopicPartition> partitions,
@@ -997,6 +1005,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
retryBackoffMaxMs, jitter, memberInfo, deadlineTimer(time,
deadlineMs));
this.requestedPartitions = partitions;
this.future = new CompletableFuture<>();
+ this.topicNamesCache = new HashMap<>();
}
public boolean sameRequest(final OffsetFetchRequestState request) {
@@ -1005,6 +1014,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
Map<String, Uuid> topicIds = metadata.topicIds();
+ topicNamesCache.clear();
boolean canUseTopicIds = true;
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = new
ArrayList<>();
Map<String, List<TopicPartition>> tps =
requestedPartitions.stream().collect(Collectors.groupingBy(TopicPartition::topic));
@@ -1013,9 +1023,12 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
Uuid topicId = topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
if (Uuid.ZERO_UUID.equals(topicId)) {
canUseTopicIds = false;
+ } else {
+ // Save topicId-topicName ref to be used when parsing the
response
+ topicNamesCache.put(topicId, topic);
}
topics.add(new
OffsetFetchRequestData.OffsetFetchRequestTopics()
- .setName(entry.getKey())
+ .setName(topic)
.setTopicId(topicId)
.setPartitionIndexes(entry.getValue().stream()
.map(TopicPartition::partition)
@@ -1124,26 +1137,22 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
for (var topic : response.topics()) {
// If the topic id is used, the topic name is empty in the
response.
- String topicName = topic.name().isEmpty() ?
metadata.topicNames().get(topic.topicId()) : topic.name();
+ String topicName = Uuid.ZERO_UUID.equals(topic.topicId()) ?
topic.name() : topicNamesCache.get(topic.topicId());
for (var partition : topic.partitions()) {
var tp = new TopicPartition(
topicName,
partition.partitionIndex()
);
var error = Errors.forCode(partition.errorCode());
- if (error != Errors.NONE || topicName == null) {
- if (error != Errors.NONE) {
- log.debug("Failed to fetch offset for partition
{}: {}", tp, error.message());
- } else { // unknown topic name
- log.debug("Failed to fetch offset, topic does not
exist");
- }
+ if (error != Errors.NONE) {
+ log.debug("Failed to fetch offset for partition {}:
{}", tp, error.message());
if (!failedRequestRegistered) {
onFailedAttempt(currentTimeMs);
failedRequestRegistered = true;
}
- if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION ||
error == Errors.UNKNOWN_TOPIC_ID || topicName == null) {
+ if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION ||
error == Errors.UNKNOWN_TOPIC_ID) {
future.completeExceptionally(new
KafkaException("Topic does not exist"));
return;
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
{
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 9829cf8f548..540c2902384 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -88,7 +88,6 @@ import static
org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -778,14 +777,17 @@ public class CommitRequestManagerTest {
assertEmptyPendingRequests(commitRequestManager);
}
+ // Ensure that OffsetFetch requests using topic ID do not fail when
receiving topic IDs in response
+ // that are not in the client metadata anymore. This is the case when a
consumer unsubscribes from a topic
+ // right after sending an OffsetFetch request. The response will contain a
topic ID that does not exist in
+ // the consumer metadata anymore. The expectation is that the response
parsing succeeds,
+ // ignoring the offsets for the topic that the consumer no longer needs.
@Test
- public void
testOffsetFetchRequestShouldFailWithTopicIdWhenMetadataUnknownResponseTopicId()
{
+ public void testFetchOffsetsWithTopicIdsDoesNotFailOnUnsubscribedTopics() {
CommitRequestManager commitRequestManager = create(true, 100);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
Uuid topicId = Uuid.randomUuid();
when(metadata.topicIds()).thenReturn(Map.of("t1", topicId));
- // Mock the scenario where the topicID from the response is not in the
metadata.
- when(metadata.topicNames()).thenReturn(Map.of());
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(new TopicPartition("t1", 0));
@@ -798,10 +800,18 @@ public class CommitRequestManagerTest {
topicId);
futures.forEach(f -> {
assertTrue(f.isDone());
- assertTrue(f.isCompletedExceptionally());
- ExecutionException exception =
assertThrows(ExecutionException.class, f::get);
- assertInstanceOf(KafkaException.class, exception.getCause());
+ assertFalse(f.isCompletedExceptionally());
+ try {
+ // The topic received in response should be included in the
result even
+ // if it's not in the consumer metadata anymore.
+ assertTrue(f.get().containsKey(new TopicPartition("t1", 0)));
+ } catch (InterruptedException | ExecutionException e) {
+ fail();
+ }
});
+ // Names should be retrieved from the internal cache when parsing the
response (not from the consumer metadata)
+ verify(metadata, never()).topicNames();
+
// expecting the buffers to be emptied after being completed
successfully
commitRequestManager.poll(0);
assertEmptyPendingRequests(commitRequestManager);