This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new 6a9ba19a5f7 KAFKA-18476: KafkaStreams should swallow TransactionAbortedException (#18487) 6a9ba19a5f7 is described below commit 6a9ba19a5f716e09cf6385e404fb304e807609b3 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Tue Jan 14 18:03:14 2025 -0800 KAFKA-18476: KafkaStreams should swallow TransactionAbortedException (#18487) TransactionAbortedException is a follow up error to a previous error, and such a previous error would already be handled when `producer.abortTransaction()` is called. Thus, a TransactionAbortedException can just be silently swallowed. Reviewers: Bill Bejeck <b...@confluent.io> --- .../processor/internals/RecordCollectorImpl.java | 6 +++++ .../processor/internals/RecordCollectorTest.java | 26 ++++++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 81db581d04f..edc67083f10 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TransactionAbortedException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -441,6 +442,11 @@ public class RecordCollectorImpl implements RecordCollector { errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced, " + "indicating the task may be migrated out"; sendException.set(new TaskMigratedException(errorMessage, productionException)); + } else if (productionException instanceof TransactionAbortedException) { + // swallow silently + // + // TransactionAbortedException is only thrown after `abortTransaction()` was called, + // so it's only a followup error, and Kafka Streams is already handling the original error } else { final ProductionExceptionHandlerResponse response; try { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index e68d5a4396f..b01b87ed85f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.InvalidProducerEpochException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TransactionAbortedException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; @@ -1806,6 +1807,31 @@ public class RecordCollectorTest { } } + @Test + public void shouldSwallowTransactionAbortedExceptionAndNotCallProductionExceptionHandler() { + final MockProducer<byte[], byte[]> mockProducer = new MockProducer<>( + cluster, + false, + new org.apache.kafka.clients.producer.RoundRobinPartitioner(), + new ByteArraySerializer(), + new ByteArraySerializer() + ); + streamsProducer = new StreamsProducer( + mockProducer, + EXACTLY_ONCE_V2, + Time.SYSTEM, + logContext + ); + + final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock()); + collector.initialize(); + + collector.send(topic, "key", "val", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context); + mockProducer.errorNext(new TransactionAbortedException()); // error out the send() call + + collector.flush(); // need to call flush() to check for internal exceptions + } + @Test public void shouldNotSendIfSendOfOtherTaskFailedInCallback() { final TaskId taskId1 = new TaskId(0, 0);