This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f54cfff1dca MINOR: simplify producer TX abort error handling (#18486)
f54cfff1dca is described below

commit f54cfff1dcaaa43139417fddf38a7009cd26e710
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Fri Jan 10 17:54:40 2025 -0800

    MINOR: simplify producer TX abort error handling (#18486)
    
    Reviewers: Justine Olshan <jols...@confluent.io>, Jason Gustafson <ja...@responsive.dev>
---
 .../apache/kafka/clients/producer/internals/Sender.java   | 15 ++++-----------
 1 file changed, 4 insertions(+), 11 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index e98122bd9c8..9190281a660 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -462,17 +462,10 @@ public class Sender implements Runnable {
             return true;
         }
 
-        if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
-            if (accumulator.hasIncomplete()) {
-                // Attempt to get the last error that caused this abort.
-                RuntimeException exception = transactionManager.lastError();
-                // If there was no error, but we are still aborting,
-                // then this is most likely a case where there was no fatal error.
-                if (exception == null) {
-                    exception = new TransactionAbortedException();
-                }
-                accumulator.abortUndrainedBatches(exception);
-            }
+        if (transactionManager.hasAbortableError()) {
+            accumulator.abortUndrainedBatches(transactionManager.lastError());
+        } else if (transactionManager.isAborting()) {
+            accumulator.abortUndrainedBatches(new TransactionAbortedException());
         }
 
         TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequest(accumulator.hasIncomplete());

Reply via email to