This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0144bf1afe6 MINOR: Correct exception naming and preserve original
headers in DLQ (#21370)
0144bf1afe6 is described below
commit 0144bf1afe65b215e6babe3a642e9baefb923f14
Author: Eric Chang <[email protected]>
AuthorDate: Fri Jan 30 21:59:03 2026 +0800
MINOR: Correct exception naming and preserve original headers in DLQ
(#21370)
This patch fixes two mismatches between
[KIP-1034](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams#KIP1034:DeadletterqueueinKafkaStreams-ProposedChanges)
and the actual implementation:
- Incorrect value of `HEADER_ERRORS_EXCEPTION_NAME`: it should be the
class name of the thrown exception, not the exception's String representation.
- Original headers from the record that caused the exception should be added to
the DLQ record headers.
References:
- https://github.com/apache/kafka/pull/17942#discussion_r2737868443
- https://github.com/apache/kafka/pull/17942#discussion_r2738033268
-
[KIP-1034](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams#KIP1034:DeadletterqueueinKafkaStreams-ProposedChanges)
Reviewers: Lucas Brutschy <[email protected]>
---
.../streams/integration/DeadLetterQueueIntegrationTest.java | 12 ++++++------
.../streams/errors/internals/ExceptionHandlerUtils.java | 9 ++++++++-
.../kafka/streams/errors/ExceptionHandlerUtilsTest.java | 6 +++++-
3 files changed, 19 insertions(+), 8 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/DeadLetterQueueIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/DeadLetterQueueIntegrationTest.java
index 90811edf37f..3d7f91e4fc7 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/DeadLetterQueueIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/DeadLetterQueueIntegrationTest.java
@@ -140,7 +140,7 @@ public class DeadLetterQueueIntegrationTest {
assertEquals("key", new String(dlqRecords.get(0).key()), "Output
record should be sent to DLQ topic");
assertEquals("KABOOM", new String(dlqRecords.get(0).value()),
"Output record should be sent to DLQ topic");
- assertEquals("java.lang.RuntimeException: KABOOM", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
+ assertEquals("java.lang.RuntimeException", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("KABOOM", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
assertTrue(new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process"));
assertEquals(INPUT_TOPIC, new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
@@ -181,7 +181,7 @@ public class DeadLetterQueueIntegrationTest {
assertEquals("key", new String(dlqRecords.get(0).key()), "Output
record should be sent to DLQ topic");
assertEquals("KABOOM", new String(dlqRecords.get(0).value()),
"Output record should be sent to DLQ topic");
- assertEquals("java.lang.RuntimeException: KABOOM", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
+ assertEquals("java.lang.RuntimeException", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("KABOOM", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
assertTrue(new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process"));
assertEquals(INPUT_TOPIC, new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
@@ -220,7 +220,7 @@ public class DeadLetterQueueIntegrationTest {
assertEquals("key", new String(dlqRecords.get(0).key()), "Output
record should be sent to DLQ topic");
assertEquals("KABOOM", new String(dlqRecords.get(0).value()),
"Output record should be sent to DLQ topic");
- assertEquals("java.lang.RuntimeException: KABOOM", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
+ assertEquals("java.lang.RuntimeException", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("KABOOM", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
assertTrue(new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.integration.DeadLetterQueueIntegrationTest$1.process"));
assertEquals(INPUT_TOPIC, new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
@@ -261,7 +261,7 @@ public class DeadLetterQueueIntegrationTest {
assertEquals("key", new String(dlqRecords.get(0).key()), "Output
record should be sent to DLQ topic");
assertEquals("KABOOM", new String(dlqRecords.get(0).value()),
"Output record should be sent to DLQ topic");
- assertEquals("java.lang.RuntimeException: KABOOM", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
+ assertEquals("java.lang.RuntimeException", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("KABOOM", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
assertTrue(new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.integration.DeadLetterQueueIntegrationTest$1.process"));
assertEquals(INPUT_TOPIC, new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
@@ -303,7 +303,7 @@ public class DeadLetterQueueIntegrationTest {
assertEquals("key", new String(dlqRecords.get(0).key()), "Output
record should be sent to DLQ topic");
assertEquals("value", new String(dlqRecords.get(0).value()),
"Output record should be sent to DLQ topic");
-
assertEquals("org.apache.kafka.common.errors.SerializationException: Size of
data received by LongDeserializer is not 8", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
+
assertEquals("org.apache.kafka.common.errors.SerializationException", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("Size of data received by LongDeserializer is not 8",
new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
assertTrue(new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.common.errors.SerializationException:
Size of data received by LongDeserializer is not 8"));
assertEquals(INPUT_TOPIC, new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
@@ -344,7 +344,7 @@ public class DeadLetterQueueIntegrationTest {
assertEquals("key", new String(dlqRecords.get(0).key()), "Output
record should be sent to DLQ topic");
assertEquals("value", new String(dlqRecords.get(0).value()),
"Output record should be sent to DLQ topic");
-
assertEquals("org.apache.kafka.common.errors.SerializationException: Size of
data received by LongDeserializer is not 8", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
+
assertEquals("org.apache.kafka.common.errors.SerializationException", new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("Size of data received by LongDeserializer is not 8",
new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
assertTrue(new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.common.errors.SerializationException:
Size of data received by LongDeserializer is not 8"));
assertEquals(INPUT_TOPIC, new
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java
b/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java
index d3fd221cea8..74458ac11a9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.errors.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
@@ -83,12 +84,18 @@ public class ExceptionHandlerUtils {
throw new InvalidConfigurationException(String.format("%s cannot
be null while building dead letter queue record",
StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
final ProducerRecord<byte[], byte[]> producerRecord = new
ProducerRecord<>(deadLetterQueueTopicName, null, context.timestamp(), key,
value);
+ // Copy original headers from record that causes exception
+ if (context.headers() != null) {
+ for (Header header : context.headers()) {
+ producerRecord.headers().add(header);
+ }
+ }
final StringWriter stackTraceStringWriter = new StringWriter();
final PrintWriter stackTracePrintWriter = new
PrintWriter(stackTraceStringWriter);
e.printStackTrace(stackTracePrintWriter);
try (final StringSerializer stringSerializer = new StringSerializer())
{
- producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_NAME,
stringSerializer.serialize(null, e.toString()));
+ producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_NAME,
stringSerializer.serialize(null, e.getClass().getName()));
producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME,
stringSerializer.serialize(null, e.getMessage()));
producerRecord.headers().add(HEADER_ERRORS_STACKTRACE_NAME,
stringSerializer.serialize(null, stackTraceStringWriter.toString()));
producerRecord.headers().add(HEADER_ERRORS_TOPIC_NAME,
stringSerializer.serialize(null, context.topic()));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java
b/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java
index 915f3a3f650..bf2bc547c54 100644
---
a/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java
@@ -82,11 +82,15 @@ public class ExceptionHandlerUtilsTest {
assertEquals(1, dlqRecord.timestamp());
assertEquals(key, new String(dlqRecord.key()));
assertEquals(value, new String(dlqRecord.value()));
- assertEquals(exception.toString(),
stringDeserializer.deserialize(null,
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_NAME).value()));
+ assertEquals(exception.getClass().getName(),
stringDeserializer.deserialize(null,
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals(exception.getMessage(),
stringDeserializer.deserialize(null,
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
assertEquals("source", stringDeserializer.deserialize(null,
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_TOPIC_NAME).value()));
assertEquals("3", stringDeserializer.deserialize(null,
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_PARTITION_NAME).value()));
assertEquals("2", stringDeserializer.deserialize(null,
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_OFFSET_NAME).value()));
+ // Verify original source headers are preserved
+ assertEquals("hello world",
+ stringDeserializer.deserialize(null,
+ headers.lastHeader("sourceHeader").value()));
}
@Test