This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 75620447819 KAFKA-20444: [1/N] Add TxnOffsetCommit v6 schema 
(KIP-1319) (#22205)
75620447819 is described below

commit 756204478193b7aa5838bb5aaf049277b8798999
Author: David Jacot <[email protected]>
AuthorDate: Tue May 5 20:57:50 2026 +0200

    KAFKA-20444: [1/N] Add TxnOffsetCommit v6 schema (KIP-1319) (#22205)
    
    This patch introduces version 6 of the TxnOffsetCommit API:
    
    * Adds a new `TopicId` field at `6+` and bounds the existing `Name`
    field to `0-5` on both the request and the response.
    * Renames the `GenerationId` field to `GenerationIdOrMemberEpoch` on the
    request to reflect its dual semantics under the new consumer group
    protocol (source-only rename; the wire format is positional and
    unchanged).
    * Ensures that the client cannot use version 6 yet.
    * Keeps the new version as unstable.
    
    Reviewers: Sean Quah <[email protected]>
---
 .../producer/internals/TransactionManager.java     |   2 +-
 .../common/requests/TxnOffsetCommitRequest.java    |  15 ++-
 .../common/message/TxnOffsetCommitRequest.json     |  13 ++-
 .../common/message/TxnOffsetCommitResponse.json    |  30 +++++-
 .../kafka/clients/producer/KafkaProducerTest.java  |   2 +-
 .../producer/internals/TransactionManagerTest.java |   4 +-
 .../apache/kafka/common/message/MessageTest.java   | 120 +++++++--------------
 .../kafka/common/requests/RequestResponseTest.java |   4 +-
 .../requests/TxnOffsetCommitRequestTest.java       |   2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   2 +-
 .../server/GroupCoordinatorBaseRequestTest.scala   |   2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   4 +-
 .../kafka/server/TxnOffsetCommitRequestTest.scala  |   4 +-
 .../kafka/server/WriteTxnMarkersRequestTest.scala  |   2 +-
 .../coordinator/group/OffsetMetadataManager.java   |   4 +-
 .../group/GroupCoordinatorServiceTest.java         |   8 +-
 .../group/OffsetMetadataManagerTest.java           |  22 ++--
 17 files changed, 116 insertions(+), 124 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index be4b1f8dfbc..50e35d32096 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -1255,7 +1255,7 @@ public class TransactionManager {
             .setProducerId(producerIdAndEpoch.producerId)
             .setProducerEpoch(producerIdAndEpoch.epoch)
             .setMemberId(groupMetadata.memberId())
-            .setGenerationId(groupMetadata.generationId())
+            .setGenerationIdOrMemberEpoch(groupMetadata.generationId())
             .setGroupInstanceId(groupMetadata.groupInstanceId().orElse(null))
             
.setTopics(TxnOffsetCommitRequest.getTopics(pendingTxnOffsetCommits));
         final TxnOffsetCommitRequest.Builder builder =
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index c94e4332537..30c40c29dab 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -49,9 +49,11 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
 
         private Builder(
             final TxnOffsetCommitRequestData data,
-            final boolean isTransactionV2Enabled
+            final boolean isTransactionV2Enabled,
+            final short oldestAllowedVersion,
+            final short latestAllowedVersion
         ) {
-            super(ApiKeys.TXN_OFFSET_COMMIT);
+            super(ApiKeys.TXN_OFFSET_COMMIT, oldestAllowedVersion, 
latestAllowedVersion);
             this.data = data;
             this.isTransactionV2Enabled = isTransactionV2Enabled;
         }
@@ -60,7 +62,12 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
             final TxnOffsetCommitRequestData data,
             final boolean isTransactionV2Enabled
         ) {
-            return new Builder(data, isTransactionV2Enabled);
+            return new Builder(
+                data,
+                isTransactionV2Enabled,
+                ApiKeys.TXN_OFFSET_COMMIT.oldestVersion(),
+                (short) 5
+            );
         }
 
         @Override
@@ -77,7 +84,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
 
         private boolean groupMetadataSet() {
             return !data.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID) 
||
-                       data.generationId() != 
JoinGroupRequest.UNKNOWN_GENERATION_ID ||
+                       data.generationIdOrMemberEpoch() != 
JoinGroupRequest.UNKNOWN_GENERATION_ID ||
                        data.groupInstanceId() != null;
         }
 
diff --git 
a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json 
b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
index 59a1f05e097..d2b91b3d930 100644
--- a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
@@ -30,7 +30,10 @@
   // transaction V2 (KIP_890 part 2) is enabled, the TxnOffsetCommit request 
will also include the function for a
   // AddOffsetsToTxn call. If V2 is disabled, the client can't use 
TxnOffsetCommit request version higher than 4 within
   // a transaction.
-  "validVersions": "0-5",
+  //
+  // Version 6 adds support for topic IDs and removes support for topic names 
(KIP-1319).
+  "validVersions": "0-6",
+  "latestVersionUnstable": true,
   "flexibleVersions": "3+",
   "fields": [
     { "name": "TransactionalId", "type": "string", "versions": "0+", 
"entityType": "transactionalId",
@@ -41,8 +44,8 @@
       "about": "The current producer ID in use by the transactional ID." },
     { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
       "about": "The current epoch associated with the producer ID." },
-    { "name": "GenerationId", "type": "int32", "versions": "3+", "default": 
"-1",
-      "about": "The generation of the consumer." },
+    { "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "3+", 
"default": "-1",
+      "about": "The generation of the group if using the classic group 
protocol or the member epoch if using the consumer protocol." },
     { "name": "MemberId", "type": "string", "versions": "3+", "default": "",
       "about": "The member ID assigned by the group coordinator." },
     { "name": "GroupInstanceId", "type": "string", "versions": "3+",
@@ -50,8 +53,10 @@
       "about": "The unique identifier of the consumer instance provided by end 
user." },
     { "name": "Topics", "type" : "[]TxnOffsetCommitRequestTopic", "versions": 
"0+",
       "about": "Each topic that we want to commit offsets for.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+      { "name": "Name", "type": "string", "versions": "0-5", "entityType": 
"topicName", "ignorable": true,
         "about": "The topic name." },
+      { "name": "TopicId", "type": "uuid", "versions": "6+", "ignorable": true,
+        "about": "The topic ID." },
       { "name": "Partitions", "type": "[]TxnOffsetCommitRequestPartition", 
"versions": "0+",
         "about": "The partitions inside the topic that we want to commit 
offsets for.", "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
diff --git 
a/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json 
b/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json
index 9769ed2aa97..58b5c3ebbdc 100644
--- a/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json
+++ b/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json
@@ -26,15 +26,41 @@
   // Version 4 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
   //
   // Version 5 is the same with version 3 (KIP-890).
-  "validVersions": "0-5",
+  //
+  // Version 6 adds support for topic IDs and removes support for topic names. 
It can also return
+  // GROUP_ID_NOT_FOUND, STALE_MEMBER_EPOCH and UNKNOWN_TOPIC_ID (KIP-1319).
+  "validVersions": "0-6",
   "flexibleVersions": "3+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED (version 0+)
+  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
+  // - NOT_COORDINATOR (version 0+)
+  // - COORDINATOR_NOT_AVAILABLE (version 0+)
+  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
+  // - TRANSACTIONAL_ID_AUTHORIZATION_FAILED (version 0+)
+  // - INVALID_PRODUCER_ID_MAPPING (version 0+)
+  // - INVALID_PRODUCER_EPOCH (version 0+)
+  // - PRODUCER_FENCED (version 0+)
+  // - INVALID_TXN_STATE (version 0+)
+  // - UNSUPPORTED_FOR_MESSAGE_FORMAT (version 0+)
+  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
+  // - OFFSET_METADATA_TOO_LARGE (version 0+)
+  // - ILLEGAL_GENERATION (version 3+)
+  // - UNKNOWN_MEMBER_ID (version 3+)
+  // - FENCED_INSTANCE_ID (version 3+)
+  // - TRANSACTION_ABORTABLE (version 4+)
+  // - GROUP_ID_NOT_FOUND (version 6+)
+  // - STALE_MEMBER_EPOCH (version 6+)
+  // - UNKNOWN_TOPIC_ID (version 6+)
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
     { "name": "Topics", "type": "[]TxnOffsetCommitResponseTopic", "versions": 
"0+",
       "about": "The responses for each topic.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+      { "name": "Name", "type": "string", "versions": "0-5", "entityType": 
"topicName", "ignorable": true,
         "about": "The topic name." },
+      { "name": "TopicId", "type": "uuid", "versions": "6+", "ignorable": true,
+        "about": "The topic ID." },
       { "name": "Partitions", "type": "[]TxnOffsetCommitResponsePartition", 
"versions": "0+",
         "about": "The responses for each partition in the topic.", "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 60dd4069829..c1354460710 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -2227,7 +2227,7 @@ public class KafkaProducerTest {
             TxnOffsetCommitRequestData data = ((TxnOffsetCommitRequest) 
request).data();
             return data.groupId().equals(groupId) &&
                 data.memberId().equals(memberId) &&
-                data.generationId() == generationId &&
+                data.generationIdOrMemberEpoch() == generationId &&
                 data.groupInstanceId().equals(groupInstanceId);
         }, txnOffsetsCommitResponse(Collections.singletonMap(
             new TopicPartition("topic", 0), Errors.NONE)));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 9efed2f3c2d..798dfe54fb2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -1249,7 +1249,7 @@ public class TransactionManagerTest {
             assertEquals(consumerGroupId, 
txnOffsetCommitRequest.data().groupId());
             assertEquals(producerId, 
txnOffsetCommitRequest.data().producerId());
             assertEquals(epoch, txnOffsetCommitRequest.data().producerEpoch());
-            return txnOffsetCommitRequest.data().generationId() != 
generationId;
+            return txnOffsetCommitRequest.data().generationIdOrMemberEpoch() 
!= generationId;
         }, new TxnOffsetCommitResponse(0, singletonMap(tp, 
Errors.ILLEGAL_GENERATION)));
 
         runUntil(transactionManager::hasError);
@@ -4475,7 +4475,7 @@ public class TransactionManagerTest {
             assertEquals(producerEpoch, 
txnOffsetCommitRequest.data().producerEpoch());
             assertEquals(groupInstanceId, 
txnOffsetCommitRequest.data().groupInstanceId());
             assertEquals(memberId, txnOffsetCommitRequest.data().memberId());
-            assertEquals(generationId, 
txnOffsetCommitRequest.data().generationId());
+            assertEquals(generationId, 
txnOffsetCommitRequest.data().generationIdOrMemberEpoch());
             return true;
         }, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java 
b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 8df678ba3b3..c339db8f35e 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -455,92 +455,46 @@ public final class MessageTest {
         testMessageRoundTrip(version, response, response);
     }
 
-    @Test
-    public void testTxnOffsetCommitRequestVersions() throws Exception {
-        String groupId = "groupId";
-        String topicName = "topic";
-        String metadata = "metadata";
-        String txnId = "transactionalId";
-        int producerId = 25;
-        short producerEpoch = 10;
-        String instanceId = "instance";
-        String memberId = "member";
-        int generationId = 1;
-
-        int partition = 2;
-        int offset = 100;
-
-        testAllMessageRoundTrips(new TxnOffsetCommitRequestData()
-                                     .setGroupId(groupId)
-                                     .setTransactionalId(txnId)
-                                     .setProducerId(producerId)
-                                     .setProducerEpoch(producerEpoch)
-                                     .setTopics(Collections.singletonList(
-                                         new TxnOffsetCommitRequestTopic()
-                                             .setName(topicName)
-                                             
.setPartitions(Collections.singletonList(
-                                                 new 
TxnOffsetCommitRequestPartition()
-                                                     
.setPartitionIndex(partition)
-                                                     
.setCommittedMetadata(metadata)
-                                                     
.setCommittedOffset(offset)
-                                             )))));
-
-        Supplier<TxnOffsetCommitRequestData> request =
-            () -> new TxnOffsetCommitRequestData()
-                      .setGroupId(groupId)
-                      .setTransactionalId(txnId)
-                      .setProducerId(producerId)
-                      .setProducerEpoch(producerEpoch)
-                      .setGroupInstanceId(instanceId)
-                      .setMemberId(memberId)
-                      .setGenerationId(generationId)
-                      .setTopics(Collections.singletonList(
-                          new TxnOffsetCommitRequestTopic()
-                              .setName(topicName)
-                              .setPartitions(Collections.singletonList(
-                                  new TxnOffsetCommitRequestPartition()
-                                      .setPartitionIndex(partition)
-                                      .setCommittedLeaderEpoch(10)
-                                      .setCommittedMetadata(metadata)
-                                      .setCommittedOffset(offset)
-                              ))));
-
-        for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
-            TxnOffsetCommitRequestData requestData = request.get();
-            if (version < 2) {
-                
requestData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1);
-            }
-
-            if (version < 3) {
-                final short finalVersion = version;
-                assertThrows(UnsupportedVersionException.class, () -> 
testEquivalentMessageRoundTrip(finalVersion, requestData));
-                requestData.setGroupInstanceId(null);
-                assertThrows(UnsupportedVersionException.class, () -> 
testEquivalentMessageRoundTrip(finalVersion, requestData));
-                requestData.setMemberId("");
-                assertThrows(UnsupportedVersionException.class, () -> 
testEquivalentMessageRoundTrip(finalVersion, requestData));
-                requestData.setGenerationId(-1);
-            }
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+    public void testTxnOffsetCommitRequestVersions(short version) throws 
Exception {
+        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+            .setGroupId("groupId")
+            .setTransactionalId("transactionalId")
+            .setProducerId(25)
+            .setProducerEpoch((short) 10)
+            .setMemberId(version >= 3 ? "member" : "")
+            .setGenerationIdOrMemberEpoch(version >= 3 ? 1 : -1)
+            .setGroupInstanceId(version >= 3 ? "instance" : null)
+            .setTopics(singletonList(
+                new TxnOffsetCommitRequestTopic()
+                    .setTopicId(version >= 6 ? Uuid.randomUuid() : 
Uuid.ZERO_UUID)
+                    .setName(version < 6 ? "topic" : "")
+                    .setPartitions(singletonList(
+                        new TxnOffsetCommitRequestPartition()
+                            .setPartitionIndex(2)
+                            .setCommittedLeaderEpoch(version >= 2 ? 10 : -1)
+                            .setCommittedMetadata("metadata")
+                            .setCommittedOffset(100)))));
 
-            testAllMessageRoundTripsFromVersion(version, requestData);
-        }
+        testMessageRoundTrip(version, request, request);
     }
 
-    @Test
-    public void testTxnOffsetCommitResponseVersions() throws Exception {
-        testAllMessageRoundTrips(
-            new TxnOffsetCommitResponseData()
-                .setTopics(
-                   singletonList(
-                       new TxnOffsetCommitResponseTopic()
-                           .setName("topic")
-                           .setPartitions(singletonList(
-                               new TxnOffsetCommitResponsePartition()
-                                   .setPartitionIndex(1)
-                                   
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
-                           ))
-                   )
-               )
-               .setThrottleTimeMs(20));
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+    public void testTxnOffsetCommitResponseVersions(short version) throws 
Exception {
+        TxnOffsetCommitResponseData response = new 
TxnOffsetCommitResponseData()
+            .setThrottleTimeMs(20)
+            .setTopics(singletonList(
+                new TxnOffsetCommitResponseTopic()
+                    .setTopicId(version >= 6 ? Uuid.randomUuid() : 
Uuid.ZERO_UUID)
+                    .setName(version < 6 ? "topic" : "")
+                    .setPartitions(singletonList(
+                        new TxnOffsetCommitResponsePartition()
+                            .setPartitionIndex(1)
+                            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())))));
+
+        testMessageRoundTrip(version, response, response);
     }
 
     @ParameterizedTest
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index f9f938847ea..f2f7cf33560 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2851,7 +2851,7 @@ public class RequestResponseTest {
 
         if (version >= 3) {
             data.setMemberId("member")
-                .setGenerationId(2)
+                .setGenerationIdOrMemberEpoch(2)
                 .setGroupInstanceId("instance");
         }
 
@@ -2871,7 +2871,7 @@ public class RequestResponseTest {
             .setProducerId(21L)
             .setProducerEpoch((short) 42)
             .setMemberId("member")
-            .setGenerationId(2)
+            .setGenerationIdOrMemberEpoch(2)
             .setGroupInstanceId("instance")
             .setTopics(TxnOffsetCommitRequest.getTopics(offsets));
         return TxnOffsetCommitRequest.Builder.forTopicNames(data, 
false).build();
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
index b1143de96a3..6c37c59ede1 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
@@ -79,7 +79,7 @@ public class TxnOffsetCommitRequestTest extends 
OffsetCommitRequestTest {
             .setProducerId(producerId)
             .setProducerEpoch(producerEpoch)
             .setMemberId(memberId)
-            .setGenerationId(generationId)
+            .setGenerationIdOrMemberEpoch(generationId)
             .setGroupInstanceId(groupInstanceId)
             .setTopics(TxnOffsetCommitRequest.getTopics(OFFSETS));
         builderWithGroupMetadata = 
TxnOffsetCommitRequest.Builder.forTopicNames(dataWithGroupMetadata, true);
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index c405b35705f..d38c8921dbd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2095,7 +2095,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         val txnOffsetCommitRequestData = new TxnOffsetCommitRequestData()
           .setGroupId(txnOffsetCommitRequest.data.groupId)
           .setMemberId(txnOffsetCommitRequest.data.memberId)
-          .setGenerationId(txnOffsetCommitRequest.data.generationId)
+          
.setGenerationIdOrMemberEpoch(txnOffsetCommitRequest.data.generationIdOrMemberEpoch)
           .setGroupInstanceId(txnOffsetCommitRequest.data.groupInstanceId)
           .setProducerEpoch(txnOffsetCommitRequest.data.producerEpoch)
           .setProducerId(txnOffsetCommitRequest.data.producerId)
diff --git 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index cd9afcc889b..6b041a329d0 100644
--- 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -267,7 +267,7 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
       new TxnOffsetCommitRequestData()
         .setGroupId(groupId)
         .setMemberId(memberId)
-        .setGenerationId(generationId)
+        .setGenerationIdOrMemberEpoch(generationId)
         .setProducerId(producerId)
         .setProducerEpoch(producerEpoch)
         .setTransactionalId(transactionalId)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 798d43892b1..f38a6146244 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1509,7 +1509,7 @@ class KafkaApisTest extends Logging {
     val txnOffsetCommitRequest = new TxnOffsetCommitRequestData()
       .setGroupId("group")
       .setMemberId("member")
-      .setGenerationId(10)
+      .setGenerationIdOrMemberEpoch(10)
       .setProducerId(20)
       .setProducerEpoch(30)
       .setGroupInstanceId("instance-id")
@@ -1744,7 +1744,7 @@ class KafkaApisTest extends Logging {
   }
 
   @ParameterizedTest
-  @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+  @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT, 
enableUnstableLastVersion = false)
   def 
shouldReplaceCoordinatorNotAvailableWithLoadInProcessInTxnOffsetCommitWithOlderClient(version:
 Short): Unit = {
     val topic = "topic"
     addTopicToMetadataCache(topic, numPartitions = 2)
diff --git 
a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
index cc6cd2c957e..61ed91a4e7d 100644
--- a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
@@ -77,7 +77,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
 
     createTopic(topic, 1)
 
-    for (version <- 0 to 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
+    for (version <- 0 to ApiKeys.TXN_OFFSET_COMMIT.latestVersion(false)) {
       // Verify that the TXN_OFFSET_COMMIT request is processed correctly when 
member id is UNKNOWN_MEMBER_ID
       // and generation id is UNKNOWN_GENERATION_ID under all api versions.
       verifyTxnCommitAndFetch(
@@ -241,7 +241,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
 
     createTopic(topic, 1)
 
-    for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
+    for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(false)) {
       val useTV2 = version > 
EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
 
       // Initialize producer. Wait until the coordinator finishes loading.
diff --git 
a/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
index a476271c04c..82263b89181 100644
--- a/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
@@ -66,7 +66,7 @@ class WriteTxnMarkersRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
 
     createTopic(topic, 1)
 
-    for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
+    for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(false)) {
       val useTV2 = version > 
EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
 
       // Initialize producer. Wait until the coordinator finishes loading.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 1aeedd4c94f..73fbc151243 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -522,7 +522,7 @@ public class OffsetMetadataManager {
         try {
             group = groupMetadataManager.group(request.groupId());
         } catch (GroupIdNotFoundException ex) {
-            if (request.generationId() < 0) {
+            if (request.generationIdOrMemberEpoch() < 0) {
                 // If the group does not exist and generation id is -1, the 
request comes from
                 // either the admin client or a consumer which does not use 
the group management
                 // facility. In this case, a so-called simple group is created 
and the request
@@ -537,7 +537,7 @@ public class OffsetMetadataManager {
             return group.validateOffsetCommit(
                 request.memberId(),
                 request.groupInstanceId(),
-                request.generationId(),
+                request.generationIdOrMemberEpoch(),
                 true,
                 context.requestVersion()
             );
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index e375872a125..c5ea4301e8b 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -2876,7 +2876,7 @@ public class GroupCoordinatorServiceTest {
             .setGroupId("foo")
             .setTransactionalId("transactional-id")
             .setMemberId("member-id")
-            .setGenerationId(10)
+            .setGenerationIdOrMemberEpoch(10)
             .setTopics(List.of(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                 .setName(TOPIC_NAME)
                 .setPartitions(List.of(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
@@ -2914,7 +2914,7 @@ public class GroupCoordinatorServiceTest {
             .setGroupId(groupId)
             .setTransactionalId("transactional-id")
             .setMemberId("member-id")
-            .setGenerationId(10)
+            .setGenerationIdOrMemberEpoch(10)
             .setTopics(List.of(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                 .setName(TOPIC_NAME)
                 .setPartitions(List.of(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
@@ -2953,7 +2953,7 @@ public class GroupCoordinatorServiceTest {
             .setProducerId(10L)
             .setProducerEpoch((short) 5)
             .setMemberId("member-id")
-            .setGenerationId(10)
+            .setGenerationIdOrMemberEpoch(10)
             .setTopics(List.of(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                 .setName(TOPIC_NAME)
                 .setPartitions(List.of(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
@@ -3007,7 +3007,7 @@ public class GroupCoordinatorServiceTest {
             .setProducerId(10L)
             .setProducerEpoch((short) 5)
             .setMemberId("member-id")
-            .setGenerationId(10)
+            .setGenerationIdOrMemberEpoch(10)
             .setTopics(List.of(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                 .setName(TOPIC_NAME)
                 .setPartitions(List.of(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 69e0617350d..adb17e4b17a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -1695,11 +1695,11 @@ public class OffsetMetadataManagerTest {
             ));
 
         // When client epoch (3) < assignment epoch (5), exception should be 
thrown.
-        request.setGenerationId(3);
+        request.setGenerationIdOrMemberEpoch(3);
         assertThrows(IllegalGenerationException.class, () -> 
context.commitTransactionalOffset(request));
 
         // When client epoch (5) >= assignment epoch (5), commit should 
succeed.
-        request.setGenerationId(5);
+        request.setGenerationIdOrMemberEpoch(5);
         assertDoesNotThrow(() -> context.commitTransactionalOffset(request));
 
         CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> 
result = context.commitTransactionalOffset(request);
@@ -1742,7 +1742,7 @@ public class OffsetMetadataManagerTest {
             new TxnOffsetCommitRequestData()
                 .setGroupId("foo")
                 .setMemberId("member")
-                .setGenerationId(10)
+                .setGenerationIdOrMemberEpoch(10)
                 .setTopics(List.of(
                     new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                         .setName("bar")
@@ -1796,7 +1796,7 @@ public class OffsetMetadataManagerTest {
             new TxnOffsetCommitRequestData()
                 .setGroupId("foo")
                 .setMemberId("member")
-                .setGenerationId(10)
+                .setGenerationIdOrMemberEpoch(10)
                 .setTopics(List.of(
                     new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                         .setName("bar")
@@ -1842,7 +1842,7 @@ public class OffsetMetadataManagerTest {
             new TxnOffsetCommitRequestData()
                 .setGroupId("foo")
                 .setMemberId("member")
-                .setGenerationId(10)
+                .setGenerationIdOrMemberEpoch(10)
                 .setTopics(List.of(
                     new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                         .setName("bar")
@@ -1901,7 +1901,7 @@ public class OffsetMetadataManagerTest {
             new TxnOffsetCommitRequestData()
                 .setGroupId("foo")
                 .setMemberId("member")
-                .setGenerationId(7)
+                .setGenerationIdOrMemberEpoch(7)
                 .setTopics(List.of(
                     new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                         .setName("bar")
@@ -1938,7 +1938,7 @@ public class OffsetMetadataManagerTest {
             new TxnOffsetCommitRequestData()
                 .setGroupId("foo")
                 .setMemberId("member")
-                .setGenerationId(100)
+                .setGenerationIdOrMemberEpoch(100)
                 .setTopics(List.of(
                     new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                         .setName("bar")
@@ -1977,7 +1977,7 @@ public class OffsetMetadataManagerTest {
             new TxnOffsetCommitRequestData()
                 .setGroupId("foo")
                 .setMemberId("member")
-                .setGenerationId(1)
+                .setGenerationIdOrMemberEpoch(1)
                 .setTopics(List.of(
                     new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                         .setName("bar")
@@ -2031,7 +2031,7 @@ public class OffsetMetadataManagerTest {
             new TxnOffsetCommitRequestData()
                 .setGroupId("foo")
                 .setMemberId("member")
-                .setGenerationId(10)
+                .setGenerationIdOrMemberEpoch(10)
                 .setTopics(List.of(
                     new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                         .setName("bar")
@@ -2060,7 +2060,7 @@ public class OffsetMetadataManagerTest {
             new TxnOffsetCommitRequestData()
                 .setGroupId("foo")
                 .setMemberId("member")
-                .setGenerationId(10)
+                .setGenerationIdOrMemberEpoch(10)
                 .setTopics(List.of(
                     new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                         .setName("bar")
@@ -2099,7 +2099,7 @@ public class OffsetMetadataManagerTest {
             new TxnOffsetCommitRequestData()
                 .setGroupId("foo")
                 .setMemberId("member")
-                .setGenerationId(100)
+                .setGenerationIdOrMemberEpoch(100)
                 .setTopics(List.of(
                     new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                         .setName("bar")


Reply via email to