This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 6a9ba19a5f7 KAFKA-18476: KafkaStreams should swallow TransactionAbortedException (#18487)
6a9ba19a5f7 is described below

commit 6a9ba19a5f716e09cf6385e404fb304e807609b3
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Tue Jan 14 18:03:14 2025 -0800

    KAFKA-18476: KafkaStreams should swallow TransactionAbortedException (#18487)
    
    TransactionAbortedException is a follow-up error to a previous error,
    and that previous error was already handled when
    `producer.abortTransaction()` was called. Thus, a
    TransactionAbortedException can just be silently swallowed.
    
    Reviewers: Bill Bejeck <b...@confluent.io>
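
For illustration only (not part of this commit): a minimal sketch of the
swallow pattern the commit message describes, written as a standalone producer
send callback. The class name `SwallowAbortedCallback` and the rethrow of
other errors are assumptions for this sketch, not the patch's actual handling.

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.TransactionAbortedException;

    // Hypothetical sketch: a send callback that ignores
    // TransactionAbortedException, since the root-cause error was already
    // handled when `producer.abortTransaction()` was called.
    public class SwallowAbortedCallback implements Callback {
        @Override
        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
            if (exception instanceof TransactionAbortedException) {
                return; // follow-up error only; swallow silently
            }
            if (exception != null) {
                throw new KafkaException(exception); // anything else is a real error
            }
        }
    }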
---
 .../processor/internals/RecordCollectorImpl.java   |  6 +++++
 .../processor/internals/RecordCollectorTest.java   | 26 ++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 81db581d04f..edc67083f10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TransactionAbortedException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -441,6 +442,11 @@ public class RecordCollectorImpl implements RecordCollector {
             errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced, " +
                 "indicating the task may be migrated out";
             sendException.set(new TaskMigratedException(errorMessage, productionException));
+        } else if (productionException instanceof TransactionAbortedException) {
+            // swallow silently
+            //
+            // TransactionAbortedException is only thrown after `abortTransaction()` was called,
+            // so it's only a follow-up error, and Kafka Streams is already handling the original error
         } else {
             final ProductionExceptionHandlerResponse response;
             try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index e68d5a4396f..b01b87ed85f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.InvalidProducerEpochException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TransactionAbortedException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
@@ -1806,6 +1807,31 @@ public class RecordCollectorTest {
         }
     }
 
+    @Test
+    public void shouldSwallowTransactionAbortedExceptionAndNotCallProductionExceptionHandler() {
+        final MockProducer<byte[], byte[]> mockProducer = new MockProducer<>(
+            cluster,
+            false,
+            new org.apache.kafka.clients.producer.RoundRobinPartitioner(),
+            new ByteArraySerializer(),
+            new ByteArraySerializer()
+        );
+        streamsProducer = new StreamsProducer(
+            mockProducer,
+            EXACTLY_ONCE_V2,
+            Time.SYSTEM,
+            logContext
+        );
+
+        final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock());
+        collector.initialize();
+
+        collector.send(topic, "key", "val", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
+        mockProducer.errorNext(new TransactionAbortedException()); // error out the send() call
+
+        collector.flush(); // need to call flush() to check for internal exceptions
+    }
+
     @Test
     public void shouldNotSendIfSendOfOtherTaskFailedInCallback() {
         final TaskId taskId1 = new TaskId(0, 0);

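As a usage note, here is a self-contained sketch of the same
MockProducer.errorNext() mechanism the new test relies on, runnable outside
the Streams test harness. Apart from the MockProducer and
TransactionAbortedException APIs, everything here (class name, topic,
payloads, the lambda's error handling) is an assumption for illustration.

    import org.apache.kafka.clients.producer.MockProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.TransactionAbortedException;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class ErrorNextDemo {
        public static void main(final String[] args) {
            // autoComplete=false keeps sends pending until completed manually
            final MockProducer<byte[], byte[]> producer = new MockProducer<>(
                false, new ByteArraySerializer(), new ByteArraySerializer());

            producer.send(
                new ProducerRecord<>("topic", "key".getBytes(), "value".getBytes()),
                (metadata, exception) -> {
                    if (exception instanceof TransactionAbortedException) {
                        return; // follow-up error only; swallow silently
                    }
                    if (exception != null) {
                        throw new KafkaException(exception); // real errors still surface
                    }
                });

            // complete the pending send exceptionally; the callback swallows it
            producer.errorNext(new TransactionAbortedException());
            producer.close();
        }
    }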