This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 33a4709490f Handle retriable errors in TxnOffsetCommits (#15266)
33a4709490f is described below

commit 33a4709490f6bf8c8844a26dd22f9df1764e9701
Author: Justine Olshan <jols...@confluent.io>
AuthorDate: Fri Jan 26 10:59:21 2024 -0800

    Handle retriable errors in TxnOffsetCommits (#15266)
    
    https://issues.apache.org/jira/browse/KAFKA-14417 updated all transactional 
APIs to handle retriable errors except for TxnOffsetCommit.
    
    NetworkExceptions have been causing flaky failures in 
TransactionsBounceTest -- this should hopefully fix that.
    
    Reviewers: Luke Chen <show...@gmail.com>, Jason Gustafson 
<ja...@confluent.io>
---
 .../kafka/clients/producer/internals/TransactionManager.java       | 5 ++---
 .../kafka/clients/producer/internals/TransactionManagerTest.java   | 7 ++++++-
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 1d343484388..14a519a7a0f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -1673,9 +1673,8 @@ public class TransactionManager {
                         coordinatorReloaded = true;
                         
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP, 
builder.data.groupId());
                     }
-                } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION
-                        || error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
-                    // If the topic is unknown or the coordinator is loading, 
retry with the current coordinator
+                } else if (error.exception() instanceof RetriableException) {
+                    // If the topic is unknown, the coordinator is loading, or 
is another retriable error, retry with the current coordinator
                     continue;
                 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                     
abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId()));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index b615d1c5356..0e9bd834c61 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -1730,7 +1730,7 @@ public class TransactionManagerTest {
             "COORDINATOR_LOAD_IN_PROGRESS",
             "CONCURRENT_TRANSACTIONS"
     })
-    public void testRetriableErrors2(Errors error) {
+    public void testRetriableErrors(Errors error) {
         // Ensure FindCoordinator retries.
         TransactionalRequestResult result = 
transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(error, false, 
CoordinatorType.TRANSACTION, transactionalId);
@@ -2233,6 +2233,11 @@ public class TransactionManagerTest {
         
testRetriableErrorInTxnOffsetCommit(Errors.COORDINATOR_LOAD_IN_PROGRESS);
     }
 
+    @Test
+    public void testHandlingOfNetworkExceptionOnTxnOffsetCommit() {
+        testRetriableErrorInTxnOffsetCommit(Errors.NETWORK_EXCEPTION);
+    }
+
     private void testRetriableErrorInTxnOffsetCommit(Errors error) {
         doInitTransactions();
 

Reply via email to