This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch kafka-14884-2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4a0bbf8f80a9f45d1a2f7b646db9e0acc8f53597
Author: Justine <[email protected]>
AuthorDate: Fri Jun 23 17:27:24 2023 -0700

    Handle the failed batch via the transaction manager only when the batch is actually completed exceptionally (i.e. it has not been handled already)
---
 .../java/org/apache/kafka/clients/producer/internals/Sender.java   | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 1cf141c06e2..8c6c2adc30a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -783,13 +783,12 @@ public class Sender implements Runnable {
         Function<Integer, RuntimeException> recordExceptions,
         boolean adjustSequenceNumbers
     ) {
-        if (transactionManager != null) {
-            transactionManager.handleFailedBatch(batch, topLevelException, adjustSequenceNumbers);
-        }
-
         this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
 
         if (batch.completeExceptionally(topLevelException, recordExceptions)) {
+            if (transactionManager != null) {
+                transactionManager.handleFailedBatch(batch, topLevelException, adjustSequenceNumbers);
+            }
             maybeRemoveAndDeallocateBatch(batch);
         }
     }

Reply via email to