This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 33a4709490f Handle retriable errors in TxnOffsetCommits (#15266) 33a4709490f is described below commit 33a4709490f6bf8c8844a26dd22f9df1764e9701 Author: Justine Olshan <jols...@confluent.io> AuthorDate: Fri Jan 26 10:59:21 2024 -0800 Handle retriable errors in TxnOffsetCommits (#15266) https://issues.apache.org/jira/browse/KAFKA-14417 updated all transactional APIs to handle retriable errors except for TxnOffsetCommit. NetworkExceptions have been causing flaky failures in TransactionsBounceTest -- this should hopefully fix that. Reviewers: Luke Chen <show...@gmail.com>, Jason Gustafson <ja...@confluent.io> --- .../kafka/clients/producer/internals/TransactionManager.java | 5 ++--- .../kafka/clients/producer/internals/TransactionManagerTest.java | 7 ++++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 1d343484388..14a519a7a0f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -1673,9 +1673,8 @@ public class TransactionManager { coordinatorReloaded = true; lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP, builder.data.groupId()); } - } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION - || error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { - // If the topic is unknown or the coordinator is loading, retry with the current coordinator + } else if (error.exception() instanceof RetriableException) { + // If the topic is unknown, the coordinator is loading, or is another retriable error, retry with the current coordinator continue; } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { 
abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId())); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index b615d1c5356..0e9bd834c61 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -1730,7 +1730,7 @@ public class TransactionManagerTest { "COORDINATOR_LOAD_IN_PROGRESS", "CONCURRENT_TRANSACTIONS" }) - public void testRetriableErrors2(Errors error) { + public void testRetriableErrors(Errors error) { // Ensure FindCoordinator retries. TransactionalRequestResult result = transactionManager.initializeTransactions(); prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId); @@ -2233,6 +2233,11 @@ public class TransactionManagerTest { testRetriableErrorInTxnOffsetCommit(Errors.COORDINATOR_LOAD_IN_PROGRESS); } + @Test + public void testHandlingOfNetworkExceptionOnTxnOffsetCommit() { + testRetriableErrorInTxnOffsetCommit(Errors.NETWORK_EXCEPTION); + } + private void testRetriableErrorInTxnOffsetCommit(Errors error) { doInitTransactions();