This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new da596f6e2e6 KAFKA-15283:[1/N] Client support for OffsetCommit with
topic ID (#19577)
da596f6e2e6 is described below
commit da596f6e2e6decc1ad7fb4c378b26f59e89b6b65
Author: Lan Ding <[email protected]>
AuthorDate: Thu Nov 13 00:03:49 2025 +0800
KAFKA-15283:[1/N] Client support for OffsetCommit with topic ID (#19577)
Introduce Topic ID in the existing OffsetCommit API. This patch contains
only the client-side changes.
Reviewers: David Jacot <[email protected]>, Lianet Magrans
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../consumer/internals/CommitRequestManager.java | 31 ++++++--
.../kafka/clients/producer/internals/Sender.java | 2 +-
.../internals/CommitRequestManagerTest.java | 85 +++++++++++++++++++++-
3 files changed, 110 insertions(+), 8 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 6aae084fd47..3a66c9fe137 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -24,6 +24,7 @@ import
org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import
org.apache.kafka.clients.consumer.internals.metrics.OffsetCommitMetricsManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
@@ -708,15 +709,22 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
}
public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
+ Map<String, Uuid> topicIds = metadata.topicIds();
+ boolean canUseTopicIds = true;
Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic>
requestTopicDataMap = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry :
offsets.entrySet()) {
TopicPartition topicPartition = entry.getKey();
OffsetAndMetadata offsetAndMetadata = entry.getValue();
+ Uuid topicId = topicIds.getOrDefault(topicPartition.topic(),
Uuid.ZERO_UUID);
+ if (topicId.equals(Uuid.ZERO_UUID)) {
+ canUseTopicIds = false;
+ }
OffsetCommitRequestData.OffsetCommitRequestTopic topic =
requestTopicDataMap
.getOrDefault(topicPartition.topic(),
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName(topicPartition.topic())
+ .setTopicId(topicId)
);
topic.partitions().add(new
OffsetCommitRequestData.OffsetCommitRequestPartition()
@@ -740,7 +748,9 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
lastEpochSentOnCommit = Optional.empty();
}
- OffsetCommitRequest.Builder builder =
OffsetCommitRequest.Builder.forTopicNames(data);
+ OffsetCommitRequest.Builder builder = canUseTopicIds
+ ? OffsetCommitRequest.Builder.forTopicIdsOrNames(data,
true)
+ : OffsetCommitRequest.Builder.forTopicNames(data);
return buildRequestWithResponseHandling(builder);
}
@@ -752,6 +762,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
* - fail the future with a non-recoverable KafkaException for all
unexpected errors (even if retriable)
*/
@Override
+ @SuppressWarnings("NPathComplexity")
public void onResponse(final ClientResponse response) {
metricsManager.recordRequestLatency(response.requestLatencyMs());
long currentTimeMs = response.receivedTimeMs();
@@ -760,13 +771,22 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
boolean failedRequestRegistered = false;
for (OffsetCommitResponseData.OffsetCommitResponseTopic topic :
commitResponse.data().topics()) {
for (OffsetCommitResponseData.OffsetCommitResponsePartition
partition : topic.partitions()) {
- TopicPartition tp = new TopicPartition(topic.name(),
partition.partitionIndex());
+ // Version 10 drops topic name, and supports topic id.
+ // We need to find offsetAndMetadata based on topic id and
partition index only as
+ // topic name in the response will be empty.
+ // For older versions, topic id is zero, and we will find
the offsetAndMetadata based on the topic name.
+ TopicPartition tp =
(!Uuid.ZERO_UUID.equals(topic.topicId()) &&
metadata.topicNames().containsKey(topic.topicId())) ?
+ new
TopicPartition(metadata.topicNames().get(topic.topicId()),
partition.partitionIndex()) :
+ new TopicPartition(topic.name(),
partition.partitionIndex());
Errors error = Errors.forCode(partition.errorCode());
if (error == Errors.NONE) {
OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
- long offset = offsetAndMetadata.offset();
- log.debug("OffsetCommit completed successfully for
offset {} partition {}", offset, tp);
+ if (offsetAndMetadata == null) {
+ log.debug("Can't find metadata for partition {}",
tp);
+ } else {
+ log.debug("OffsetCommit completed successfully for
offset {} partition {}", offsetAndMetadata.offset(), tp);
+ }
continue;
}
@@ -789,7 +809,8 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
future.completeExceptionally(error.exception());
return;
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS ||
- error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+ error == Errors.UNKNOWN_TOPIC_OR_PARTITION ||
+ error == Errors.UNKNOWN_TOPIC_ID) {
// just retry
future.completeExceptionally(error.exception());
return;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 64e8646d6f1..983c5c9ba48 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -607,7 +607,7 @@ public class Sender implements Runnable {
p.errorMessage(),
p.currentLeader());
- // Version 13 drop topic name and add support to topic id.
+ // Version 13 drops topic name, and supports topic id.
// We need to find batch based on topic id and partition
index only as
// topic name in the response will be empty.
// For older versions, topic id is zero, and we will find
the batch based on the topic name.
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index afbb81eb53f..accaace1355 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -85,6 +85,7 @@ import static
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -469,6 +470,62 @@ public class CommitRequestManagerTest {
assertFutureThrows(CommitFailedException.class, commitResult);
}
+ @Test
+ public void testCommitSyncShouldSucceedWithTopicId() {
+ subscriptionState = mock(SubscriptionState.class);
+ TopicPartition tp = new TopicPartition("topic", 1);
+ Uuid topicId = Uuid.randomUuid();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+ when(metadata.topicIds()).thenReturn(Map.of("topic", topicId));
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0,
Optional.of(1), "");
+ Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp,
offsetAndMetadata);
+
+ CommitRequestManager commitRequestManager = create(false, 100);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitSync(
+ offsets, time.milliseconds() + defaultApiTimeoutMs);
+ assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
+ List<NetworkClientDelegate.FutureCompletionHandler> pollResults =
assertPoll(true, 1, commitRequestManager, true);
+ pollResults.forEach(v ->
v.onComplete(mockOffsetCommitResponseWithTopicId(
+ topicId,
+ 1,
+ (short) 10,
+ Errors.NONE)));
+
+ verify(subscriptionState, never()).allConsumed();
+ verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+ Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
+ assertTrue(future.isDone());
+ assertEquals(offsets, commitOffsets);
+ }
+
+ @Test
+ public void testCommitSyncShouldSucceedWithUnknownOffsetAndMetadata() {
+ subscriptionState = mock(SubscriptionState.class);
+ Uuid topicId = Uuid.randomUuid();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+ when(metadata.topicIds()).thenReturn(Map.of("topic", topicId));
+ Map<TopicPartition, OffsetAndMetadata> offsets =
Collections.singletonMap(
+ new TopicPartition("foo", 1),
+ new OffsetAndMetadata(0));
+
+ CommitRequestManager commitRequestManager = create(false, 100);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitSync(
+ offsets, time.milliseconds() + defaultApiTimeoutMs);
+ assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
+ List<NetworkClientDelegate.FutureCompletionHandler> pollResults =
assertPoll(1, commitRequestManager);
+ pollResults.forEach(v ->
v.onComplete(mockOffsetCommitResponseWithTopicId(
+ topicId,
+ 1,
+ (short) 10,
+ Errors.NONE)));
+
+ verify(subscriptionState, never()).allConsumed();
+ verify(metadata, never()).updateLastSeenEpochIfNewer(any(), anyInt());
+ Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
+ assertTrue(future.isDone());
+ assertEquals(offsets, commitOffsets);
+ }
+
/**
* This is the case of the async auto commit request triggered on the
interval. The request
* internally fails with the fatal stale epoch error, and the expectation
is that it just
@@ -1270,6 +1327,7 @@ public class CommitRequestManagerTest {
long commitReceivedTimeMs = time.milliseconds();
res.unsentRequests.get(0).future().complete(mockOffsetCommitResponse(
topic,
+ Uuid.ZERO_UUID,
partition,
(short) 1,
commitCreationTimeMs,
@@ -1339,6 +1397,7 @@ public class CommitRequestManagerTest {
Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE,
TimeoutException.class),
Arguments.of(Errors.REQUEST_TIMED_OUT, TimeoutException.class),
Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION,
TimeoutException.class),
+ Arguments.of(Errors.UNKNOWN_TOPIC_ID, TimeoutException.class),
// Non-retriable errors should result in their specific exceptions
Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED,
GroupAuthorizationException.class),
@@ -1584,13 +1643,21 @@ public class CommitRequestManagerTest {
private List<NetworkClientDelegate.FutureCompletionHandler> assertPoll(
final int numRes,
final CommitRequestManager manager) {
- return assertPoll(true, numRes, manager);
+ return assertPoll(true, numRes, manager, false);
}
private List<NetworkClientDelegate.FutureCompletionHandler> assertPoll(
final boolean coordinatorDiscovered,
final int numRes,
final CommitRequestManager manager) {
+ return assertPoll(coordinatorDiscovered, numRes, manager, false);
+ }
+
+ private List<NetworkClientDelegate.FutureCompletionHandler> assertPoll(
+ final boolean coordinatorDiscovered,
+ final int numRes,
+ final CommitRequestManager manager,
+ final boolean shouldUseTopicIds) {
if (coordinatorDiscovered) {
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
} else {
@@ -1598,6 +1665,12 @@ public class CommitRequestManagerTest {
}
NetworkClientDelegate.PollResult res =
manager.poll(time.milliseconds());
assertEquals(numRes, res.unsentRequests.size());
+ if (shouldUseTopicIds) {
+ res.unsentRequests.stream()
+ .peek(req ->
assertTrue(req.requestBuilder().latestAllowedVersion() > 9))
+ .flatMap(request -> ((OffsetCommitRequestData)
request.requestBuilder().build().data()).topics().stream())
+ .forEach(topic -> assertNotEquals(Uuid.ZERO_UUID,
topic.topicId()));
+ }
return
res.unsentRequests.stream().map(NetworkClientDelegate.UnsentRequest::handler).collect(Collectors.toList());
}
@@ -1667,15 +1740,22 @@ public class CommitRequestManagerTest {
);
}
+ private ClientResponse mockOffsetCommitResponseWithTopicId(Uuid topicId,
+ int partition,
+ short
apiKeyVersion,
+ Errors error) {
+ return mockOffsetCommitResponse("", topicId, partition, apiKeyVersion,
time.milliseconds(), time.milliseconds(), error);
+ }
private ClientResponse mockOffsetCommitResponse(String topic,
int partition,
short apiKeyVersion,
Errors error) {
- return mockOffsetCommitResponse(topic, partition, apiKeyVersion,
time.milliseconds(), time.milliseconds(), error);
+ return mockOffsetCommitResponse(topic, Uuid.ZERO_UUID, partition,
apiKeyVersion, time.milliseconds(), time.milliseconds(), error);
}
private ClientResponse mockOffsetCommitResponse(String topic,
+ Uuid topicId,
int partition,
short apiKeyVersion,
long createdTimeMs,
@@ -1685,6 +1765,7 @@ public class CommitRequestManagerTest {
.setTopics(Collections.singletonList(
new OffsetCommitResponseData.OffsetCommitResponseTopic()
.setName(topic)
+ .setTopicId(topicId)
.setPartitions(Collections.singletonList(
new
OffsetCommitResponseData.OffsetCommitResponsePartition()
.setErrorCode(error.code())