This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 89f3888c871 KAFKA-20444: [9/N] Preserve topic-level structure in TxnOffsetCommit response handling (KIP-1319) (#22265)
89f3888c871 is described below

commit 89f3888c8718da91967badef560cf34ebe8e09ed
Author: David Jacot <[email protected]>
AuthorDate: Wed May 27 15:03:58 2026 +0200

    KAFKA-20444: [9/N] Preserve topic-level structure in TxnOffsetCommit response handling (KIP-1319) (#22265)
    
    `TxnOffsetCommitHandler.handleResponse` in `TransactionManager` folded
    the response into a `Map<TopicPartition, Errors>` via
    `TxnOffsetCommitResponse.errors()` before processing it. This loses the
    response's topic-level structure, which v6+ (KIP-1319) needs in order to
    resolve topic IDs back to names.
    
    This patch switches the handler to iterate directly over
    `response.data().topics()` and `responseTopic.partitions()`, so the
    per-topic structure is preserved. The error-handling if/else ladder is
    unchanged; only the iteration shape is different. The now-unused
    `TxnOffsetCommitResponse.errors()` accessor is removed along with its
    two test usages.
    
    This is a pure refactor: at v0-5, the response topic always carries the
    topic name, so `new TopicPartition(responseTopic.name(),
    responsePartition.partitionIndex())` reproduces what `errors()` would
    have returned. The v6+ wiring that resolves topic IDs back to names will
    be added in a follow-up patch.
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../producer/internals/TransactionManager.java     | 102 +++++++++++----------
 .../common/requests/TxnOffsetCommitResponse.java   |  11 ---
 .../requests/TxnOffsetCommitRequestTest.java       |   6 --
 .../requests/TxnOffsetCommitResponseTest.java      |   1 -
 .../kafka/api/AuthorizerIntegrationTest.scala      |   5 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   5 +-
 6 files changed, 61 insertions(+), 69 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 3b0c183f3a3..2b10ceac6f3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -49,6 +49,7 @@ import 
org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.internal.RecordBatch;
@@ -1912,58 +1913,61 @@ public class TransactionManager {
         public void handleResponse(AbstractResponse response) {
             TxnOffsetCommitResponse txnOffsetCommitResponse = 
(TxnOffsetCommitResponse) response;
             boolean coordinatorReloaded = false;
-            Map<TopicPartition, Errors> errors = 
txnOffsetCommitResponse.errors();
 
-            log.debug("Received TxnOffsetCommit response for consumer group 
{}: {}", builder.data.groupId(),
-                    errors);
-
-            for (Map.Entry<TopicPartition, Errors> entry : errors.entrySet()) {
-                TopicPartition topicPartition = entry.getKey();
-                Errors error = entry.getValue();
-                if (error == Errors.NONE) {
-                    pendingTxnOffsetCommits.remove(topicPartition);
-                } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
-                        || error == Errors.NOT_COORDINATOR
-                        || error == Errors.REQUEST_TIMED_OUT) {
-                    if (!coordinatorReloaded) {
-                        coordinatorReloaded = true;
-                        
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP, 
builder.data.groupId());
+            log.debug("Received TxnOffsetCommit response for consumer group 
{}: {}",
+                builder.data.groupId(), 
txnOffsetCommitResponse.data().topics());
+
+            for (TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic 
responseTopic : txnOffsetCommitResponse.data().topics()) {
+                for 
(TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition responsePartition 
: responseTopic.partitions()) {
+                    TopicPartition topicPartition = new 
TopicPartition(responseTopic.name(), responsePartition.partitionIndex());
+                    Errors error = 
Errors.forCode(responsePartition.errorCode());
+                    if (error == Errors.NONE) {
+                        pendingTxnOffsetCommits.remove(topicPartition);
+                    } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+                            || error == Errors.NOT_COORDINATOR
+                            || error == Errors.REQUEST_TIMED_OUT) {
+                        if (!coordinatorReloaded) {
+                            coordinatorReloaded = true;
+                            
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP, 
builder.data.groupId());
+                        }
+                    } else if (error.exception() instanceof 
RetriableException) {
+                        // The topic is unknown, the coordinator is loading, 
or it is another retriable error;
+                        // retry with the current coordinator.
+                    } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+                        
abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId()));
+                        break;
+                    } else if (error == Errors.FENCED_INSTANCE_ID ||
+                            error == Errors.TRANSACTION_ABORTABLE) {
+                        abortableError(error.exception());
+                        break;
+                    } else if (error == Errors.UNKNOWN_MEMBER_ID
+                            || error == Errors.ILLEGAL_GENERATION
+                            || error == Errors.GROUP_ID_NOT_FOUND
+                            || error == Errors.STALE_MEMBER_EPOCH) {
+                        // GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH are 
returned by
+                        // TxnOffsetCommit v6+. Older versions map them to
+                        // ILLEGAL_GENERATION. All four indicate a consumer 
group
+                        // metadata mismatch and must abort the transaction.
+                        abortableError(new CommitFailedException("Transaction 
offset Commit failed " +
+                            "due to consumer group metadata mismatch: " + 
error.exception().getMessage()));
+                        break;
+                    } else if (error == Errors.INVALID_PRODUCER_EPOCH
+                            || error == Errors.PRODUCER_FENCED) {
+                        // We could still receive INVALID_PRODUCER_EPOCH from 
old versioned transaction coordinator,
+                        // just treat it the same as PRODUCER_FENCED.
+                        fatalError(Errors.PRODUCER_FENCED.exception());
+                        break;
+                    } else if (error == 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
+                            || error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) 
{
+                        fatalError(error.exception());
+                        break;
+                    } else {
+                        fatalError(new KafkaException("Unexpected error in 
TxnOffsetCommitResponse: " + error.message()));
+                        break;
                     }
-                } else if (error.exception() instanceof RetriableException) {
-                    // If the topic is unknown, the coordinator is loading, or 
is another retriable error, retry with the current coordinator
-                    continue;
-                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                    
abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId()));
-                    break;
-                } else if (error == Errors.FENCED_INSTANCE_ID ||
-                        error == Errors.TRANSACTION_ABORTABLE) {
-                    abortableError(error.exception());
-                    break;
-                } else if (error == Errors.UNKNOWN_MEMBER_ID
-                        || error == Errors.ILLEGAL_GENERATION
-                        || error == Errors.GROUP_ID_NOT_FOUND
-                        || error == Errors.STALE_MEMBER_EPOCH) {
-                    // GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH are returned 
by
-                    // TxnOffsetCommit v6+. Older versions map them to
-                    // ILLEGAL_GENERATION. All four indicate a consumer group
-                    // metadata mismatch and must abort the transaction.
-                    abortableError(new CommitFailedException("Transaction 
offset Commit failed " +
-                        "due to consumer group metadata mismatch: " + 
error.exception().getMessage()));
-                    break;
-                } else if (error == Errors.INVALID_PRODUCER_EPOCH
-                        || error == Errors.PRODUCER_FENCED) {
-                    // We could still receive INVALID_PRODUCER_EPOCH from old 
versioned transaction coordinator,
-                    // just treat it the same as PRODUCE_FENCED.
-                    fatalError(Errors.PRODUCER_FENCED.exception());
-                    break;
-                } else if (error == 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
-                        || error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
-                    fatalError(error.exception());
-                    break;
-                } else {
-                    fatalError(new KafkaException("Unexpected error in 
TxnOffsetCommitResponse: " + error.message()));
-                    break;
                 }
+                // Stop processing further topics once the transaction has 
reached a terminal state.
+                if (result.isCompleted()) break;
             }
 
             if (result.isCompleted()) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index 20de83f9750..2a4939f652a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -264,17 +264,6 @@ public class TxnOffsetCommitResponse extends 
AbstractResponse {
                         Errors.forCode(partition.errorCode()))));
     }
 
-    public Map<TopicPartition, Errors> errors() {
-        Map<TopicPartition, Errors> errorMap = new HashMap<>();
-        for (TxnOffsetCommitResponseTopic topic : data.topics()) {
-            for (TxnOffsetCommitResponsePartition partition : 
topic.partitions()) {
-                errorMap.put(new TopicPartition(topic.name(), 
partition.partitionIndex()),
-                             Errors.forCode(partition.errorCode()));
-            }
-        }
-        return errorMap;
-    }
-
     public static TxnOffsetCommitResponse parse(Readable readable, short 
version) {
         return new TxnOffsetCommitResponse(new 
TxnOffsetCommitResponseData(readable, version));
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
index f18cf5d84d1..649423c1779 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
@@ -90,11 +90,6 @@ public class TxnOffsetCommitRequestTest extends 
OffsetCommitRequestTest {
     @ParameterizedTest
     @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT, toVersion = 5)
     public void testConstructor(short version) {
-        var errorsMap = Map.of(
-            new TopicPartition(topicOne, partitionOne), Errors.NOT_COORDINATOR,
-            new TopicPartition(topicTwo, partitionTwo), Errors.NOT_COORDINATOR
-        );
-
         List<TxnOffsetCommitRequestTopic> expectedTopics = List.of(
             new TxnOffsetCommitRequestTopic()
                 .setName(topicOne)
@@ -124,7 +119,6 @@ public class TxnOffsetCommitRequestTest extends 
OffsetCommitRequestTest {
 
         var response = request.getErrorResponse(throttleTimeMs, 
Errors.NOT_COORDINATOR.exception());
 
-        assertEquals(errorsMap, response.errors());
         assertEquals(Map.of(Errors.NOT_COORDINATOR, 2), 
response.errorCounts());
         assertEquals(throttleTimeMs, response.throttleTimeMs());
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
index d10eeab0ff6..cc78733d952 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
@@ -43,7 +43,6 @@ public class TxnOffsetCommitResponseTest extends 
OffsetCommitResponseTest {
     public void testConstructorWithErrorResponse() {
         TxnOffsetCommitResponse response = new 
TxnOffsetCommitResponse(throttleTimeMs, errorsMap);
 
-        assertEquals(errorsMap, response.errors());
         assertEquals(expectedErrorCounts, response.errorCounts());
         assertEquals(throttleTimeMs, response.throttleTimeMs());
     }
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index d6c94d5baa4..f21e75c0eb8 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -156,7 +156,10 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => 
resp.errors.get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp)),
     ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) => 
Errors.forCode(resp.data.errorCode)),
     ApiKeys.END_TXN -> ((resp: EndTxnResponse) => resp.error),
-    ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => 
resp.errors.get(tp)),
+    ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => 
resp.data.topics.asScala
+      .find(_.name == tp.topic)
+      .flatMap(_.partitions.asScala.find(_.partitionIndex == 
tp.partition).map(p => Errors.forCode(p.errorCode)))
+      .orNull),
     ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) => 
Errors.forCode(resp.results.asScala.head.errorCode)),
     ApiKeys.DESCRIBE_ACLS -> ((resp: DescribeAclsResponse) => 
resp.error.error),
     ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => 
Errors.forCode(resp.filterResults.asScala.head.errorCode)),
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 02d4ee8300a..6ab30fce2fd 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1360,7 +1360,10 @@ class KafkaApisTest extends Logging {
         kafkaApis.handleTxnOffsetCommitRequest(request, 
RequestLocal.withThreadConfinedCaching)
 
         val response = verifyNoThrottling[TxnOffsetCommitResponse](request)
-        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response.errors().get(invalidTopicPartition))
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response.data.topics.asScala
+          .find(_.name == invalidTopicPartition.topic)
+          .flatMap(_.partitions.asScala.find(_.partitionIndex == 
invalidTopicPartition.partition).map(p => Errors.forCode(p.errorCode)))
+          .orNull)
       } finally {
         kafkaApis.close()
       }

Reply via email to