This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new f54cfff1dca MINOR: simplify producer TX abort error handling (#18486)
f54cfff1dca is described below

commit f54cfff1dcaaa43139417fddf38a7009cd26e710
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Fri Jan 10 17:54:40 2025 -0800

    MINOR: simplify producer TX abort error handling (#18486)

    Reviewers: Justine Olshan <jols...@confluent.io>, Jason Gustafson <ja...@responsive.dev>
---
 .../apache/kafka/clients/producer/internals/Sender.java | 15 ++++-----------
 1 file changed, 4 insertions(+), 11 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index e98122bd9c8..9190281a660 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -462,17 +462,10 @@ public class Sender implements Runnable {
             return true;
         }
 
-        if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
-            if (accumulator.hasIncomplete()) {
-                // Attempt to get the last error that caused this abort.
-                RuntimeException exception = transactionManager.lastError();
-                // If there was no error, but we are still aborting,
-                // then this is most likely a case where there was no fatal error.
-                if (exception == null) {
-                    exception = new TransactionAbortedException();
-                }
-                accumulator.abortUndrainedBatches(exception);
-            }
+        if (transactionManager.hasAbortableError()) {
+            accumulator.abortUndrainedBatches(transactionManager.lastError());
+        } else if (transactionManager.isAborting()) {
+            accumulator.abortUndrainedBatches(new TransactionAbortedException());
         }
 
         TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequest(accumulator.hasIncomplete());