This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 319dd61cb36 KAFKA-20444: [5/N] Resolve TxnOffsetCommit topic IDs in 
KafkaApis (KIP-1319) (#22238)
319dd61cb36 is described below

commit 319dd61cb36f85fc9352eb244a9987784d9e43eb
Author: David Jacot <[email protected]>
AuthorDate: Sun May 10 18:19:44 2026 +0200

    KAFKA-20444: [5/N] Resolve TxnOffsetCommit topic IDs in KafkaApis 
(KIP-1319) (#22238)
    
    This patch wires topic IDs in `KafkaApis.handleTxnOffsetCommitRequest`:
    - For v6+, resolve the topic name from the topic ID via the metadata
      cache. If the topic ID cannot be resolved, the topic is returned
      with `UNKNOWN_TOPIC_ID` for all its partitions.
    - For v0-5, resolve the topic ID from the topic name via the metadata
      cache so the group coordinator receives it too. If the topic name
      cannot be resolved, the topic is returned with
      `UNKNOWN_TOPIC_OR_PARTITION` for all its partitions.
    
    Reviewers: Sean Quah <[email protected]>
---
 .../common/requests/TxnOffsetCommitResponse.java   |   4 +
 core/src/main/scala/kafka/server/KafkaApis.scala   |  64 +++++---
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 168 +++++++++++++++------
 3 files changed, 177 insertions(+), 59 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index a9f4a580041..20de83f9750 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -51,6 +51,10 @@ import java.util.function.Function;
  */
 public class TxnOffsetCommitResponse extends AbstractResponse {
 
+    public static boolean useTopicIds(short version) {
+        return version >= 6;
+    }
+
     public static Builder newBuilder(boolean useTopicIds) {
         if (useTopicIds) {
             return new TopicIdBuilder();
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1cdb7d258d9..a214f3e7ed8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2047,6 +2047,18 @@ class KafkaApis(val requestChannel: RequestChannel,
       
sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
+      val useTopicIds = 
TxnOffsetCommitResponse.useTopicIds(request.header.apiVersion)
+
+      if (useTopicIds) {
+        // For v6+, the request carries topic IDs. Resolve them to topic names 
via the
+        // metadata cache before authorization and partition validation.
+        txnOffsetCommitRequest.data.topics.forEach { topic =>
+          if (topic.topicId != Uuid.ZERO_UUID) {
+            metadataCache.getTopicName(topic.topicId).ifPresent(name => 
topic.setName(name))
+          }
+        }
+      }
+
       val authorizedTopics = authHelper.filterByAuthorized(
         request.context,
         READ,
@@ -2054,31 +2066,49 @@ class KafkaApis(val requestChannel: RequestChannel,
         txnOffsetCommitRequest.data.topics.asScala
       )(_.name)
 
-      val responseBuilder = TxnOffsetCommitResponse.newBuilder(false)
+      val responseBuilder = TxnOffsetCommitResponse.newBuilder(useTopicIds)
       val authorizedTopicCommittedOffsets = new 
mutable.ArrayBuffer[TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic]()
       txnOffsetCommitRequest.data.topics.forEach { topic =>
-        if (!authorizedTopics.contains(topic.name)) {
+        if (useTopicIds && topic.name.isEmpty) {
+          // If the topic name is undefined, the topic ID could not be 
resolved. We add
+          // the topic and all its partitions to the response with 
UNKNOWN_TOPIC_ID.
+          responseBuilder.addPartitions(
+            topic.topicId, topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_ID)
+        } else if (!authorizedTopics.contains(topic.name)) {
           // If the topic is not authorized, we add the topic and all its 
partitions
           // to the response with TOPIC_AUTHORIZATION_FAILED.
-          responseBuilder.addPartitions(topic.topicId, topic.name, 
topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
-        } else if (!metadataCache.contains(topic.name)) {
-          // If the topic is unknown, we add the topic and all its partitions
-          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
-          responseBuilder.addPartitions(topic.topicId, topic.name, 
topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+          responseBuilder.addPartitions(
+            topic.topicId, topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)
         } else {
-          // Otherwise, we check all partitions to ensure that they all exist.
-          val topicWithValidPartitions = new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic().setName(topic.name)
+          // For lower API versions, the topic id is not included in the 
request. Resolve
+          // it from the metadata cache so the coordinator can use it. If the 
topic does
+          // not exist, the topic id falls back to ZERO_UUID.
+          if (!useTopicIds) {
+            topic.setTopicId(metadataCache.getTopicId(topic.name))
+          }
 
-          topic.partitions.forEach { partition =>
-            if (metadataCache.getLeaderAndIsr(topic.name, 
partition.partitionIndex).isPresent()) {
-              topicWithValidPartitions.partitions.add(partition)
-            } else {
-              responseBuilder.addPartition(topic.topicId, topic.name, 
partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+          if (topic.topicId == Uuid.ZERO_UUID) {
+            // If the topic is unknown, we add the topic and all its partitions
+            // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+            responseBuilder.addPartitions(
+              Uuid.ZERO_UUID, topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+          } else {
+            // Otherwise, we check all partitions to ensure that they all 
exist.
+            val topicWithValidPartitions = new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+              .setTopicId(topic.topicId)
+              .setName(topic.name)
+
+            topic.partitions.forEach { partition =>
+              if (metadataCache.getLeaderAndIsr(topic.name, 
partition.partitionIndex).isPresent()) {
+                topicWithValidPartitions.partitions.add(partition)
+              } else {
+                responseBuilder.addPartition(topic.topicId, topic.name, 
partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+              }
             }
-          }
 
-          if (!topicWithValidPartitions.partitions.isEmpty) {
-            authorizedTopicCommittedOffsets += topicWithValidPartitions
+            if (!topicWithValidPartitions.partitions.isEmpty) {
+              authorizedTopicCommittedOffsets += topicWithValidPartitions
+            }
           }
         }
       }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f38a6146244..bcb6edb92a5 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1502,33 +1502,56 @@ class KafkaApisTest extends Logging {
     checkInvalidPartition(1) // topic has only one partition
   }
 
-  @Test
-  def testHandleTxnOffsetCommitRequest(): Unit = {
-    addTopicToMetadataCache("foo", numPartitions = 1)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+  def testHandleTxnOffsetCommitRequest(version: Short): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    addTopicToMetadataCache(topicName, topicId = topicId, numPartitions = 1)
 
     val txnOffsetCommitRequest = new TxnOffsetCommitRequestData()
       .setGroupId("group")
-      .setMemberId("member")
-      .setGenerationIdOrMemberEpoch(10)
+      .setMemberId(if (version >= 3) "member" else "")
+      .setGenerationIdOrMemberEpoch(if (version >= 3) 10 else -1)
       .setProducerId(20)
       .setProducerEpoch(30)
-      .setGroupInstanceId("instance-id")
+      .setGroupInstanceId(if (version >= 3) "instance-id" else null)
       .setTransactionalId("transactional-id")
       .setTopics(util.List.of(
         new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
-          .setName("foo")
+          .setTopicId(topicId)
+          .setName(topicName)
+          .setPartitions(util.List.of(
+            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10)))))
+
+    val expectedTxnOffsetCommitRequest = new TxnOffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId(if (version >= 3) "member" else "")
+      .setGenerationIdOrMemberEpoch(if (version >= 3) 10 else -1)
+      .setProducerId(20)
+      .setProducerEpoch(30)
+      .setGroupInstanceId(if (version >= 3) "instance-id" else null)
+      .setTransactionalId("transactional-id")
+      .setTopics(util.List.of(
+        new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+          .setTopicId(topicId)
+          .setName(topicName)
           .setPartitions(util.List.of(
             new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
               .setPartitionIndex(0)
               .setCommittedOffset(10)))))
 
-    val requestChannelRequest = 
buildRequest(TxnOffsetCommitRequest.Builder.forTopicNames(txnOffsetCommitRequest,
 true).build())
+    val requestChannelRequest = buildRequest(
+      
TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(txnOffsetCommitRequest, true, 
true).build(version)
+    )
 
     val future = new CompletableFuture[TxnOffsetCommitResponseData]()
     
when(txnCoordinator.partitionFor(txnOffsetCommitRequest.transactionalId)).thenReturn(0)
     when(groupCoordinator.commitTransactionalOffsets(
       requestChannelRequest.context,
-      txnOffsetCommitRequest,
+      expectedTxnOffsetCommitRequest,
       RequestLocal.noCaching.bufferSupplier
     )).thenReturn(future)
     kafkaApis = createKafkaApis()
@@ -1541,7 +1564,8 @@ class KafkaApisTest extends Logging {
     val txnOffsetCommitResponse = new TxnOffsetCommitResponseData()
       .setTopics(util.List.of(
         new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
-          .setName("foo")
+          .setTopicId(if (version >= 6) topicId else Uuid.ZERO_UUID)
+          .setName(if (version < 6) topicName else "")
           .setPartitions(util.List.of(
             new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
               .setPartitionIndex(0)
@@ -1552,28 +1576,46 @@ class KafkaApisTest extends Logging {
     assertEquals(txnOffsetCommitResponse, response.data)
   }
 
-  @Test
-  def testHandleTxnOffsetCommitRequestFutureFailed(): Unit = {
-    addTopicToMetadataCache("foo", numPartitions = 1)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+  def testHandleTxnOffsetCommitRequestFutureFailed(version: Short): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    addTopicToMetadataCache(topicName, topicId = topicId, numPartitions = 1)
 
     val txnOffsetCommitRequest = new TxnOffsetCommitRequestData()
       .setGroupId("group")
-      .setMemberId("member")
+      .setMemberId(if (version >= 3) "member" else "")
       .setTopics(util.List.of(
         new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
-          .setName("foo")
+          .setTopicId(topicId)
+          .setName(topicName)
           .setPartitions(util.List.of(
             new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
               .setPartitionIndex(0)
               .setCommittedOffset(10)))))
 
-    val requestChannelRequest = 
buildRequest(TxnOffsetCommitRequest.Builder.forTopicNames(txnOffsetCommitRequest,
 true).build())
+    val expectedTxnOffsetCommitRequest = new TxnOffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId(if (version >= 3) "member" else "")
+      .setTopics(util.List.of(
+        new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+          .setTopicId(topicId)
+          .setName(topicName)
+          .setPartitions(util.List.of(
+            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10)))))
+
+    val requestChannelRequest = buildRequest(
+      
TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(txnOffsetCommitRequest, true, 
true).build(version)
+    )
 
     val future = new CompletableFuture[TxnOffsetCommitResponseData]()
     
when(txnCoordinator.partitionFor(txnOffsetCommitRequest.transactionalId)).thenReturn(0)
     when(groupCoordinator.commitTransactionalOffsets(
       requestChannelRequest.context,
-      txnOffsetCommitRequest,
+      expectedTxnOffsetCommitRequest,
       RequestLocal.noCaching.bufferSupplier
     )).thenReturn(future)
     kafkaApis = createKafkaApis()
@@ -1585,7 +1627,8 @@ class KafkaApisTest extends Logging {
     val expectedTxnOffsetCommitResponse = new TxnOffsetCommitResponseData()
       .setTopics(util.List.of(
         new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
-          .setName("foo")
+          .setTopicId(if (version >= 6) topicId else Uuid.ZERO_UUID)
+          .setName(if (version < 6) topicName else "")
           .setPartitions(util.List.of(
             new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
               .setPartitionIndex(0)
@@ -1596,17 +1639,22 @@ class KafkaApisTest extends Logging {
     assertEquals(expectedTxnOffsetCommitResponse, response.data)
   }
 
-  @Test
-  def testHandleTxnOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {
-    addTopicToMetadataCache("foo", numPartitions = 2)
-    addTopicToMetadataCache("bar", numPartitions = 2)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+  def testHandleTxnOffsetCommitRequestTopicsAndPartitionsValidation(version: 
Short): Unit = {
+    val fooId = Uuid.randomUuid()
+    val barId = Uuid.randomUuid()
+    val zarId = Uuid.randomUuid()
+    addTopicToMetadataCache("foo", topicId = fooId, numPartitions = 2)
+    addTopicToMetadataCache("bar", topicId = barId, numPartitions = 2)
 
     val txnOffsetCommitRequest = new TxnOffsetCommitRequestData()
       .setGroupId("group")
-      .setMemberId("member")
+      .setMemberId(if (version >= 3) "member" else "")
       .setTopics(util.List.of(
         // foo exists but only has 2 partitions.
         new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+          .setTopicId(fooId)
           .setName("foo")
           .setPartitions(util.List.of(
             new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
@@ -1620,6 +1668,7 @@ class KafkaApisTest extends Logging {
               .setCommittedOffset(30))),
         // bar exists.
         new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+          .setTopicId(barId)
           .setName("bar")
           .setPartitions(util.List.of(
             new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
@@ -1630,6 +1679,7 @@ class KafkaApisTest extends Logging {
               .setCommittedOffset(50))),
         // zar does not exist.
         new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+          .setTopicId(zarId)
           .setName("zar")
           .setPartitions(util.List.of(
             new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
@@ -1639,15 +1689,18 @@ class KafkaApisTest extends Logging {
               .setPartitionIndex(1)
               .setCommittedOffset(70)))))
 
-    val requestChannelRequest = 
buildRequest(TxnOffsetCommitRequest.Builder.forTopicNames(txnOffsetCommitRequest,
 true).build())
+    val requestChannelRequest = buildRequest(
+      
TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(txnOffsetCommitRequest, true, 
true).build(version)
+    )
 
     // This is the request expected by the group coordinator.
     val expectedTxnOffsetCommitRequest = new TxnOffsetCommitRequestData()
       .setGroupId("group")
-      .setMemberId("member")
+      .setMemberId(if (version >= 3) "member" else "")
       .setTopics(util.List.of(
         // foo exists but only has 2 partitions.
         new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+          .setTopicId(fooId)
           .setName("foo")
           .setPartitions(util.List.of(
             new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
@@ -1657,6 +1710,7 @@ class KafkaApisTest extends Logging {
               .setPartitionIndex(1)
               .setCommittedOffset(20))),
         new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+          .setTopicId(barId)
           .setName("bar")
           .setPartitions(util.List.of(
             new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
@@ -1683,7 +1737,8 @@ class KafkaApisTest extends Logging {
     val txnOffsetCommitResponse = new TxnOffsetCommitResponseData()
       .setTopics(util.List.of(
         new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
-          .setName("foo")
+          .setTopicId(if (version >= 6) fooId else Uuid.ZERO_UUID)
+          .setName(if (version < 6) "foo" else "")
           .setPartitions(util.List.of(
             new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
               .setPartitionIndex(0)
@@ -1692,7 +1747,8 @@ class KafkaApisTest extends Logging {
               .setPartitionIndex(1)
               .setErrorCode(Errors.NONE.code))),
         new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
-          .setName("bar")
+          .setTopicId(if (version >= 6) barId else Uuid.ZERO_UUID)
+          .setName(if (version < 6) "bar" else "")
           .setPartitions(util.List.of(
             new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
               .setPartitionIndex(0)
@@ -1701,10 +1757,13 @@ class KafkaApisTest extends Logging {
               .setPartitionIndex(1)
               .setErrorCode(Errors.NONE.code)))))
 
+    // For v6+, the unknown topic returns UNKNOWN_TOPIC_ID; for v0-5 it returns
+    // UNKNOWN_TOPIC_OR_PARTITION.
     val expectedTxnOffsetCommitResponse = new TxnOffsetCommitResponseData()
       .setTopics(util.List.of(
         new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
-          .setName("foo")
+          .setTopicId(if (version >= 6) fooId else Uuid.ZERO_UUID)
+          .setName(if (version < 6) "foo" else "")
           .setPartitions(util.List.of(
             // foo-2 is first because partitions failing the validation
             // are put in the response first.
@@ -1720,16 +1779,18 @@ class KafkaApisTest extends Logging {
         // zar is before bar because topics failing the validation are
         // put in the response first.
         new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
-          .setName("zar")
+          .setTopicId(if (version >= 6) zarId else Uuid.ZERO_UUID)
+          .setName(if (version < 6) "zar" else "")
           .setPartitions(util.List.of(
             new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
               .setPartitionIndex(0)
-              .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
+              .setErrorCode(if (version >= 6) Errors.UNKNOWN_TOPIC_ID.code 
else Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
             new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
               .setPartitionIndex(1)
-              .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))),
+              .setErrorCode(if (version >= 6) Errors.UNKNOWN_TOPIC_ID.code 
else Errors.UNKNOWN_TOPIC_OR_PARTITION.code))),
         new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
-          .setName("bar")
+          .setTopicId(if (version >= 6) barId else Uuid.ZERO_UUID)
+          .setName(if (version < 6) "bar" else "")
           .setPartitions(util.List.of(
             new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
               .setPartitionIndex(0)
@@ -1747,25 +1808,46 @@ class KafkaApisTest extends Logging {
   @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT, 
enableUnstableLastVersion = false)
   def 
shouldReplaceCoordinatorNotAvailableWithLoadInProcessInTxnOffsetCommitWithOlderClient(version:
 Short): Unit = {
     val topic = "topic"
-    addTopicToMetadataCache(topic, numPartitions = 2)
+    val topicId = Uuid.randomUuid()
+    addTopicToMetadataCache(topic, topicId = topicId, numPartitions = 2)
 
     val topicPartition = new TopicPartition(topic, 1)
     val capturedResponse: ArgumentCaptor[TxnOffsetCommitResponse] = 
ArgumentCaptor.forClass(classOf[TxnOffsetCommitResponse])
 
-    val partitionOffsetCommitData = new 
TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty())
     val groupId = "groupId"
-
     val producerId = 15L
     val epoch = 0.toShort
 
-    val data = new TxnOffsetCommitRequestData()
+    val offsetCommitRequestData = new TxnOffsetCommitRequestData()
+      .setTransactionalId("txnId")
+      .setGroupId(groupId)
+      .setProducerId(producerId)
+      .setProducerEpoch(epoch)
+      .setTopics(util.List.of(
+        new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+          .setTopicId(if (version >= 6) topicId else Uuid.ZERO_UUID)
+          .setName(if (version < 6) topic else "")
+          .setPartitions(util.List.of(
+            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+              .setPartitionIndex(topicPartition.partition)
+              .setCommittedOffset(15L)))))
+
+    val expectedOffsetCommitRequestData = new TxnOffsetCommitRequestData()
       .setTransactionalId("txnId")
       .setGroupId(groupId)
       .setProducerId(producerId)
       .setProducerEpoch(epoch)
-      .setTopics(TxnOffsetCommitRequest.getTopics(util.Map.of(topicPartition, 
partitionOffsetCommitData)))
+      .setTopics(util.List.of(
+        new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+          .setTopicId(topicId)
+          .setName(topic)
+          .setPartitions(util.List.of(
+            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+              .setPartitionIndex(topicPartition.partition)
+              .setCommittedOffset(15L)))))
+
     val offsetCommitRequest = TxnOffsetCommitRequest.Builder.forTopicNames(
-      data,
+      offsetCommitRequestData,
       version >= 
TxnOffsetCommitRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
     ).build(version)
     val request = buildRequest(offsetCommitRequest)
@@ -1775,14 +1857,15 @@ class KafkaApisTest extends Logging {
     
when(txnCoordinator.partitionFor(offsetCommitRequest.data.transactionalId)).thenReturn(0)
     when(groupCoordinator.commitTransactionalOffsets(
       request.context,
-      offsetCommitRequest.data,
+      expectedOffsetCommitRequestData,
       requestLocal.bufferSupplier
     )).thenReturn(future)
 
     future.complete(new TxnOffsetCommitResponseData()
       .setTopics(util.List.of(
         new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
-          .setName(topicPartition.topic)
+          .setTopicId(if (version >= 6) topicId else Uuid.ZERO_UUID)
+          .setName(if (version < 6) topicPartition.topic else "")
           .setPartitions(util.List.of(
             new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
               .setPartitionIndex(topicPartition.partition)
@@ -1799,10 +1882,11 @@ class KafkaApisTest extends Logging {
     )
     val response = capturedResponse.getValue
 
+    val partitionError = 
Errors.forCode(response.data.topics.get(0).partitions.get(0).errorCode)
     if (version < 2) {
-      assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, 
response.errors().get(topicPartition))
+      assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, partitionError)
     } else {
-      assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, 
response.errors().get(topicPartition))
+      assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, partitionError)
     }
   }
 

Reply via email to