This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2342c80dca4 KAFKA-20444: [3/N] Allow building TxnOffsetCommit v6 
requests with topic IDs (KIP-1319) (#22215)
2342c80dca4 is described below

commit 2342c80dca4ccede3128ff0ed62d45d2340a3bdb
Author: David Jacot <[email protected]>
AuthorDate: Wed May 6 15:32:41 2026 +0200

    KAFKA-20444: [3/N] Allow building TxnOffsetCommit v6 requests with topic 
IDs (KIP-1319) (#22215)
    
    This patch adds a `forTopicIdsOrNames(...)` factory to
    `TxnOffsetCommitRequest.Builder` that allows building requests starting
    from version 6 of the API. The existing `forTopicNames(...)` factory is
    capped at version 5. `build(short version)` validates that the request
    carries topic IDs starting from version 6 and topic names for versions 0
    to 5. No call site uses the new factory yet — that lands in a follow-up
    patch wiring up `TransactionManager`.
    
    Reviewers: Sean Quah <[email protected]>
---
 .../common/requests/TxnOffsetCommitRequest.java    |  49 +++++-
 .../kafka/common/requests/RequestResponseTest.java |   9 +-
 .../requests/TxnOffsetCommitRequestTest.java       | 165 ++++++++++++++++-----
 3 files changed, 180 insertions(+), 43 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 626116c70a1..a9e67d74fae 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import 
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
@@ -63,17 +64,14 @@ public class TxnOffsetCommitRequest extends AbstractRequest 
{
     public static class Builder extends 
AbstractRequest.Builder<TxnOffsetCommitRequest> {
 
         public final TxnOffsetCommitRequestData data;
-        public final boolean isTransactionV2Enabled;
 
         private Builder(
             final TxnOffsetCommitRequestData data,
-            final boolean isTransactionV2Enabled,
             final short oldestAllowedVersion,
             final short latestAllowedVersion
         ) {
             super(ApiKeys.TXN_OFFSET_COMMIT, oldestAllowedVersion, 
latestAllowedVersion);
             this.data = data;
-            this.isTransactionV2Enabled = isTransactionV2Enabled;
         }
 
         public static Builder forTopicNames(
@@ -82,9 +80,22 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
         ) {
             return new Builder(
                 data,
-                isTransactionV2Enabled,
                 ApiKeys.TXN_OFFSET_COMMIT.oldestVersion(),
-                (short) 5
+                isTransactionV2Enabled ? (short) 5 : 
LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
+            );
+        }
+
+        public static Builder forTopicIdsOrNames(
+            final TxnOffsetCommitRequestData data,
+            final boolean isTransactionV2Enabled,
+            final boolean enableUnstableLastVersion
+        ) {
+            return new Builder(
+                data,
+                ApiKeys.TXN_OFFSET_COMMIT.oldestVersion(),
+                isTransactionV2Enabled
+                    ? 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(enableUnstableLastVersion)
+                    : LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
             );
         }
 
@@ -94,8 +105,20 @@ public class TxnOffsetCommitRequest extends AbstractRequest 
{
                 throw new UnsupportedVersionException("Broker doesn't support 
group metadata commit API on version " + version
                     + ", minimum supported request version is 3 which requires 
brokers to be on version 2.5 or above.");
             }
-            if (!isTransactionV2Enabled) {
-                version = (short) Math.min(version, 
LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2);
+            if (version >= 6) {
+                for (TxnOffsetCommitRequestTopic topic : data.topics()) {
+                    if (topic.topicId() == null || 
topic.topicId().equals(Uuid.ZERO_UUID)) {
+                        throw new UnsupportedVersionException("The broker 
TxnOffsetCommit api version " +
+                            version + " does require usage of topic ids.");
+                    }
+                }
+            } else {
+                for (TxnOffsetCommitRequestTopic topic : data.topics()) {
+                    if (topic.name() == null || topic.name().isEmpty()) {
+                        throw new UnsupportedVersionException("The broker 
TxnOffsetCommit api version " +
+                            version + " does require usage of topic names.");
+                    }
+                }
             }
             return new TxnOffsetCommitRequest(data, version);
         }
@@ -132,7 +155,16 @@ public class TxnOffsetCommitRequest extends 
AbstractRequest {
         return offsetMap;
     }
 
-    public static List<TxnOffsetCommitRequestTopic> 
getTopics(Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits) {
+    public static List<TxnOffsetCommitRequestTopic> getTopics(
+        Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits
+    ) {
+        return getTopics(pendingTxnOffsetCommits, Map.of());
+    }
+
+    public static List<TxnOffsetCommitRequestTopic> getTopics(
+        Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits,
+        Map<String, Uuid> topicIds
+    ) {
         Map<String, List<TxnOffsetCommitRequestPartition>> topicPartitionMap = 
new HashMap<>();
         for (Map.Entry<TopicPartition, CommittedOffset> entry : 
pendingTxnOffsetCommits.entrySet()) {
             TopicPartition topicPartition = entry.getKey();
@@ -151,6 +183,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest 
{
         return topicPartitionMap.entrySet().stream()
                    .map(entry -> new TxnOffsetCommitRequestTopic()
                                      .setName(entry.getKey())
+                                     
.setTopicId(topicIds.getOrDefault(entry.getKey(), Uuid.ZERO_UUID))
                                      .setPartitions(entry.getValue()))
                    .collect(Collectors.toList());
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index f2f7cf33560..29bc1213c63 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2842,12 +2842,13 @@ public class RequestResponseTest {
         offsets.put(new TopicPartition("topic", 74),
                 new TxnOffsetCommitRequest.CommittedOffset(100, "blah", 
Optional.of(27)));
 
+        Map<String, Uuid> topicIds = Map.of("topic", Uuid.randomUuid());
         TxnOffsetCommitRequestData data = new TxnOffsetCommitRequestData()
             .setTransactionalId("transactionalId")
             .setGroupId("groupId")
             .setProducerId(21L)
             .setProducerEpoch((short) 42)
-            .setTopics(TxnOffsetCommitRequest.getTopics(offsets));
+            .setTopics(TxnOffsetCommitRequest.getTopics(offsets, topicIds));
 
         if (version >= 3) {
             data.setMemberId("member")
@@ -2855,7 +2856,11 @@ public class RequestResponseTest {
                 .setGroupInstanceId("instance");
         }
 
-        return TxnOffsetCommitRequest.Builder.forTopicNames(data, version >= 
5).build(version);
+        if (version >= 6) {
+            return TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(data, 
true, true).build(version);
+        } else {
+            return TxnOffsetCommitRequest.Builder.forTopicNames(data, version 
>= 5).build(version);
+        }
     }
 
     private TxnOffsetCommitRequest 
createTxnOffsetCommitRequestWithAutoDowngrade() {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
index 6c37c59ede1..331fbcfba57 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import 
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
@@ -25,9 +26,11 @@ import 
org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
 
 import java.util.HashMap;
 import java.util.List;
@@ -85,12 +88,13 @@ public class TxnOffsetCommitRequestTest extends 
OffsetCommitRequestTest {
         builderWithGroupMetadata = 
TxnOffsetCommitRequest.Builder.forTopicNames(dataWithGroupMetadata, true);
     }
 
-    @Test
-    @Override
-    public void testConstructor() {
-        Map<TopicPartition, Errors> errorsMap = new HashMap<>();
-        errorsMap.put(new TopicPartition(topicOne, partitionOne), 
Errors.NOT_COORDINATOR);
-        errorsMap.put(new TopicPartition(topicTwo, partitionTwo), 
Errors.NOT_COORDINATOR);
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT, toVersion = 5)
+    public void testConstructor(short version) {
+        var errorsMap = Map.of(
+            new TopicPartition(topicOne, partitionOne), Errors.NOT_COORDINATOR,
+            new TopicPartition(topicTwo, partitionTwo), Errors.NOT_COORDINATOR
+        );
 
         List<TxnOffsetCommitRequestTopic> expectedTopics = List.of(
             new TxnOffsetCommitRequestTopic()
@@ -110,23 +114,20 @@ public class TxnOffsetCommitRequestTest extends 
OffsetCommitRequestTest {
                         .setCommittedLeaderEpoch(leaderEpoch)
                         .setCommittedMetadata(metadata))));
 
-        for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
-            final TxnOffsetCommitRequest request;
-            if (version < 3) {
-                request = builder.build(version);
-            } else {
-                request = builderWithGroupMetadata.build(version);
-            }
-            assertEquals(OFFSETS, request.offsets());
-            assertEquals(expectedTopics, 
TxnOffsetCommitRequest.getTopics(request.offsets()));
-
-            TxnOffsetCommitResponse response =
-                request.getErrorResponse(throttleTimeMs, 
Errors.NOT_COORDINATOR.exception());
-
-            assertEquals(errorsMap, response.errors());
-            assertEquals(Map.of(Errors.NOT_COORDINATOR, 2), 
response.errorCounts());
-            assertEquals(throttleTimeMs, response.throttleTimeMs());
+        final TxnOffsetCommitRequest request;
+        if (version < 3) {
+            request = builder.build(version);
+        } else {
+            request = builderWithGroupMetadata.build(version);
         }
+        assertEquals(OFFSETS, request.offsets());
+        assertEquals(expectedTopics, 
TxnOffsetCommitRequest.getTopics(request.offsets()));
+
+        var response = request.getErrorResponse(throttleTimeMs, 
Errors.NOT_COORDINATOR.exception());
+
+        assertEquals(errorsMap, response.errors());
+        assertEquals(Map.of(Errors.NOT_COORDINATOR, 2), 
response.errorCounts());
+        assertEquals(throttleTimeMs, response.throttleTimeMs());
     }
 
     @Test
@@ -150,17 +151,115 @@ public class TxnOffsetCommitRequestTest extends 
OffsetCommitRequestTest {
         assertEquals(expectedResponse, 
getErrorResponse(builderWithGroupMetadata.data, Errors.UNKNOWN_MEMBER_ID));
     }
 
-    @Test
-    public void testVersionSupportForGroupMetadata() {
-        for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
-            assertDoesNotThrow(() -> builder.build(version));
-            if (version >= 3) {
-                assertDoesNotThrow(() -> 
builderWithGroupMetadata.build(version));
-            } else {
-                assertEquals("Broker doesn't support group metadata commit API 
on version " + version +
-                    ", minimum supported request version is 3 which requires 
brokers to be on version 2.5 or above.",
-                    assertThrows(UnsupportedVersionException.class, () -> 
builderWithGroupMetadata.build(version)).getMessage());
-            }
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT, toVersion = 5)
+    public void testVersionSupportForGroupMetadata(short version) {
+        assertDoesNotThrow(() -> builder.build(version));
+        if (version >= 3) {
+            assertDoesNotThrow(() -> builderWithGroupMetadata.build(version));
+        } else {
+            assertEquals("Broker doesn't support group metadata commit API on 
version " + version +
+                ", minimum supported request version is 3 which requires 
brokers to be on version 2.5 or above.",
+                assertThrows(UnsupportedVersionException.class, () -> 
builderWithGroupMetadata.build(version)).getMessage());
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+    public void testForTopicIdsOrNamesWithTopicNameOnly(short version) {
+        var data = new TxnOffsetCommitRequestData()
+            .setTransactionalId("tx")
+            .setGroupId(groupId)
+            .setProducerId(1)
+            .setProducerEpoch((short) 0)
+            .setTopics(List.of(
+                new TxnOffsetCommitRequestTopic()
+                    .setName("foo")
+                    .setPartitions(List.of(
+                        new TxnOffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(0L)))));
+
+        if (version >= 6) {
+            assertThrows(UnsupportedVersionException.class,
+                () -> TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(data, 
true, true).build(version));
+        } else {
+            assertDoesNotThrow(
+                () -> TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(data, 
true, true).build(version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+    public void testForTopicIdsOrNamesWithTopicIdOnly(short version) {
+        var topicId = Uuid.randomUuid();
+        var data = new TxnOffsetCommitRequestData()
+            .setTransactionalId("tx")
+            .setGroupId(groupId)
+            .setProducerId(1)
+            .setProducerEpoch((short) 0)
+            .setTopics(List.of(
+                new TxnOffsetCommitRequestTopic()
+                    .setTopicId(topicId)
+                    .setPartitions(List.of(
+                        new TxnOffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(0L)))));
+
+        if (version >= 6) {
+            var request = 
TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(data, true, 
true).build(version);
+            assertEquals(data, request.data());
+        } else {
+            assertThrows(UnsupportedVersionException.class,
+                () -> TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(data, 
true, true).build(version));
         }
     }
+
+    @Test
+    public void 
testForTopicNamesCapsAtTransactionV1WhenTransactionV2IsDisabled() {
+        var builder = TxnOffsetCommitRequest.Builder.forTopicNames(
+            new TxnOffsetCommitRequestData(),
+            false
+        );
+        
assertEquals(TxnOffsetCommitRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2, 
builder.latestAllowedVersion());
+    }
+
+    @Test
+    public void testForTopicNamesCapsAtV5WhenTransactionV2IsEnabled() {
+        var builder = TxnOffsetCommitRequest.Builder.forTopicNames(
+            new TxnOffsetCommitRequestData(),
+            true
+        );
+        assertEquals((short) 5, builder.latestAllowedVersion());
+    }
+
+    @Test
+    public void 
testForTopicIdsOrNamesCapsAtTransactionV1WhenTransactionV2IsDisabled() {
+        var builder = TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(
+            new TxnOffsetCommitRequestData(),
+            false,
+            true
+        );
+        
assertEquals(TxnOffsetCommitRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2, 
builder.latestAllowedVersion());
+    }
+
+    @Test
+    public void 
testForTopicIdsOrNamesUsesLatestStableVersionWhenUnstableIsDisabled() {
+        var builder = TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(
+            new TxnOffsetCommitRequestData(),
+            true,
+            false
+        );
+        assertEquals(ApiKeys.TXN_OFFSET_COMMIT.latestVersion(false), 
builder.latestAllowedVersion());
+    }
+
+    @Test
+    public void 
testForTopicIdsOrNamesUsesLatestUnstableVersionWhenUnstableIsEnabled() {
+        var builder = TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(
+            new TxnOffsetCommitRequestData(),
+            true,
+            true
+        );
+        assertEquals(ApiKeys.TXN_OFFSET_COMMIT.latestVersion(true), 
builder.latestAllowedVersion());
+    }
 }

Reply via email to