This is an automated email from the ASF dual-hosted git repository. jolshan pushed a commit to branch kafka-14884-2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 4a0bbf8f80a9f45d1a2f7b646db9e0acc8f53597 Author: Justine <[email protected]> AuthorDate: Fri Jun 23 17:27:24 2023 -0700 Handle failed batch via txn manager if we haven't already --- .../java/org/apache/kafka/clients/producer/internals/Sender.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 1cf141c06e2..8c6c2adc30a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -783,13 +783,12 @@ public class Sender implements Runnable { Function<Integer, RuntimeException> recordExceptions, boolean adjustSequenceNumbers ) { - if (transactionManager != null) { - transactionManager.handleFailedBatch(batch, topLevelException, adjustSequenceNumbers); - } - this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); if (batch.completeExceptionally(topLevelException, recordExceptions)) { + if (transactionManager != null) { + transactionManager.handleFailedBatch(batch, topLevelException, adjustSequenceNumbers); + } maybeRemoveAndDeallocateBatch(batch); } }
