This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 62db165d2af KAFKA-20444: [8/N] Extend TxnOffsetCommit integration tests to v6 (KIP-1319) (#22255)
62db165d2af is described below

commit 62db165d2af99c489010688fcaa4addf4c398964
Author: David Jacot <[email protected]>
AuthorDate: Tue May 12 09:25:32 2026 +0200

    KAFKA-20444: [8/N] Extend TxnOffsetCommit integration tests to v6 (KIP-1319) (#22255)
    
    Extend the existing `TxnOffsetCommit` and `WriteTxnMarkers` integration
    tests to cover v6 (KIP-1319), and thread the topic id through the
    `commitTxnOffset` helper in `GroupCoordinatorBaseRequestTest`.
    
    Three new error cases in
    `TxnOffsetCommitRequestTest.testTxnOffsetCommit`:
    - v6+ stale member epoch (new protocol) → `STALE_MEMBER_EPOCH`.
    - v6+ unknown group id → `GROUP_ID_NOT_FOUND`.
    - v6+ unknown topic id → `UNKNOWN_TOPIC_ID`.
    
    Reviewers: Sean Quah <[email protected]>
---
 .../server/GroupCoordinatorBaseRequestTest.scala   |  5 +-
 .../kafka/server/TxnOffsetCommitRequestTest.scala  | 58 ++++++++++++++++++++--
 .../kafka/server/WriteTxnMarkersRequestTest.scala  |  5 +-
 3 files changed, 60 insertions(+), 8 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index 6b041a329d0..a2456a0aa9a 100644
--- 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -258,6 +258,7 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
      producerEpoch: Short,
      transactionalId: String,
      topic: String,
+     topicId: Uuid,
      partition: Int,
      offset: Long,
      expectedError: Errors,
@@ -273,6 +274,7 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
         .setTransactionalId(transactionalId)
         .setTopics(List(
           new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+            .setTopicId(topicId)
             .setName(topic)
             .setPartitions(List(
               new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
@@ -286,7 +288,8 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
     val expectedResponse = new TxnOffsetCommitResponseData()
       .setTopics(List(
         new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
-          .setName(topic)
+          .setTopicId(if (version >= 6) topicId else Uuid.ZERO_UUID)
+          .setName(if (version < 6) topic else "")
           .setPartitions(List(
             new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
               .setPartitionIndex(partition)
diff --git 
a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
index 61ed91a4e7d..ee774c05c8b 100644
--- a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
@@ -17,6 +17,7 @@
 package kafka.server
 
 import kafka.utils.TestUtils
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{EndTxnRequest, JoinGroupRequest}
@@ -75,13 +76,14 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
     assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
     assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
 
-    createTopic(topic, 1)
+    val topicId = createTopic(topic, 1)
 
-    for (version <- 0 to ApiKeys.TXN_OFFSET_COMMIT.latestVersion(false)) {
+    for (version <- 0 to 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
       // Verify that the TXN_OFFSET_COMMIT request is processed correctly when member id is UNKNOWN_MEMBER_ID
       // and generation id is UNKNOWN_GENERATION_ID under all api versions.
       verifyTxnCommitAndFetch(
         topic = topic,
+        topicId = topicId,
         partition = partition,
         transactionalId = transactionalId,
         groupId = groupId,
@@ -98,6 +100,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
         // must not be empty from version 3 onwards.
         verifyTxnCommitAndFetch(
           topic = topic,
+          topicId = topicId,
           partition = partition,
           transactionalId = transactionalId,
           groupId = groupId,
@@ -111,6 +114,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
         // Verify TXN_OFFSET_COMMIT request failed with incorrect memberId.
         verifyTxnCommitAndFetch(
           topic = topic,
+          topicId = topicId,
           partition = partition,
           transactionalId = transactionalId,
           groupId = groupId,
@@ -122,8 +126,11 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
         )
 
         // Verify TXN_OFFSET_COMMIT request failed with incorrect generationId.
+        // Under the new consumer group protocol, v6+ returns STALE_MEMBER_EPOCH
+        // directly while v0-5 maps it to ILLEGAL_GENERATION (KIP-1319).
         verifyTxnCommitAndFetch(
           topic = topic,
+          topicId = topicId,
           partition = partition,
           transactionalId = transactionalId,
           groupId = groupId,
@@ -131,13 +138,51 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
           generationId = 100,
           offset = 200 + version,
           version = version.toShort,
-          expectedTxnCommitError = Errors.ILLEGAL_GENERATION
+          expectedTxnCommitError =
+            if (useNewProtocol && version >= 6) Errors.STALE_MEMBER_EPOCH
+            else Errors.ILLEGAL_GENERATION
         )
+
+        // Verify TXN_OFFSET_COMMIT request failed with an unknown groupId.
+        // v6+ propagates GROUP_ID_NOT_FOUND directly while v0-5 maps it to
+        // ILLEGAL_GENERATION (KIP-1319). This applies to both protocols.
+        verifyTxnCommitAndFetch(
+          topic = topic,
+          topicId = topicId,
+          partition = partition,
+          transactionalId = transactionalId,
+          groupId = "unknown",
+          memberId = memberId,
+          generationId = memberEpoch,
+          offset = 200 + version,
+          version = version.toShort,
+          expectedTxnCommitError =
+            if (version >= 6) Errors.GROUP_ID_NOT_FOUND
+            else Errors.ILLEGAL_GENERATION
+        )
+
+        if (version >= 6) {
+          // Verify TXN_OFFSET_COMMIT request failed with UNKNOWN_TOPIC_ID for an
+          // unknown topic id. Only v6+ carries topic IDs on the wire.
+          verifyTxnCommitAndFetch(
+            topic = topic,
+            topicId = Uuid.randomUuid(),
+            partition = partition,
+            transactionalId = transactionalId,
+            groupId = groupId,
+            memberId = memberId,
+            generationId = memberEpoch,
+            offset = 200 + version,
+            version = version.toShort,
+            expectedTxnCommitError = Errors.UNKNOWN_TOPIC_ID
+          )
+        }
       } else {
         // Verify that the TXN_OFFSET_COMMIT request failed when group metadata is set under version 3.
         assertThrows(classOf[UnsupportedVersionException], () =>
           verifyTxnCommitAndFetch(
             topic = topic,
+            topicId = topicId,
             partition = partition,
             transactionalId = transactionalId,
             groupId = groupId,
@@ -154,6 +199,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
 
   private def verifyTxnCommitAndFetch(
     topic: String,
+    topicId: Uuid,
     partition: Int,
     transactionalId: String,
     groupId: String,
@@ -195,6 +241,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
       producerEpoch = producerIdAndEpoch.epoch,
       transactionalId = transactionalId,
       topic = topic,
+      topicId = topicId,
       partition = partition,
       offset = offset,
       expectedError = expectedTxnCommitError,
@@ -239,9 +286,9 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
     assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
     assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
 
-    createTopic(topic, 1)
+    val topicId = createTopic(topic, 1)
 
-    for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(false)) {
+    for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
       val useTV2 = version > 
EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
 
       // Initialize producer. Wait until the coordinator finishes loading.
@@ -300,6 +347,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
         producerEpoch = producerIdAndEpoch.epoch,
         transactionalId = transactionalId,
         topic = topic,
+        topicId = topicId,
         partition = partition,
         offset = offset,
         expectedError = if (useTV2) Errors.INVALID_PRODUCER_EPOCH else 
Errors.NONE,
diff --git 
a/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
index 82263b89181..1bc57a2ff65 100644
--- a/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
@@ -64,9 +64,9 @@ class WriteTxnMarkersRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
     assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
     assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
 
-    createTopic(topic, 1)
+    val topicId = createTopic(topic, 1)
 
-    for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(false)) {
+    for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
       val useTV2 = version > 
EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
 
       // Initialize producer. Wait until the coordinator finishes loading.
@@ -124,6 +124,7 @@ class WriteTxnMarkersRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
         producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort 
else producerIdAndEpoch.epoch,
         transactionalId = transactionalId,
         topic = topic,
+        topicId = topicId,
         partition = partition,
         offset = offset + version,
         expectedError = Errors.NONE,

Reply via email to