This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 723847904b3 KAFKA-20444: [2/N] Update OffsetMetadataManager to use new 
TxnOffsetCommit errors (KIP-1319) (#22214)
723847904b3 is described below

commit 723847904b3844e88435b42f315a5fa9aa2534d7
Author: David Jacot <[email protected]>
AuthorDate: Wed May 6 09:09:40 2026 +0200

    KAFKA-20444: [2/N] Update OffsetMetadataManager to use new TxnOffsetCommit 
errors (KIP-1319) (#22214)
    
    This patch updates `OffsetMetadataManager` to propagate
    `GROUP_ID_NOT_FOUND` and `STALE_MEMBER_EPOCH` directly starting from
    version 6 of the `TxnOffsetCommit` API, both at the group lookup and at
    the per-partition validator. The legacy mapping to `ILLEGAL_GENERATION`
    is preserved for versions 0 to 5.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../common/requests/TxnOffsetCommitRequest.java    |  18 ++
 .../coordinator/group/OffsetMetadataManager.java   |  15 ++
 .../group/OffsetMetadataManagerTest.java           | 205 ++++++++++++---------
 3 files changed, 151 insertions(+), 87 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 30c40c29dab..626116c70a1 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -40,6 +40,24 @@ import java.util.stream.Collectors;
 public class TxnOffsetCommitRequest extends AbstractRequest {
     public static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 4;
 
+    /**
+     * @return true if the given version returns {@code GROUP_ID_NOT_FOUND} 
directly when the
+     *         group is not found; false if the legacy mapping to {@code 
ILLEGAL_GENERATION}
+     *         is used (KIP-1319).
+     */
+    public static boolean supportsGroupIdNotFoundError(short version) {
+        return version >= 6;
+    }
+
+    /**
+     * @return true if the given version returns {@code STALE_MEMBER_EPOCH} 
directly when the
+     *         member epoch is stale; false if the legacy mapping to {@code 
ILLEGAL_GENERATION}
+     *         is used (KIP-1319).
+     */
+    public static boolean supportsStaleMemberEpochError(short version) {
+        return version >= 6;
+    }
+
     private final TxnOffsetCommitRequestData data;
 
     public static class Builder extends 
AbstractRequest.Builder<TxnOffsetCommitRequest> {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 73fbc151243..ede54a05fab 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.internal.RecordBatch;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.internals.LogContext;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
@@ -528,7 +529,11 @@ public class OffsetMetadataManager {
                 // facility. In this case, a so-called simple group is created 
and the request
                 // is accepted.
                 group = 
groupMetadataManager.getOrMaybeCreateClassicGroup(request.groupId(), true);
+            } else if 
(TxnOffsetCommitRequest.supportsGroupIdNotFoundError((short) 
context.requestVersion())) {
+                // From v6 onwards, GROUP_ID_NOT_FOUND is propagated directly 
(KIP-1319).
+                throw ex;
             } else {
+                // For older versions, preserve the legacy mapping to 
ILLEGAL_GENERATION.
                 throw Errors.ILLEGAL_GENERATION.exception();
             }
         }
@@ -542,6 +547,11 @@ public class OffsetMetadataManager {
                 context.requestVersion()
             );
         } catch (StaleMemberEpochException ex) {
+            if (TxnOffsetCommitRequest.supportsStaleMemberEpochError((short) 
context.requestVersion())) {
+                // From v6 onwards, STALE_MEMBER_EPOCH is propagated directly 
(KIP-1319).
+                throw ex;
+            }
+            // For older versions, preserve the legacy mapping to 
ILLEGAL_GENERATION.
             throw Errors.ILLEGAL_GENERATION.exception();
         }
     }
@@ -725,6 +735,11 @@ public class OffsetMetadataManager {
                             partition.partitionIndex()
                         );
                     } catch (StaleMemberEpochException ex) {
+                        if 
(TxnOffsetCommitRequest.supportsStaleMemberEpochError((short) 
context.requestVersion())) {
+                            // From v6 onwards, STALE_MEMBER_EPOCH is 
propagated directly (KIP-1319).
+                            throw ex;
+                        }
+                        // For older versions, preserve the legacy mapping to 
ILLEGAL_GENERATION.
                         throw Errors.ILLEGAL_GENERATION.exception();
                     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index adb17e4b17a..b0f3170e7e2 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -265,11 +265,18 @@ public class OffsetMetadataManagerTest {
 
         public CoordinatorResult<TxnOffsetCommitResponseData, 
CoordinatorRecord> commitTransactionalOffset(
             TxnOffsetCommitRequestData request
+        ) {
+            return commitTransactionalOffset(request, 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion());
+        }
+
+        public CoordinatorResult<TxnOffsetCommitResponseData, 
CoordinatorRecord> commitTransactionalOffset(
+            TxnOffsetCommitRequestData request,
+            short version
         ) {
             RequestContext context = new RequestContext(
                 new RequestHeader(
                     ApiKeys.TXN_OFFSET_COMMIT,
-                    ApiKeys.TXN_OFFSET_COMMIT.latestVersion(),
+                    version,
                     "client",
                     0
                 ),
@@ -1654,8 +1661,9 @@ public class OffsetMetadataManagerTest {
         verifyTransactionalOffsetCommit(context);
     }
 
-    @Test
-    public void testConsumerGroupTransactionalOffsetCommitResolvesTopicId() {
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+    public void 
testConsumerGroupTransactionalOffsetCommitResolvesTopicId(short version) {
         Uuid barTopicId = Uuid.randomUuid();
         String barTopicName = "bar";
 
@@ -1696,13 +1704,17 @@ public class OffsetMetadataManagerTest {
 
         // When client epoch (3) < assignment epoch (5), exception should be 
thrown.
         request.setGenerationIdOrMemberEpoch(3);
-        assertThrows(IllegalGenerationException.class, () -> 
context.commitTransactionalOffset(request));
+        Class<? extends Throwable> expected = version >= 6
+            ? StaleMemberEpochException.class
+            : IllegalGenerationException.class;
+        assertThrows(expected, () -> 
context.commitTransactionalOffset(request, version));
 
         // When client epoch (5) >= assignment epoch (5), commit should 
succeed.
         request.setGenerationIdOrMemberEpoch(5);
-        assertDoesNotThrow(() -> context.commitTransactionalOffset(request));
+        assertDoesNotThrow(() -> context.commitTransactionalOffset(request, 
version));
 
-        CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> 
result = context.commitTransactionalOffset(request);
+        CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> 
result =
+            context.commitTransactionalOffset(request, version);
         assertEquals(
             new TxnOffsetCommitResponseData()
                 .setTopics(List.of(
@@ -1788,27 +1800,31 @@ public class OffsetMetadataManagerTest {
         );
     }
 
-    @Test
-    public void testTransactionalOffsetCommitWithUnknownGroupId() {
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+    public void testTransactionalOffsetCommitWithUnknownGroupId(short version) 
{
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
-        assertThrows(IllegalGenerationException.class, () -> 
context.commitTransactionalOffset(
-            new TxnOffsetCommitRequestData()
-                .setGroupId("foo")
-                .setMemberId("member")
-                .setGenerationIdOrMemberEpoch(10)
-                .setTopics(List.of(
-                    new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
-                        .setName("bar")
-                        .setPartitions(List.of(
-                            new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
-                                .setPartitionIndex(0)
-                                .setCommittedOffset(100L)
-                                .setCommittedLeaderEpoch(10)
-                                .setCommittedMetadata("metadata")
-                        ))
-                ))
-        ));
+        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+            .setGroupId("foo")
+            .setMemberId("member")
+            .setGenerationIdOrMemberEpoch(10)
+            .setTopics(List.of(
+                new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                    .setName("bar")
+                    .setPartitions(List.of(
+                        new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(100L)
+                            .setCommittedLeaderEpoch(10)
+                            .setCommittedMetadata("metadata")
+                    ))
+            ));
+
+        Class<? extends Throwable> expected = version >= 6
+            ? GroupIdNotFoundException.class
+            : IllegalGenerationException.class;
+        assertThrows(expected, () -> 
context.commitTransactionalOffset(request, version));
     }
 
     @Test
@@ -1857,8 +1873,9 @@ public class OffsetMetadataManagerTest {
         ));
     }
 
-    @Test
-    public void 
testConsumerGroupTransactionalOffsetCommitWithStaleMemberEpoch() {
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+    public void 
testConsumerGroupTransactionalOffsetCommitWithStaleMemberEpoch(short version) {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         // Create an empty group.
@@ -1874,7 +1891,7 @@ public class OffsetMetadataManagerTest {
             .build()
         );
 
-        verifyTransactionalOffsetCommitWithStaleMemberEpoch(context);
+        verifyTransactionalOffsetCommitWithStaleMemberEpoch(context, version);
     }
 
     @Test
@@ -1914,8 +1931,9 @@ public class OffsetMetadataManagerTest {
         ));
     }
 
-    @Test
-    public void 
testStreamsGroupTransactionalOffsetCommitWithStaleMemberEpoch() {
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+    public void 
testStreamsGroupTransactionalOffsetCommitWithStaleMemberEpoch(short version) {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         // Create an empty group.
@@ -1930,27 +1948,33 @@ public class OffsetMetadataManagerTest {
             .build()
         );
 
-        verifyTransactionalOffsetCommitWithStaleMemberEpoch(context);
+        verifyTransactionalOffsetCommitWithStaleMemberEpoch(context, version);
     }
 
-    private static void 
verifyTransactionalOffsetCommitWithStaleMemberEpoch(OffsetMetadataManagerTestContext
 context) {
-        assertThrows(IllegalGenerationException.class, () -> 
context.commitTransactionalOffset(
-            new TxnOffsetCommitRequestData()
-                .setGroupId("foo")
-                .setMemberId("member")
-                .setGenerationIdOrMemberEpoch(100)
-                .setTopics(List.of(
-                    new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
-                        .setName("bar")
-                        .setPartitions(List.of(
-                            new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
-                                .setPartitionIndex(0)
-                                .setCommittedOffset(100L)
-                                .setCommittedLeaderEpoch(10)
-                                .setCommittedMetadata("metadata")
-                        ))
-                ))
-        ));
+    private static void verifyTransactionalOffsetCommitWithStaleMemberEpoch(
+        OffsetMetadataManagerTestContext context,
+        short version
+    ) {
+        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+            .setGroupId("foo")
+            .setMemberId("member")
+            .setGenerationIdOrMemberEpoch(100)
+            .setTopics(List.of(
+                new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                    .setName("bar")
+                    .setPartitions(List.of(
+                        new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(100L)
+                            .setCommittedLeaderEpoch(10)
+                            .setCommittedMetadata("metadata")
+                    ))
+            ));
+
+        Class<? extends Throwable> expected = version >= 6
+            ? StaleMemberEpochException.class
+            : IllegalGenerationException.class;
+        assertThrows(expected, () -> 
context.commitTransactionalOffset(request, version));
     }
 
     @Test
@@ -2023,27 +2047,31 @@ public class OffsetMetadataManagerTest {
         );
     }
 
-    @Test
-    public void testGenericGroupTransactionalOffsetCommitWithUnknownGroupId() {
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+    public void 
testGenericGroupTransactionalOffsetCommitWithUnknownGroupId(short version) {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
-        assertThrows(IllegalGenerationException.class, () -> 
context.commitTransactionalOffset(
-            new TxnOffsetCommitRequestData()
-                .setGroupId("foo")
-                .setMemberId("member")
-                .setGenerationIdOrMemberEpoch(10)
-                .setTopics(List.of(
-                    new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
-                        .setName("bar")
-                        .setPartitions(List.of(
-                            new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
-                                .setPartitionIndex(0)
-                                .setCommittedOffset(100L)
-                                .setCommittedLeaderEpoch(10)
-                                .setCommittedMetadata("metadata")
-                        ))
-                ))
-        ));
+        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+            .setGroupId("foo")
+            .setMemberId("member")
+            .setGenerationIdOrMemberEpoch(10)
+            .setTopics(List.of(
+                new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                    .setName("bar")
+                    .setPartitions(List.of(
+                        new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(100L)
+                            .setCommittedLeaderEpoch(10)
+                            .setCommittedMetadata("metadata")
+                    ))
+            ));
+
+        Class<? extends Throwable> expected = version >= 6
+            ? GroupIdNotFoundException.class
+            : IllegalGenerationException.class;
+        assertThrows(expected, () -> 
context.commitTransactionalOffset(request, version));
     }
 
     @Test
@@ -2075,8 +2103,9 @@ public class OffsetMetadataManagerTest {
         ));
     }
 
-    @Test
-    public void 
testGenericGroupTransactionalOffsetCommitWithIllegalGenerationId() {
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+    public void 
testGenericGroupTransactionalOffsetCommitWithIllegalGenerationId(short version) 
{
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         // Create a group.
@@ -2095,23 +2124,25 @@ public class OffsetMetadataManagerTest {
         assertEquals(1, group.generationId());
         group.transitionTo(ClassicGroupState.STABLE);
 
-        assertThrows(IllegalGenerationException.class, () -> 
context.commitTransactionalOffset(
-            new TxnOffsetCommitRequestData()
-                .setGroupId("foo")
-                .setMemberId("member")
-                .setGenerationIdOrMemberEpoch(100)
-                .setTopics(List.of(
-                    new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
-                        .setName("bar")
-                        .setPartitions(List.of(
-                            new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
-                                .setPartitionIndex(0)
-                                .setCommittedOffset(100L)
-                                .setCommittedLeaderEpoch(10)
-                                .setCommittedMetadata("metadata")
-                        ))
-                ))
-        ));
+        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+            .setGroupId("foo")
+            .setMemberId("member")
+            .setGenerationIdOrMemberEpoch(100)
+            .setTopics(List.of(
+                new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                    .setName("bar")
+                    .setPartitions(List.of(
+                        new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(100L)
+                            .setCommittedLeaderEpoch(10)
+                            .setCommittedMetadata("metadata")
+                    ))
+            ));
+
+        // Classic groups always throw IllegalGenerationException, regardless 
of API version.
+        assertThrows(IllegalGenerationException.class,
+            () -> context.commitTransactionalOffset(request, version));
     }
 
     @Test

Reply via email to