This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7f5861817d2 KAFKA-20444: [6/N] Handle GROUP_ID_NOT_FOUND and 
STALE_MEMBER_EPOCH in TransactionManager (KIP-1319) (#22239)
7f5861817d2 is described below

commit 7f5861817d2bed897890e256677b8a9b473849cd
Author: David Jacot <[email protected]>
AuthorDate: Sun May 10 18:20:00 2026 +0200

    KAFKA-20444: [6/N] Handle GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH in 
TransactionManager (KIP-1319) (#22239)
    
    This patch updates `TxnOffsetCommitHandler` in `TransactionManager` to
    handle the two new error codes alongside the existing
    `ILLEGAL_GENERATION` and `UNKNOWN_MEMBER_ID` cases. All four are treated
    as a consumer group metadata mismatch and abort the transaction with a
    `CommitFailedException`, so behavior at v6+ is identical to v0-5.
    
    Reviewers: Sean Quah <[email protected]>
---
 .../producer/internals/TransactionManager.java     |  8 ++++++-
 .../producer/internals/TransactionManagerTest.java | 28 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 50e35d32096..3b0c183f3a3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -1940,7 +1940,13 @@ public class TransactionManager {
                     abortableError(error.exception());
                     break;
                 } else if (error == Errors.UNKNOWN_MEMBER_ID
-                        || error == Errors.ILLEGAL_GENERATION) {
+                        || error == Errors.ILLEGAL_GENERATION
+                        || error == Errors.GROUP_ID_NOT_FOUND
+                        || error == Errors.STALE_MEMBER_EPOCH) {
+                    // GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH are returned 
by
+                    // TxnOffsetCommit v6+. Older versions map them to
+                    // ILLEGAL_GENERATION. All four indicate a consumer group
+                    // metadata mismatch and must abort the transaction.
                     abortableError(new CommitFailedException("Transaction 
offset Commit failed " +
                         "due to consumer group metadata mismatch: " + 
error.exception().getMessage()));
                     break;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 798dfe54fb2..db76ee022b6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -1260,6 +1260,34 @@ public class TransactionManagerTest {
         assertAbortableError(CommitFailedException.class);
     }
 
+    @ParameterizedTest
+    @EnumSource(value = Errors.class, names = {"GROUP_ID_NOT_FOUND", 
"STALE_MEMBER_EPOCH"})
+    public void testGroupMetadataMismatchErrorInTxnOffsetCommit(Errors error) {
+        // GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH from TxnOffsetCommit (v6+)
+        // must abort the transaction with a CommitFailedException, matching 
the
+        // behavior for ILLEGAL_GENERATION returned by older broker versions.
+        final TopicPartition tp = new TopicPartition("foo", 0);
+
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        TransactionalRequestResult sendOffsetsResult = 
transactionManager.sendOffsetsToTransaction(
+            Map.of(tp, new OffsetAndMetadata(39L)), new 
ConsumerGroupMetadata(consumerGroupId));
+
+        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, 
producerId, epoch);
+        prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.GROUP, consumerGroupId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.GROUP) 
!= null);
+
+        prepareTxnOffsetCommitResponse(consumerGroupId, producerId, epoch, 
Map.of(tp, error));
+
+        runUntil(transactionManager::hasError);
+        assertInstanceOf(CommitFailedException.class, 
transactionManager.lastError());
+        assertTrue(sendOffsetsResult.isCompleted());
+        assertFalse(sendOffsetsResult.isSuccessful());
+        assertInstanceOf(CommitFailedException.class, 
sendOffsetsResult.error());
+        assertAbortableError(CommitFailedException.class);
+    }
+
     @Test
     public void testLookupCoordinatorOnDisconnectAfterSend() {
         // This is called from the initTransactions method in the producer as 
the first order of business.

Reply via email to