This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0144bf1afe6 MINOR: Correct exception naming and preserve original 
headers in DLQ (#21370)
0144bf1afe6 is described below

commit 0144bf1afe65b215e6babe3a642e9baefb923f14
Author: Eric Chang <[email protected]>
AuthorDate: Fri Jan 30 21:59:03 2026 +0800

    MINOR: Correct exception naming and preserve original headers in DLQ 
(#21370)
    
    This patch fixes two mismatches between
    
    
[KIP-1034](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams#KIP1034:DeadletterqueueinKafkaStreams-ProposedChanges)
    and the actual implementation:
    
    - Incorrect value of `HEADER_ERRORS_EXCEPTION_NAME`: it should be the
    name of the thrown exception's class, not the exception's String representation.
    - Original headers from the record that caused the exception should be
    added to the DLQ record's headers.
    
    References:
    - https://github.com/apache/kafka/pull/17942#discussion_r2737868443
    - https://github.com/apache/kafka/pull/17942#discussion_r2738033268
    -
    
    
[KIP-1034](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams#KIP1034:DeadletterqueueinKafkaStreams-ProposedChanges)
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../streams/integration/DeadLetterQueueIntegrationTest.java  | 12 ++++++------
 .../streams/errors/internals/ExceptionHandlerUtils.java      |  9 ++++++++-
 .../kafka/streams/errors/ExceptionHandlerUtilsTest.java      |  6 +++++-
 3 files changed, 19 insertions(+), 8 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/DeadLetterQueueIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/DeadLetterQueueIntegrationTest.java
index 90811edf37f..3d7f91e4fc7 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/DeadLetterQueueIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/DeadLetterQueueIntegrationTest.java
@@ -140,7 +140,7 @@ public class DeadLetterQueueIntegrationTest {
             assertEquals("key", new String(dlqRecords.get(0).key()), "Output 
record should be sent to DLQ topic");
             assertEquals("KABOOM", new String(dlqRecords.get(0).value()), 
"Output record should be sent to DLQ topic");
 
-            assertEquals("java.lang.RuntimeException: KABOOM", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
+            assertEquals("java.lang.RuntimeException", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
             assertEquals("KABOOM", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
             assertTrue(new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process"));
             assertEquals(INPUT_TOPIC, new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
@@ -181,7 +181,7 @@ public class DeadLetterQueueIntegrationTest {
             assertEquals("key", new String(dlqRecords.get(0).key()), "Output 
record should be sent to DLQ topic");
             assertEquals("KABOOM", new String(dlqRecords.get(0).value()), 
"Output record should be sent to DLQ topic");
 
-            assertEquals("java.lang.RuntimeException: KABOOM", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
+            assertEquals("java.lang.RuntimeException", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
             assertEquals("KABOOM", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
             assertTrue(new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process"));
             assertEquals(INPUT_TOPIC, new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
@@ -220,7 +220,7 @@ public class DeadLetterQueueIntegrationTest {
             assertEquals("key", new String(dlqRecords.get(0).key()), "Output 
record should be sent to DLQ topic");
             assertEquals("KABOOM", new String(dlqRecords.get(0).value()), 
"Output record should be sent to DLQ topic");
 
-            assertEquals("java.lang.RuntimeException: KABOOM", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
+            assertEquals("java.lang.RuntimeException", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
             assertEquals("KABOOM", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
             assertTrue(new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.integration.DeadLetterQueueIntegrationTest$1.process"));
             assertEquals(INPUT_TOPIC, new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
@@ -261,7 +261,7 @@ public class DeadLetterQueueIntegrationTest {
             assertEquals("key", new String(dlqRecords.get(0).key()), "Output 
record should be sent to DLQ topic");
             assertEquals("KABOOM", new String(dlqRecords.get(0).value()), 
"Output record should be sent to DLQ topic");
 
-            assertEquals("java.lang.RuntimeException: KABOOM", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
+            assertEquals("java.lang.RuntimeException", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
             assertEquals("KABOOM", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
             assertTrue(new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.integration.DeadLetterQueueIntegrationTest$1.process"));
             assertEquals(INPUT_TOPIC, new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
@@ -303,7 +303,7 @@ public class DeadLetterQueueIntegrationTest {
             assertEquals("key", new String(dlqRecords.get(0).key()), "Output 
record should be sent to DLQ topic");
             assertEquals("value", new String(dlqRecords.get(0).value()), 
"Output record should be sent to DLQ topic");
 
-            
assertEquals("org.apache.kafka.common.errors.SerializationException: Size of 
data received by LongDeserializer is not 8", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
+            
assertEquals("org.apache.kafka.common.errors.SerializationException", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
             assertEquals("Size of data received by LongDeserializer is not 8", 
new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
             assertTrue(new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.common.errors.SerializationException:
 Size of data received by LongDeserializer is not 8"));
             assertEquals(INPUT_TOPIC, new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
@@ -344,7 +344,7 @@ public class DeadLetterQueueIntegrationTest {
             assertEquals("key", new String(dlqRecords.get(0).key()), "Output 
record should be sent to DLQ topic");
             assertEquals("value", new String(dlqRecords.get(0).value()), 
"Output record should be sent to DLQ topic");
 
-            
assertEquals("org.apache.kafka.common.errors.SerializationException: Size of 
data received by LongDeserializer is not 8", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
+            
assertEquals("org.apache.kafka.common.errors.SerializationException", new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
             assertEquals("Size of data received by LongDeserializer is not 8", 
new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
             assertTrue(new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.common.errors.SerializationException:
 Size of data received by LongDeserializer is not 8"));
             assertEquals(INPUT_TOPIC, new 
String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java
index d3fd221cea8..74458ac11a9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.errors.internals;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ErrorHandlerContext;
@@ -83,12 +84,18 @@ public class ExceptionHandlerUtils {
             throw new InvalidConfigurationException(String.format("%s cannot 
be null while building dead letter queue record", 
StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
         }
         final ProducerRecord<byte[], byte[]> producerRecord = new 
ProducerRecord<>(deadLetterQueueTopicName, null, context.timestamp(), key, 
value);
+        // Copy original headers from record that causes exception
+        if (context.headers() != null) {
+            for (Header header : context.headers()) {
+                producerRecord.headers().add(header);
+            }
+        }
         final StringWriter stackTraceStringWriter = new StringWriter();
         final PrintWriter stackTracePrintWriter = new 
PrintWriter(stackTraceStringWriter);
         e.printStackTrace(stackTracePrintWriter);
 
         try (final StringSerializer stringSerializer = new StringSerializer()) 
{
-            producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_NAME, 
stringSerializer.serialize(null, e.toString()));
+            producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_NAME, 
stringSerializer.serialize(null, e.getClass().getName()));
             producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME, 
stringSerializer.serialize(null, e.getMessage()));
             producerRecord.headers().add(HEADER_ERRORS_STACKTRACE_NAME, 
stringSerializer.serialize(null, stackTraceStringWriter.toString()));
             producerRecord.headers().add(HEADER_ERRORS_TOPIC_NAME, 
stringSerializer.serialize(null, context.topic()));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java
index 915f3a3f650..bf2bc547c54 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java
@@ -82,11 +82,15 @@ public class ExceptionHandlerUtilsTest {
         assertEquals(1, dlqRecord.timestamp());
         assertEquals(key, new String(dlqRecord.key()));
         assertEquals(value, new String(dlqRecord.value()));
-        assertEquals(exception.toString(), 
stringDeserializer.deserialize(null, 
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_NAME).value()));
+        assertEquals(exception.getClass().getName(), 
stringDeserializer.deserialize(null, 
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_NAME).value()));
         assertEquals(exception.getMessage(), 
stringDeserializer.deserialize(null, 
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
         assertEquals("source", stringDeserializer.deserialize(null, 
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_TOPIC_NAME).value()));
         assertEquals("3", stringDeserializer.deserialize(null, 
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_PARTITION_NAME).value()));
         assertEquals("2", stringDeserializer.deserialize(null, 
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_OFFSET_NAME).value()));
+        // Verify original source headers are preserved
+        assertEquals("hello world",
+            stringDeserializer.deserialize(null,
+                headers.lastHeader("sourceHeader").value()));
     }
 
     @Test

Reply via email to