This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cdc2d957edb KAFKA-16505: Adding dead letter queue in Kafka Streams (#17942)
cdc2d957edb is described below

commit cdc2d957edb2551b4ace30acfc72f6853f00790b
Author: Gasparina Damien <d.gaspar...@gmail.com>
AuthorDate: Mon Jul 21 15:54:40 2025 +0200

    KAFKA-16505: Adding dead letter queue in Kafka Streams (#17942)
    
    Implements KIP-1034 to add support for a Dead Letter Queue in Kafka Streams.
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>, Bruno Cadonna <cado...@apache.org>
    Co-authored-by: Sebastien Viale <sebastien.vi...@michelin.com>
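    
    A minimal sketch of enabling the new configuration (the application id,
    bootstrap servers, topic name, and the pre-built topology below are
    placeholders, and the usual org.apache.kafka.streams imports are assumed):
    
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // New in this commit: the default exception handlers will also produce the
        // failed record, plus error metadata headers, to this topic.
        props.put(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "my-streams-app-dlq");
        final KafkaStreams streams = new KafkaStreams(topology, props);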
---
 .../ProcessingExceptionHandlerIntegrationTest.java |  10 +-
 .../SwallowUnknownTopicErrorIntegrationTest.java   |  10 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |  10 +
 .../errors/DefaultProductionExceptionHandler.java  |  36 +--
 .../errors/DeserializationExceptionHandler.java    | 156 +++++++++++++
 .../errors/LogAndContinueExceptionHandler.java     |  39 +---
 .../LogAndContinueProcessingExceptionHandler.java  |  14 +-
 .../streams/errors/LogAndFailExceptionHandler.java |  39 +---
 .../LogAndFailProcessingExceptionHandler.java      |  13 +-
 .../streams/errors/ProcessingExceptionHandler.java | 162 +++++++++++++-
 .../streams/errors/ProductionExceptionHandler.java | 217 +++++++++++++++++-
 .../errors/internals/ExceptionHandlerUtils.java    | 101 +++++++++
 .../streams/processor/internals/ProcessorNode.java |  23 +-
 .../processor/internals/RecordCollector.java       |   7 +
 .../processor/internals/RecordCollectorImpl.java   |  74 ++++--
 .../processor/internals/RecordDeserializer.java    |  27 ++-
 .../streams/processor/internals/StreamTask.java    |  22 +-
 .../apache/kafka/streams/StreamsConfigTest.java    |   5 +
 .../streams/errors/ExceptionHandlerUtilsTest.java  | 114 ++++++++++
 .../processor/internals/ProcessorNodeTest.java     | 132 ++++++++++-
 .../processor/internals/RecordCollectorTest.java   | 247 ++++++++++++++++++---
 .../internals/RecordDeserializerTest.java          | 168 +++++++++++++-
 .../processor/internals/StreamTaskTest.java        |   4 +-
 .../org/apache/kafka/test/MockRecordCollector.java |  17 ++
 24 files changed, 1477 insertions(+), 170 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
index 38711093ff8..4406f292205 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
@@ -357,7 +357,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
             final StreamsException e = assertThrows(StreamsException.class, () 
-> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
             assertEquals("Fatal user code error in processing error callback", 
e.getMessage());
             assertInstanceOf(NullPointerException.class, e.getCause());
-            assertEquals("Invalid ProductionExceptionHandler response.", 
e.getCause().getMessage());
+            assertEquals("Invalid ProcessingExceptionHandler response.", 
e.getCause().getMessage());
             assertFalse(isExecuted.get());
         }
     }
@@ -524,7 +524,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
 
     public static class ContinueProcessingExceptionHandlerMockTest implements 
ProcessingExceptionHandler {
         @Override
-        public ProcessingExceptionHandler.ProcessingHandlerResponse 
handle(final ErrorHandlerContext context, final Record<?, ?> record, final 
Exception exception) {
+        public Response handleError(final ErrorHandlerContext context, final 
Record<?, ?> record, final Exception exception) {
             if (((String) record.key()).contains("FATAL")) {
                 throw new RuntimeException("KABOOM!");
             }
@@ -532,7 +532,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
                 return null;
             }
             assertProcessingExceptionHandlerInputs(context, record, exception);
-            return 
ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE;
+            return Response.resume();
         }
 
         @Override
@@ -543,9 +543,9 @@ public class ProcessingExceptionHandlerIntegrationTest {
 
     public static class FailProcessingExceptionHandlerMockTest implements 
ProcessingExceptionHandler {
         @Override
-        public ProcessingExceptionHandler.ProcessingHandlerResponse 
handle(final ErrorHandlerContext context, final Record<?, ?> record, final 
Exception exception) {
+        public Response handleError(final ErrorHandlerContext context, final 
Record<?, ?> record, final Exception exception) {
             assertProcessingExceptionHandlerInputs(context, record, exception);
-            return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL;
+            return Response.fail();
         }
 
         @Override
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java
index a82e832e21c..0535dd2465e 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java
@@ -156,15 +156,15 @@ public class SwallowUnknownTopicErrorIntegrationTest {
         public void configure(final Map<String, ?> configs) { }
 
         @Override
-        public ProductionExceptionHandlerResponse handle(final 
ErrorHandlerContext context,
-                                                         final 
ProducerRecord<byte[], byte[]> record,
-                                                         final Exception 
exception) {
+        public Response handleError(final ErrorHandlerContext context,
+                                    final ProducerRecord<byte[], byte[]> 
record,
+                                    final Exception exception) {
             if (exception instanceof TimeoutException &&
                 exception.getCause() != null &&
                 exception.getCause() instanceof 
UnknownTopicOrPartitionException) {
-                return ProductionExceptionHandlerResponse.CONTINUE;
+                return Response.resume();
             }
-            return ProductionExceptionHandler.super.handle(context, record, 
exception);
+            return ProductionExceptionHandler.super.handleError(context, 
record, exception);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 5e1eaff1162..6364e7e9126 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -619,6 +619,11 @@ public class StreamsConfig extends AbstractConfig {
         "support \"classic\" or \"streams\". If \"streams\" is specified, then 
the streams rebalance protocol will be " +
         "used. Otherwise, the classic group protocol will be used.";
 
+    public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = 
"errors.dead.letter.queue.topic.name";
+
+    private static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "If not null, the default exception handler will build and send a Dead Letter Queue record to the topic with the provided name if an error occurs.\n" +
+            "If a custom deserialization, production, or processing exception handler is set, this parameter is ignored for that handler.";
+
     /** {@code log.summary.interval.ms} */
     public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = 
"log.summary.interval.ms";
     private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output 
interval in milliseconds for logging summary information.\n" +
@@ -991,6 +996,11 @@ public class StreamsConfig extends AbstractConfig {
                     LogAndFailExceptionHandler.class.getName(),
                     Importance.MEDIUM,
                     DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
+            .define(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
+                    Type.STRING,
+                    null,
+                    Importance.MEDIUM,
+                    ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC)
             .define(MAX_TASK_IDLE_MS_CONFIG,
                     Type.LONG,
                     0L,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
index 5994326770c..3e9eb2fba86 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
@@ -18,38 +18,42 @@ package org.apache.kafka.streams.errors;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.streams.StreamsConfig;
 
 import java.util.Map;
 
+import static 
org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
 /**
  * {@code ProductionExceptionHandler} that always instructs streams to fail 
when an exception
  * happens while attempting to produce result records.
  */
 public class DefaultProductionExceptionHandler implements 
ProductionExceptionHandler {
-    /**
-     * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, 
ProducerRecord, Exception)} instead.
-     */
-    @SuppressWarnings("deprecation")
-    @Deprecated
+
+    private String deadLetterQueueTopic = null;
+
     @Override
-    public ProductionExceptionHandlerResponse handle(final 
ProducerRecord<byte[], byte[]> record,
-                                                     final Exception 
exception) {
+    public Response handleError(final ErrorHandlerContext context,
+                                final ProducerRecord<byte[], byte[]> record,
+                                final Exception exception) {
         return exception instanceof RetriableException ?
-            ProductionExceptionHandlerResponse.RETRY :
-            ProductionExceptionHandlerResponse.FAIL;
+            Response.retry() :
+            
Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, 
context.sourceRawKey(), context.sourceRawValue(), context, exception));
     }
 
+    @SuppressWarnings("rawtypes")
     @Override
-    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext 
context,
-                                                     final 
ProducerRecord<byte[], byte[]> record,
-                                                     final Exception 
exception) {
-        return exception instanceof RetriableException ?
-            ProductionExceptionHandlerResponse.RETRY :
-            ProductionExceptionHandlerResponse.FAIL;
+    public Response handleSerializationError(final ErrorHandlerContext context,
+                                             final ProducerRecord record,
+                                             final Exception exception,
+                                             final 
SerializationExceptionOrigin origin) {
+        return 
Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, 
context.sourceRawKey(), context.sourceRawValue(), context, exception));
     }
 
+
     @Override
     public void configure(final Map<String, ?> configs) {
-        // ignore
+        if 
(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
+            deadLetterQueueTopic = 
String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
     }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
index 0b44e04d791..8c3667c20f4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
@@ -17,10 +17,14 @@
 package org.apache.kafka.streams.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
+import java.util.Collections;
+import java.util.List;
+
 /**
  * Interface that specifies how an exception from source node deserialization
  * (e.g., reading from Kafka) should be handled.
@@ -63,16 +67,35 @@ public interface DeserializationExceptionHandler extends 
Configurable {
      *     The actual exception.
      *
      * @return Whether to continue or stop processing.
+     *
+     * @deprecated Use {@link #handleError(ErrorHandlerContext, 
ConsumerRecord, Exception)} instead.
      */
+    @Deprecated
     default DeserializationHandlerResponse handle(final ErrorHandlerContext 
context,
                                                   final ConsumerRecord<byte[], 
byte[]> record,
                                                   final Exception exception) {
         return handle(((DefaultErrorHandlerContext) 
context).processorContext().orElse(null), record, exception);
     }
 
+    /**
+     * Inspects a record and the exception received during deserialization.
+     *
+     * @param context
+     *     Error handler context.
+     * @param record
+     *     Record that failed deserialization.
+     * @param exception
+     *     The actual exception.
+     *
+     * @return a {@link Response} object
+     */
+    default Response handleError(final ErrorHandlerContext context, final 
ConsumerRecord<byte[], byte[]> record, final Exception exception) {
+        return new Response(Result.from(handle(context, record, exception)), 
Collections.emptyList());
+    }
     /**
      * Enumeration that describes the response from the exception handler.
      */
+    @Deprecated
     enum DeserializationHandlerResponse {
         /** Continue processing. */
         CONTINUE(0, "CONTINUE"),
@@ -95,4 +118,137 @@ public interface DeserializationExceptionHandler extends 
Configurable {
         }
     }
 
+    /**
+     * Enumeration that describes the response from the exception handler.
+     */
+    enum Result {
+        /** Continue processing. */
+        RESUME(0, "RESUME"),
+        /** Fail processing. */
+        FAIL(1, "FAIL");
+
+        /**
+         * An English description for the used option. This is for debugging only and may change.
+         */
+        public final String name;
+
+        /**
+         * The permanent and immutable id for the used option. This can't 
change ever.
+         */
+        public final int id;
+
+        Result(final int id, final String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /**
+         * Converts the deprecated enum DeserializationHandlerResponse into 
the new Result enum.
+         *
+         * @param value the old DeserializationHandlerResponse enum value
+         * @return a {@link Result} enum value
+         * @throws IllegalArgumentException if the provided value does not map 
to a valid {@link Result}
+         */
+        private static DeserializationExceptionHandler.Result from(final 
DeserializationHandlerResponse value) {
+            switch (value) {
+                case FAIL:
+                    return Result.FAIL;
+                case CONTINUE:
+                    return Result.RESUME;
+                default:
+                    throw new IllegalArgumentException("No Result enum found 
for old value: " + value);
+            }
+        }
+    }
+
+    /**
+     * Represents the result of handling a deserialization exception.
+     * <p>
+     * The {@code Response} class encapsulates a {@link Result},
+     * indicating whether processing should continue or fail, along with an 
optional list of
+     * {@link ProducerRecord} instances to be sent to a dead letter queue.
+     * </p>
+     */
+    class Response {
+
+        private final Result result;
+
+        private final List<ProducerRecord<byte[], byte[]>> 
deadLetterQueueRecords;
+
+        /**
+         * Constructs a new {@code Response} object.
+         *
+         * @param result the result indicating whether processing should 
continue or fail;
+         *                                  must not be {@code null}.
+         * @param deadLetterQueueRecords the list of records to be sent to the 
dead letter queue; may be {@code null}.
+         */
+        private Response(final Result result,
+                         final List<ProducerRecord<byte[], byte[]>> 
deadLetterQueueRecords) {
+            this.result = result;
+            this.deadLetterQueueRecords = deadLetterQueueRecords;
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should fail.
+         *
+         * @param deadLetterQueueRecords the list of records to be sent to the 
dead letter queue; may be {@code null}.
+         * @return a {@code Response} with a {@link 
DeserializationExceptionHandler.Result#FAIL} status.
+         */
+        public static Response fail(final List<ProducerRecord<byte[], byte[]>> 
deadLetterQueueRecords) {
+            return new Response(Result.FAIL, deadLetterQueueRecords);
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should fail.
+         *
+         * @return a {@code Response} with a {@link 
DeserializationExceptionHandler.Result#FAIL} status.
+         */
+        public static Response fail() {
+            return fail(Collections.emptyList());
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should 
continue.
+         *
+         * @param deadLetterQueueRecords the list of records to be sent to the 
dead letter queue; may be {@code null}.
+         * @return a {@code Response} with a {@link 
DeserializationExceptionHandler.Result#RESUME} status.
+         */
+        public static Response resume(final List<ProducerRecord<byte[], 
byte[]>> deadLetterQueueRecords) {
+            return new Response(Result.RESUME, deadLetterQueueRecords);
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should 
continue.
+         *
+         * @return a {@code Response} with a {@link 
DeserializationExceptionHandler.Result#RESUME} status.
+         */
+        public static Response resume() {
+            return resume(Collections.emptyList());
+        }
+
+        /**
+         * Retrieves the deserialization handler result.
+         *
+         * @return the {@link Result} indicating whether processing should 
continue or fail.
+         */
+        public Result result() {
+            return result;
+        }
+
+        /**
+         * Retrieves an unmodifiable list of records to be sent to the dead 
letter queue.
+         * <p>
+         * If the list is {@code null}, an empty list is returned.
+         * </p>
+         *
+         * @return an unmodifiable list of {@link ProducerRecord} instances
+         *         for the dead letter queue, or an empty list if no records 
are available.
+         */
+        public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() {
+            if (deadLetterQueueRecords == null) {
+                return Collections.emptyList();
+            }
+            return Collections.unmodifiableList(deadLetterQueueRecords);
+        }
+    }
 }
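
A hedged sketch of a custom handler built against the new handleError/Response API above;
the class name, config key, and DLQ topic are illustrative only and not part of this commit:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.errors.ErrorHandlerContext;

    public class RouteToDlqDeserializationHandler implements DeserializationExceptionHandler {

        private String dlqTopic;

        @Override
        public void configure(final Map<String, ?> configs) {
            // Hypothetical custom key; this commit itself only defines errors.dead.letter.queue.topic.name.
            dlqTopic = String.valueOf(configs.get("my.custom.dlq.topic"));
        }

        @Override
        public Response handleError(final ErrorHandlerContext context,
                                    final ConsumerRecord<byte[], byte[]> record,
                                    final Exception exception) {
            // Skip the bad record, but ask Streams to publish it to the DLQ topic.
            final ProducerRecord<byte[], byte[]> dlqRecord =
                new ProducerRecord<>(dlqTopic, record.key(), record.value());
            return Response.resume(Collections.singletonList(dlqRecord));
        }
    }
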
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
index 6de997be986..63972b0840d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
@@ -17,47 +17,27 @@
 package org.apache.kafka.streams.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.StreamsConfig;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static 
org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
 /**
  * Deserialization handler that logs a deserialization exception and then
  * signals the processing pipeline to continue processing more records.
  */
 public class LogAndContinueExceptionHandler implements 
DeserializationExceptionHandler {
     private static final Logger log = 
LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);
+    private String deadLetterQueueTopic = null;
 
-    /**
-     * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, 
ConsumerRecord, Exception)} instead.
-     */
-    @SuppressWarnings("deprecation")
-    @Deprecated
     @Override
-    public DeserializationHandlerResponse handle(final ProcessorContext 
context,
-                                                 final ConsumerRecord<byte[], 
byte[]> record,
-                                                 final Exception exception) {
-
-        log.warn(
-            "Exception caught during Deserialization, taskId: {}, topic: {}, 
partition: {}, offset: {}",
-            context.taskId(),
-            record.topic(),
-            record.partition(),
-            record.offset(),
-            exception
-        );
-
-        return DeserializationHandlerResponse.CONTINUE;
-    }
-
-    @Override
-    public DeserializationHandlerResponse handle(final ErrorHandlerContext 
context,
-                                                 final ConsumerRecord<byte[], 
byte[]> record,
-                                                 final Exception exception) {
-
+    public Response handleError(final ErrorHandlerContext context,
+                                final ConsumerRecord<byte[], byte[]> record,
+                                final Exception exception) {
         log.warn(
             "Exception caught during Deserialization, taskId: {}, topic: {}, 
partition: {}, offset: {}",
             context.taskId(),
@@ -67,11 +47,12 @@ public class LogAndContinueExceptionHandler implements 
DeserializationExceptionH
             exception
         );
 
-        return DeserializationHandlerResponse.CONTINUE;
+        return 
Response.resume(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, 
context.sourceRawKey(), context.sourceRawValue(), context, exception));
     }
 
     @Override
     public void configure(final Map<String, ?> configs) {
-        // ignore
+        if 
(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
+            deadLetterQueueTopic = 
String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
index c832ab14200..17de09e5f0c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.errors;
 
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.api.Record;
 
 import org.slf4j.Logger;
@@ -23,15 +24,20 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static 
org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
 /**
  * Processing exception handler that logs a processing exception and then
  * signals the processing pipeline to continue processing more records.
  */
 public class LogAndContinueProcessingExceptionHandler implements 
ProcessingExceptionHandler {
     private static final Logger log = 
LoggerFactory.getLogger(LogAndContinueProcessingExceptionHandler.class);
+    private String deadLetterQueueTopic = null;
 
     @Override
-    public ProcessingHandlerResponse handle(final ErrorHandlerContext context, 
final Record<?, ?> record, final Exception exception) {
+    public Response handleError(final ErrorHandlerContext context,
+                                final Record<?, ?> record,
+                                final Exception exception) {
         log.warn(
             "Exception caught during message processing, processor node: {}, 
taskId: {}, source topic: {}, source partition: {}, source offset: {}",
             context.processorNodeId(),
@@ -41,12 +47,12 @@ public class LogAndContinueProcessingExceptionHandler 
implements ProcessingExcep
             context.offset(),
             exception
         );
-
-        return ProcessingHandlerResponse.CONTINUE;
+        return 
Response.resume(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, 
context.sourceRawKey(), context.sourceRawValue(), context, exception));
     }
 
     @Override
     public void configure(final Map<String, ?> configs) {
-        // ignore
+        if 
(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
+            deadLetterQueueTopic = 
String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
index 20e6b9414de..6fc129b4d78 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
@@ -17,47 +17,27 @@
 package org.apache.kafka.streams.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.StreamsConfig;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static 
org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
 /**
  * Deserialization handler that logs a deserialization exception and then
  * signals the processing pipeline to stop processing more records and fail.
  */
 public class LogAndFailExceptionHandler implements 
DeserializationExceptionHandler {
     private static final Logger log = 
LoggerFactory.getLogger(LogAndFailExceptionHandler.class);
+    private String deadLetterQueueTopic = null;
 
-    /**
-     * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, 
ConsumerRecord, Exception)} instead.
-     */
-    @SuppressWarnings("deprecation")
-    @Deprecated
     @Override
-    public DeserializationHandlerResponse handle(final ProcessorContext 
context,
-                                                 final ConsumerRecord<byte[], 
byte[]> record,
-                                                 final Exception exception) {
-
-        log.error(
-            "Exception caught during Deserialization, taskId: {}, topic: {}, 
partition: {}, offset: {}",
-            context.taskId(),
-            record.topic(),
-            record.partition(),
-            record.offset(),
-            exception
-        );
-
-        return DeserializationHandlerResponse.FAIL;
-    }
-
-    @Override
-    public DeserializationHandlerResponse handle(final ErrorHandlerContext 
context,
-                                                 final ConsumerRecord<byte[], 
byte[]> record,
-                                                 final Exception exception) {
-
+    public Response handleError(final ErrorHandlerContext context,
+                                final ConsumerRecord<byte[], byte[]> record,
+                                final Exception exception) {
         log.error(
             "Exception caught during Deserialization, taskId: {}, topic: {}, 
partition: {}, offset: {}",
             context.taskId(),
@@ -67,11 +47,12 @@ public class LogAndFailExceptionHandler implements 
DeserializationExceptionHandl
             exception
         );
 
-        return DeserializationHandlerResponse.FAIL;
+        return 
Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, 
context.sourceRawKey(), context.sourceRawValue(), context, exception));
     }
 
     @Override
     public void configure(final Map<String, ?> configs) {
-        // ignore
+        if 
(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
+            deadLetterQueueTopic = 
String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
     }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
index f592663a6c0..5372d9ad0b6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.errors;
 
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.api.Record;
 
 import org.slf4j.Logger;
@@ -23,15 +24,20 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static 
org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
 /**
  * Processing exception handler that logs a processing exception and then
  * signals the processing pipeline to stop processing more records and fail.
  */
 public class LogAndFailProcessingExceptionHandler implements 
ProcessingExceptionHandler {
     private static final Logger log = 
LoggerFactory.getLogger(LogAndFailProcessingExceptionHandler.class);
+    private String deadLetterQueueTopic = null;
 
     @Override
-    public ProcessingHandlerResponse handle(final ErrorHandlerContext context, 
final Record<?, ?> record, final Exception exception) {
+    public Response handleError(final ErrorHandlerContext context,
+                                final Record<?, ?> record,
+                                final Exception exception) {
         log.error(
             "Exception caught during message processing, processor node: {}, 
taskId: {}, source topic: {}, source partition: {}, source offset: {}",
             context.processorNodeId(),
@@ -42,11 +48,12 @@ public class LogAndFailProcessingExceptionHandler 
implements ProcessingException
             exception
         );
 
-        return ProcessingHandlerResponse.FAIL;
+        return 
Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, 
context.sourceRawKey(), context.sourceRawValue(), context, exception));
     }
 
     @Override
     public void configure(final Map<String, ?> configs) {
-        // ignore
+        if 
(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
+            deadLetterQueueTopic = 
String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
index 7dc1b90bc2e..f4c32764877 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
@@ -16,13 +16,18 @@
  */
 package org.apache.kafka.streams.errors;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.streams.processor.api.Record;
 
+import java.util.Collections;
+import java.util.List;
+
 /**
  * An interface that allows user code to inspect a record that has failed 
processing
  */
 public interface ProcessingExceptionHandler extends Configurable {
+
     /**
      * Inspect a record and the exception received
      *
@@ -34,9 +39,30 @@ public interface ProcessingExceptionHandler extends 
Configurable {
      *     The actual exception.
      *
      * @return Whether to continue or stop processing.
+     * @deprecated Use {@link #handleError(ErrorHandlerContext, Record, 
Exception)} instead.
+     */
+    @Deprecated
+    default ProcessingHandlerResponse handle(final ErrorHandlerContext 
context, final Record<?, ?> record, final Exception exception) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Inspects a record and the exception received during processing.
+     *
+     * @param context
+     *     Processing context metadata.
+     * @param record
+     *     Record where the exception occurred.
+     * @param exception
+     *     The actual exception.
+     *
+     * @return a {@link Response} object
      */
-    ProcessingHandlerResponse handle(final ErrorHandlerContext context, final 
Record<?, ?> record, final Exception exception);
+    default Response handleError(final ErrorHandlerContext context, final 
Record<?, ?> record, final Exception exception) {
+        return new 
Response(ProcessingExceptionHandler.Result.from(handle(context, record, 
exception)), Collections.emptyList());
+    }
 
+    @Deprecated
     enum ProcessingHandlerResponse {
         /** Continue processing. */
         CONTINUE(1, "CONTINUE"),
@@ -58,4 +84,138 @@ public interface ProcessingExceptionHandler extends 
Configurable {
             this.name = name;
         }
     }
+
+    /**
+     * Enumeration that describes the response from the exception handler.
+     */
+    enum Result {
+        /** Resume processing. */
+        RESUME(1, "RESUME"),
+        /** Fail processing. */
+        FAIL(2, "FAIL");
+
+        /**
+         * An English description for the used option. This is for debugging only and may change.
+         */
+        public final String name;
+
+        /**
+         * The permanent and immutable id for the used option. This can't 
change ever.
+         */
+        public final int id;
+
+        Result(final int id, final String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /**
+         * Converts the deprecated enum ProcessingHandlerResponse into the new 
Result enum.
+         *
+         * @param value the old ProcessingHandlerResponse enum value
+         * @return a {@link ProcessingExceptionHandler.Result} enum value
+         * @throws IllegalArgumentException if the provided value does not map 
to a valid {@link ProcessingExceptionHandler.Result}
+         */
+        private static ProcessingExceptionHandler.Result from(final 
ProcessingHandlerResponse value) {
+            switch (value) {
+                case FAIL:
+                    return Result.FAIL;
+                case CONTINUE:
+                    return Result.RESUME;
+                default:
+                    throw new IllegalArgumentException("No Result enum found 
for old value: " + value);
+            }
+        }
+    }
+
+    /**
+     * Represents the result of handling a processing exception.
+     * <p>
+     * The {@code Response} class encapsulates a {@link Result},
+     * indicating whether processing should continue or fail, along with an 
optional list of
+     * {@link org.apache.kafka.clients.producer.ProducerRecord} instances to 
be sent to a dead letter queue.
+     * </p>
+     */
+    class Response {
+
+        private final Result result;
+
+        private final List<ProducerRecord<byte[], byte[]>> 
deadLetterQueueRecords;
+
+        /**
+         * Constructs a new {@code Response} object.
+         *
+         * @param result the result indicating whether processing should 
continue or fail;
+         *                                  must not be {@code null}.
+         * @param deadLetterQueueRecords the list of records to be sent to the 
dead letter queue; may be {@code null}.
+         */
+        private Response(final Result result,
+                         final List<ProducerRecord<byte[], byte[]>> 
deadLetterQueueRecords) {
+            this.result = result;
+            this.deadLetterQueueRecords = deadLetterQueueRecords;
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should fail.
+         *
+         * @param deadLetterQueueRecords the list of records to be sent to the 
dead letter queue; may be {@code null}.
+         * @return a {@code Response} with a {@link 
ProcessingExceptionHandler.Result#FAIL} status.
+         */
+        public static Response fail(final List<ProducerRecord<byte[], byte[]>> 
deadLetterQueueRecords) {
+            return new Response(Result.FAIL, deadLetterQueueRecords);
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should fail.
+         *
+         * @return a {@code Response} with a {@link 
ProcessingExceptionHandler.Result#FAIL} status.
+         */
+        public static Response fail() {
+            return fail(Collections.emptyList());
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should 
continue.
+         *
+         * @param deadLetterQueueRecords the list of records to be sent to the 
dead letter queue; may be {@code null}.
+         * @return a {@code Response} with a {@link 
ProcessingExceptionHandler.Result#RESUME} status.
+         */
+        public static Response resume(final List<ProducerRecord<byte[], 
byte[]>> deadLetterQueueRecords) {
+            return new Response(Result.RESUME, deadLetterQueueRecords);
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should 
continue.
+         *
+         * @return a {@code Response} with a {@link 
ProcessingExceptionHandler.Result#RESUME} status.
+         */
+        public static Response resume() {
+            return resume(Collections.emptyList());
+        }
+
+        /**
+         * Retrieves the processing handler result.
+         *
+         * @return the {@link Result} indicating whether processing should 
continue or fail.
+         */
+        public Result result() {
+            return result;
+        }
+
+        /**
+         * Retrieves an unmodifiable list of records to be sent to the dead 
letter queue.
+         * <p>
+         * If the list is {@code null}, an empty list is returned.
+         * </p>
+         *
+         * @return an unmodifiable list of {@link ProducerRecord} instances
+         *         for the dead letter queue, or an empty list if no records 
are available.
+         */
+        public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() {
+            if (deadLetterQueueRecords == null) {
+                return Collections.emptyList();
+            }
+            return Collections.unmodifiableList(deadLetterQueueRecords);
+        }
+    }
 }
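
A similarly hedged sketch against the new ProcessingExceptionHandler.handleError API above;
the class name and the choice of IllegalStateException are illustrative assumptions:

    import java.util.Map;

    import org.apache.kafka.streams.errors.ErrorHandlerContext;
    import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
    import org.apache.kafka.streams.processor.api.Record;

    public class SwallowIllegalStateProcessingHandler implements ProcessingExceptionHandler {

        @Override
        public void configure(final Map<String, ?> configs) { }

        @Override
        public Response handleError(final ErrorHandlerContext context,
                                    final Record<?, ?> record,
                                    final Exception exception) {
            // Resume past records failing with IllegalStateException, fail on anything else.
            return exception instanceof IllegalStateException
                ? Response.resume()
                : Response.fail();
        }
    }
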
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
index ed6b38a5692..717866a9bed 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
@@ -19,6 +19,9 @@ package org.apache.kafka.streams.errors;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Configurable;
 
+import java.util.Collections;
+import java.util.List;
+
 /**
  * Interface that specifies how an exception when attempting to produce a 
result to
  * Kafka should be handled.
@@ -55,13 +58,34 @@ public interface ProductionExceptionHandler extends 
Configurable {
      *     The exception that occurred during production.
      *
      * @return Whether to continue or stop processing, or retry the failed 
operation.
+     * @deprecated Use {@link #handleError(ErrorHandlerContext, 
ProducerRecord, Exception)} instead.
      */
+    @Deprecated
     default ProductionExceptionHandlerResponse handle(final 
ErrorHandlerContext context,
                                                       final 
ProducerRecord<byte[], byte[]> record,
                                                       final Exception 
exception) {
         return handle(record, exception);
     }
 
+    /**
+     * Inspect a record that we attempted to produce, and the exception that resulted
+     * from attempting to produce it, and determine whether to continue or stop processing.
+     *
+     * @param context
+     *     The error handler context metadata.
+     * @param record
+     *     The record that failed to produce.
+     * @param exception
+     *     The exception that occurred during production.
+     *
+     * @return a {@link Response} object
+     */
+    default Response handleError(final ErrorHandlerContext context,
+                                 final ProducerRecord<byte[], byte[]> record,
+                                 final Exception exception) {
+        return new Response(Result.from(handle(context, record, exception)), 
Collections.emptyList());
+    }
+
     /**
      * Handles serialization exception and determine if the process should 
continue. The default implementation is to
      * fail the process.
@@ -79,7 +103,7 @@ public interface ProductionExceptionHandler extends 
Configurable {
     @Deprecated
     default ProductionExceptionHandlerResponse 
handleSerializationException(final ProducerRecord record,
                                                                             
final Exception exception) {
-        return ProductionExceptionHandlerResponse.FAIL;
+        return 
ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL;
     }
 
     /**
@@ -96,8 +120,11 @@ public interface ProductionExceptionHandler extends 
Configurable {
      *     The origin of the serialization exception.
      *
      * @return Whether to continue or stop processing, or retry the failed 
operation.
+     *
+     * @deprecated Use {@link #handleSerializationError(ErrorHandlerContext, 
ProducerRecord, Exception, SerializationExceptionOrigin)} instead.
      */
     @SuppressWarnings("rawtypes")
+    @Deprecated
     default ProductionExceptionHandlerResponse 
handleSerializationException(final ErrorHandlerContext context,
                                                                             
final ProducerRecord record,
                                                                             
final Exception exception,
@@ -105,6 +132,30 @@ public interface ProductionExceptionHandler extends 
Configurable {
         return handleSerializationException(record, exception);
     }
 
+    /**
+     * Handles a serialization exception and determines whether the process should continue.
+     * The default implementation is to fail the process.
+     *
+     * @param context
+     *     The error handler context metadata.
+     * @param record
+     *     The record that failed to serialize.
+     * @param exception
+     *     The exception that occurred during serialization.
+     * @param origin
+     *     The origin of the serialization exception.
+     *
+     * @return a {@link Response} object
+     */
+    @SuppressWarnings("rawtypes")
+    default Response handleSerializationError(final ErrorHandlerContext 
context,
+                                              final ProducerRecord record,
+                                              final Exception exception,
+                                              final 
SerializationExceptionOrigin origin) {
+        return new Response(Result.from(handleSerializationException(context, 
record, exception, origin)), Collections.emptyList());
+    }
+
+    @Deprecated
     enum ProductionExceptionHandlerResponse {
         /** Continue processing.
          *
@@ -147,10 +198,174 @@ public interface ProductionExceptionHandler extends 
Configurable {
         }
     }
 
+    /**
+     * Enumeration that describes the response from the exception handler.
+     */
+    enum Result {
+        /** Resume processing.
+         *
+         * <p> For this case, output records which could not be written 
successfully are lost.
+         * Use this option only if you can tolerate data loss.
+         */
+        RESUME(0, "RESUME"),
+        /** Fail processing.
+         *
+         * <p> Kafka Streams will raise an exception and the {@code 
StreamsThread} will fail.
+         * No offsets (for {@link 
org.apache.kafka.streams.StreamsConfig#AT_LEAST_ONCE at-least-once}) or 
transactions
+         * (for {@link org.apache.kafka.streams.StreamsConfig#EXACTLY_ONCE_V2 
exactly-once}) will be committed.
+         */
+        FAIL(1, "FAIL"),
+        /** Retry the failed operation.
+         *
+         * <p> Retrying might imply that a {@link TaskCorruptedException} 
exception is thrown, and that the retry
+         * is started from the last committed offset.
+         *
+         * <p> <b>NOTE:</b> {@code RETRY} is only a valid return value for
+         * {@link org.apache.kafka.common.errors.RetriableException retriable 
exceptions}.
+         * If {@code RETRY} is returned for a non-retriable exception it will 
be interpreted as {@link #FAIL}.
+         */
+        RETRY(2, "RETRY");
+
+        /**
+         * An English description for the used option. This is for debugging only and may change.
+         */
+        public final String name;
+
+        /**
+         * The permanent and immutable id for the used option. This can't 
change ever.
+         */
+        public final int id;
+
+        Result(final int id, final String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /**
+         * Converts the deprecated enum ProductionExceptionHandlerResponse 
into the new Result enum.
+         *
+         * @param value the old ProductionExceptionHandlerResponse enum value
+         * @return a {@link ProductionExceptionHandler.Result} enum value
+         * @throws IllegalArgumentException if the provided value does not map 
to a valid {@link ProductionExceptionHandler.Result}
+         */
+        private static ProductionExceptionHandler.Result from(final 
ProductionExceptionHandlerResponse value) {
+            switch (value) {
+                case FAIL:
+                    return Result.FAIL;
+                case CONTINUE:
+                    return Result.RESUME;
+                case RETRY:
+                    return Result.RETRY;
+                default:
+                    throw new IllegalArgumentException("No Result enum found 
for old value: " + value);
+            }
+        }
+    }
+
     enum SerializationExceptionOrigin {
         /** Serialization exception occurred during serialization of the key. 
*/
         KEY,
         /** Serialization exception occurred during serialization of the 
value. */
         VALUE
     }
+
+    /**
+     * Represents the result of handling a production exception.
+     * <p>
+     * The {@code Response} class encapsulates a {@link Result},
+     * indicating whether processing should continue or fail, along with an 
optional list of
+     * {@link ProducerRecord} instances to be sent to a dead letter queue.
+     * </p>
+     */
+    class Response {
+
+        private final Result result;
+
+        private final List<ProducerRecord<byte[], byte[]>> 
deadLetterQueueRecords;
+
+        /**
+         * Constructs a new {@code Response} object.
+         *
+         * @param result the result indicating whether processing should 
continue or fail;
+         *                                  must not be {@code null}.
+         * @param deadLetterQueueRecords the list of records to be sent to the 
dead letter queue; may be {@code null}.
+         */
+        private Response(final Result result,
+                         final List<ProducerRecord<byte[], byte[]>> 
deadLetterQueueRecords) {
+            this.result = result;
+            this.deadLetterQueueRecords = deadLetterQueueRecords;
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should fail.
+         *
+         * @param deadLetterQueueRecords the list of records to be sent to the 
dead letter queue; may be {@code null}.
+         * @return a {@code Response} with a {@link 
ProductionExceptionHandler.Result#FAIL} status.
+         */
+        public static Response fail(final List<ProducerRecord<byte[], byte[]>> 
deadLetterQueueRecords) {
+            return new Response(Result.FAIL, deadLetterQueueRecords);
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should fail.
+         *
+         * @return a {@code Response} with a {@link 
ProductionExceptionHandler.Result#FAIL} status.
+         */
+        public static Response fail() {
+            return fail(Collections.emptyList());
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should 
continue.
+         *
+         * @param deadLetterQueueRecords the list of records to be sent to the 
dead letter queue; may be {@code null}.
+         * @return a {@code Response} with a {@link 
ProductionExceptionHandler.Result#RESUME} status.
+         */
+        public static Response resume(final List<ProducerRecord<byte[], 
byte[]>> deadLetterQueueRecords) {
+            return new Response(Result.RESUME, deadLetterQueueRecords);
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should 
continue.
+         *
+         * @return a {@code Response} with a {@link 
ProductionExceptionHandler.Result#RESUME} status.
+         */
+        public static Response resume() {
+            return resume(Collections.emptyList());
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should retry.
+         *
+         * @return a {@code Response} with a {@link 
ProductionExceptionHandler.Result#RETRY} status.
+         */
+        public static Response retry() {
+            return new Response(Result.RETRY, Collections.emptyList());
+        }
+
+        /**
+         * Retrieves the production exception handler result.
+         *
+         * @return the {@link Result} indicating whether processing should 
continue, fail or retry.
+         */
+        public Result result() {
+            return result;
+        }
+
+        /**
+         * Retrieves an unmodifiable list of records to be sent to the dead 
letter queue.
+         * <p>
+         * If the list is {@code null}, an empty list is returned.
+         * </p>
+         *
+         * @return an unmodifiable list of {@link ProducerRecord} instances
+         *         for the dead letter queue, or an empty list if no records 
are available.
+         */
+        public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() {
+            if (deadLetterQueueRecords == null) {
+                return Collections.emptyList();
+            }
+            return Collections.unmodifiableList(deadLetterQueueRecords);
+        }
+    }
 }
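
A hedged sketch of a production handler using the new handleError API above; dropping records
that fail with RecordTooLargeException is an illustrative policy, not a recommendation from this commit:

    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.streams.errors.ErrorHandlerContext;
    import org.apache.kafka.streams.errors.ProductionExceptionHandler;

    public class DropTooLargeRecordsHandler implements ProductionExceptionHandler {

        @Override
        public void configure(final Map<String, ?> configs) { }

        @Override
        public Response handleError(final ErrorHandlerContext context,
                                    final ProducerRecord<byte[], byte[]> record,
                                    final Exception exception) {
            // Tolerate data loss for oversized records only; fail for everything else.
            return exception instanceof RecordTooLargeException
                ? Response.resume()
                : Response.fail();
        }
    }
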
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java
new file mode 100644
index 00000000000..d3fd221cea8
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.errors.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@code ExceptionHandlerUtils} contains utility methods that can be used by all exception handlers.
+ */
+public class ExceptionHandlerUtils {
+    public static final String HEADER_ERRORS_EXCEPTION_NAME = 
"__streams.errors.exception";
+    public static final String HEADER_ERRORS_STACKTRACE_NAME = 
"__streams.errors.stacktrace";
+    public static final String HEADER_ERRORS_EXCEPTION_MESSAGE_NAME = 
"__streams.errors.message";
+    public static final String HEADER_ERRORS_TOPIC_NAME = 
"__streams.errors.topic";
+    public static final String HEADER_ERRORS_PARTITION_NAME = 
"__streams.errors.partition";
+    public static final String HEADER_ERRORS_OFFSET_NAME = 
"__streams.errors.offset";
+
+
+    public static boolean shouldBuildDeadLetterQueueRecord(final String 
deadLetterQueueTopicName) {
+        return deadLetterQueueTopicName != null;
+    }
+
+    /**
+     * If required, return Dead Letter Queue records for the provided exception.
+     *
+     * @param deadLetterQueueTopicName Name of the Dead Letter Queue topic, or null if none is configured
+     * @param key Serialized key for the records
+     * @param value Serialized value for the records
+     * @param context ErrorHandlerContext of the exception
+     * @param exception Thrown exception
+     * @return A list of Dead Letter Queue records to produce
+     */
+    public static List<ProducerRecord<byte[], byte[]>> 
maybeBuildDeadLetterQueueRecords(final String deadLetterQueueTopicName,
+                                                                               
  final byte[] key,
+                                                                               
  final byte[] value,
+                                                                               
  final ErrorHandlerContext context,
+                                                                               
  final Exception exception) {
+        if (!shouldBuildDeadLetterQueueRecord(deadLetterQueueTopicName)) {
+            return Collections.emptyList();
+        }
+
+        return 
Collections.singletonList(buildDeadLetterQueueRecord(deadLetterQueueTopicName, 
key, value, context, exception));
+    }
+
+
+    /**
+     * Builds a dead letter queue record for the provided exception.
+     *
+     * @param deadLetterQueueTopicName Name of the dead letter queue topic; must not be null.
+     * @param key Serialized key for the record.
+     * @param value Serialized value for the record.
+     * @param context Error handler context of the exception.
+     * @param e Thrown exception.
+     * @return A dead letter queue record to produce.
+     */
+    public static ProducerRecord<byte[], byte[]> 
buildDeadLetterQueueRecord(final String deadLetterQueueTopicName,
+                                                                     final 
byte[] key,
+                                                                     final 
byte[] value,
+                                                                     final 
ErrorHandlerContext context,
+                                                                     final 
Exception e) {
+        if (deadLetterQueueTopicName == null) {
+            throw new InvalidConfigurationException(String.format("%s cannot 
be null while building dead letter queue record", 
StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
+        }
+        final ProducerRecord<byte[], byte[]> producerRecord = new 
ProducerRecord<>(deadLetterQueueTopicName, null, context.timestamp(), key, 
value);
+        final StringWriter stackTraceStringWriter = new StringWriter();
+        final PrintWriter stackTracePrintWriter = new 
PrintWriter(stackTraceStringWriter);
+        e.printStackTrace(stackTracePrintWriter);
+
+        try (final StringSerializer stringSerializer = new StringSerializer()) 
{
+            producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_NAME, 
stringSerializer.serialize(null, e.toString()));
+            producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME, 
stringSerializer.serialize(null, e.getMessage()));
+            producerRecord.headers().add(HEADER_ERRORS_STACKTRACE_NAME, 
stringSerializer.serialize(null, stackTraceStringWriter.toString()));
+            producerRecord.headers().add(HEADER_ERRORS_TOPIC_NAME, 
stringSerializer.serialize(null, context.topic()));
+            producerRecord.headers().add(HEADER_ERRORS_PARTITION_NAME, 
stringSerializer.serialize(null, String.valueOf(context.partition())));
+            producerRecord.headers().add(HEADER_ERRORS_OFFSET_NAME, 
stringSerializer.serialize(null, String.valueOf(context.offset())));
+        }
+
+        return producerRecord;
+    }
+}
\ No newline at end of file
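
As an illustration of the new handler API (this sketch is not part of the
diff; the class name and dead letter queue usage are placeholders), a custom
ProcessingExceptionHandler could reuse ExceptionHandlerUtils to route the
failed record to the configured dead letter queue and then resume:

    import java.util.Map;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.ErrorHandlerContext;
    import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
    import org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils;
    import org.apache.kafka.streams.processor.api.Record;

    public class DlqAndContinueProcessingExceptionHandler implements ProcessingExceptionHandler {
        private String deadLetterQueueTopic;

        @Override
        public void configure(final Map<String, ?> configs) {
            // Reuse the dead letter queue topic configured for the built-in handlers.
            deadLetterQueueTopic = (String) configs.get(
                StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG);
        }

        @Override
        public Response handleError(final ErrorHandlerContext context,
                                    final Record<?, ?> record,
                                    final Exception exception) {
            // Attach the failed source record to the response and keep processing;
            // ProcessorNode sends the returned records via the RecordCollector.
            return Response.resume(
                ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords(
                    deadLetterQueueTopic,
                    context.sourceRawKey(),
                    context.sourceRawValue(),
                    context,
                    exception));
        }
    }
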
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 1dddc55ca3c..bbf82ff9033 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.errors.ErrorHandlerContext;
 import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
@@ -220,11 +221,11 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
                 internalProcessorContext.recordContext().sourceRawValue()
             );
 
-            final ProcessingExceptionHandler.ProcessingHandlerResponse 
response;
+            final ProcessingExceptionHandler.Response response;
             try {
                 response = Objects.requireNonNull(
-                    processingExceptionHandler.handle(errorHandlerContext, 
record, processingException),
-                    "Invalid ProductionExceptionHandler response."
+                    
processingExceptionHandler.handleError(errorHandlerContext, record, 
processingException),
+                    "Invalid ProcessingExceptionHandler response."
                 );
             } catch (final Exception fatalUserException) {
                 // while Java distinguishes checked vs unchecked exceptions, 
other languages
@@ -242,7 +243,21 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
                 );
             }
 
-            if (response == 
ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
+            final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords 
= response.deadLetterQueueRecords();
+            if (!deadLetterQueueRecords.isEmpty()) {
+                final RecordCollector collector = ((RecordCollector.Supplier) 
internalProcessorContext).recordCollector();
+                for (final ProducerRecord<byte[], byte[]> 
deadLetterQueueRecord : deadLetterQueueRecords) {
+                    collector.send(
+                            deadLetterQueueRecord.key(),
+                            deadLetterQueueRecord.value(),
+                            name(),
+                            internalProcessorContext,
+                            deadLetterQueueRecord
+                    );
+                }
+            }
+
+            if (response.result() == ProcessingExceptionHandler.Result.FAIL) {
                 log.error("Processing exception handler is set to fail upon" +
                      " a processing error. If you would rather have the 
streaming pipeline" +
                      " continue after a processing error, please set the " +
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index a48a671d460..2c14200f413 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
@@ -48,6 +49,12 @@ public interface RecordCollector {
                      final InternalProcessorContext<Void, Void> context,
                      final StreamPartitioner<? super K, ? super V> 
partitioner);
 
+    <K, V> void send(K key,
+                     V value,
+                     String processorNodeId,
+                     InternalProcessorContext<?, ?> context,
+                     ProducerRecord<byte[], byte[]> serializedRecord);
+
     /**
      * Initialize the internal {@link Producer}; note this function should be 
made idempotent
      *
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 89cbf4d4c7d..c4178733ef3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -41,7 +41,6 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
-import 
org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
@@ -263,6 +262,15 @@ public class RecordCollectorImpl implements 
RecordCollector {
         // freeing raw records in the context to reduce memory pressure
         freeRawInputRecordFromContext(context);
 
+        send(key, value, processorNodeId, context, serializedRecord);
+    }
+
+    public <K, V> void send(final K key,
+                            final V value,
+                            final String processorNodeId,
+                            final InternalProcessorContext<?, ?> context,
+                            final ProducerRecord<byte[], byte[]> 
serializedRecord) {
+
         streamsProducer.send(serializedRecord, (metadata, exception) -> {
             try {
                 // if there's already an exception record, skip logging 
offsets or new exceptions
@@ -278,16 +286,16 @@ public class RecordCollectorImpl implements 
RecordCollector {
                         log.warn("Received offset={} in produce response for 
{}", metadata.offset(), tp);
                     }
 
-                    if (!topic.endsWith("-changelog")) {
+                    if (!serializedRecord.topic().endsWith("-changelog")) {
                         // we may not have created a sensor during 
initialization if the node uses dynamic topic routing,
                         // as all topics are not known up front, so create the 
sensor for this topic if absent
                         final Sensor topicProducedSensor = 
producedSensorByTopic.computeIfAbsent(
-                            topic,
+                            serializedRecord.topic(),
                             t -> TopicMetrics.producedSensor(
                                 Thread.currentThread().getName(),
                                 taskId.toString(),
                                 processorNodeId,
-                                topic,
+                                serializedRecord.topic(),
                                 context.metrics()
                             )
                         );
@@ -299,7 +307,7 @@ public class RecordCollectorImpl implements RecordCollector 
{
                     }
                 } else {
                     recordSendError(
-                        topic,
+                        serializedRecord.topic(),
                         exception,
                         serializedRecord,
                         context,
@@ -307,7 +315,7 @@ public class RecordCollectorImpl implements RecordCollector 
{
                     );
 
                     // KAFKA-7510 only put message key and value in TRACE 
level log so we don't leak data by default
-                    log.trace("Failed record: (key {} value {} timestamp {}) 
topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
+                    log.trace("Failed record: (key {} value {} timestamp {}) 
topic=[{}] partition=[{}]", key, value, serializedRecord.timestamp(), 
serializedRecord.topic(), serializedRecord.partition());
                 }
             } catch (final RuntimeException fatal) {
                 sendException.set(new StreamsException("Producer.send 
`Callback` failed", fatal));
@@ -329,16 +337,16 @@ public class RecordCollectorImpl implements 
RecordCollector {
                                         final Integer partition,
                                         final Long timestamp,
                                         final String processorNodeId,
-                                        final InternalProcessorContext<Void, 
Void> context,
+                                        final InternalProcessorContext<?, ?> 
context,
                                         final Exception 
serializationException) {
         log.debug(String.format("Error serializing record for topic %s", 
topic), serializationException);
 
         final ProducerRecord<K, V> record = new ProducerRecord<>(topic, 
partition, timestamp, key, value, headers);
 
-        final ProductionExceptionHandlerResponse response;
+        final ProductionExceptionHandler.Response response;
         try {
             response = Objects.requireNonNull(
-                productionExceptionHandler.handleSerializationException(
+                productionExceptionHandler.handleSerializationError(
                     errorHandlerContext(context, processorNodeId),
                     record,
                     serializationException,
@@ -365,7 +373,20 @@ public class RecordCollectorImpl implements 
RecordCollector {
             );
         }
 
-        if (maybeFailResponse(response) == 
ProductionExceptionHandlerResponse.FAIL) {
+        final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords = 
response.deadLetterQueueRecords();
+        if (!deadLetterQueueRecords.isEmpty()) {
+            for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord : 
deadLetterQueueRecords) {
+                this.send(
+                        deadLetterQueueRecord.key(),
+                        deadLetterQueueRecord.value(),
+                        processorNodeId,
+                        context,
+                        deadLetterQueueRecord
+                );
+            }
+        }
+
+        if (maybeFailResponse(response.result()) == 
ProductionExceptionHandler.Result.FAIL) {
             throw new StreamsException(
                 String.format(
                     "Unable to serialize record. ProducerRecord(topic=[%s], 
partition=[%d], timestamp=[%d]",
@@ -385,7 +406,7 @@ public class RecordCollectorImpl implements RecordCollector 
{
         droppedRecordsSensor.record();
     }
 
-    private DefaultErrorHandlerContext errorHandlerContext(final 
InternalProcessorContext<Void, Void> context,
+    private DefaultErrorHandlerContext errorHandlerContext(final 
InternalProcessorContext<?, ?> context,
                                                            final String 
processorNodeId) {
         final RecordContext recordContext = context != null ? 
context.recordContext() : null;
 
@@ -442,7 +463,7 @@ public class RecordCollectorImpl implements RecordCollector 
{
     private void recordSendError(final String topic,
                                  final Exception productionException,
                                  final ProducerRecord<byte[], byte[]> 
serializedRecord,
-                                 final InternalProcessorContext<Void, Void> 
context,
+                                 final InternalProcessorContext<?, ?> context,
                                  final String processorNodeId) {
         String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, 
taskId, productionException.toString());
 
@@ -462,10 +483,10 @@ public class RecordCollectorImpl implements 
RecordCollector {
             // TransactionAbortedException is only thrown after 
`abortTransaction()` was called,
             // so it's only a followup error, and Kafka Streams is already 
handling the original error
         } else {
-            final ProductionExceptionHandlerResponse response;
+            final ProductionExceptionHandler.Response response;
             try {
                 response = Objects.requireNonNull(
-                    productionExceptionHandler.handle(
+                    productionExceptionHandler.handleError(
                         errorHandlerContext(context, processorNodeId),
                         serializedRecord,
                         productionException
@@ -490,14 +511,27 @@ public class RecordCollectorImpl implements 
RecordCollector {
                 return;
             }
 
-            if (productionException instanceof RetriableException && response 
== ProductionExceptionHandlerResponse.RETRY) {
+            final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords 
= response.deadLetterQueueRecords();
+            if (!deadLetterQueueRecords.isEmpty()) {
+                for (final ProducerRecord<byte[], byte[]> 
deadLetterQueueRecord : deadLetterQueueRecords) {
+                    this.send(
+                            deadLetterQueueRecord.key(),
+                            deadLetterQueueRecord.value(),
+                            processorNodeId,
+                            context,
+                            deadLetterQueueRecord
+                    );
+                }
+            }
+
+            if (productionException instanceof RetriableException && 
response.result() == ProductionExceptionHandler.Result.RETRY) {
                 errorMessage += "\nThe broker is either slow or in bad state 
(like not having enough replicas) in responding the request, " +
                     "or the connection to broker was interrupted sending the 
request or receiving the response. " +
                     "\nConsider overwriting `max.block.ms` and /or " +
                     "`delivery.timeout.ms` to a larger value to wait longer 
for such scenarios and avoid timeout errors";
                 sendException.set(new 
TaskCorruptedException(Collections.singleton(taskId)));
             } else {
-                if (maybeFailResponse(response) == 
ProductionExceptionHandlerResponse.FAIL) {
+                if (maybeFailResponse(response.result()) == 
ProductionExceptionHandler.Result.FAIL) {
                     errorMessage += "\nException handler choose to FAIL the 
processing, no more records would be sent.";
                     sendException.set(new StreamsException(errorMessage, 
productionException));
                 } else {
@@ -510,12 +544,12 @@ public class RecordCollectorImpl implements 
RecordCollector {
         log.error(errorMessage, productionException);
     }
 
-    private ProductionExceptionHandlerResponse maybeFailResponse(final 
ProductionExceptionHandlerResponse response) {
-        if (response == ProductionExceptionHandlerResponse.RETRY) {
+    private ProductionExceptionHandler.Result maybeFailResponse(final 
ProductionExceptionHandler.Result result) {
+        if (result == ProductionExceptionHandler.Result.RETRY) {
             log.warn("ProductionExceptionHandler returned RETRY for a 
non-retriable exception. Will treat it as FAIL.");
-            return ProductionExceptionHandlerResponse.FAIL;
+            return ProductionExceptionHandler.Result.FAIL;
         } else {
-            return response;
+            return result;
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 153ca2e02f1..88e756d785a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -17,17 +17,18 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
-import 
org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 
 import org.slf4j.Logger;
 
+import java.util.List;
 import java.util.Objects;
 
 import static 
org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
@@ -50,7 +51,7 @@ public class RecordDeserializer {
 
     /**
      * @throws StreamsException if a deserialization error occurs and the 
deserialization callback returns
-     *                          {@link DeserializationHandlerResponse#FAIL 
FAIL}
+     *                          {@link 
DeserializationExceptionHandler.Result#FAIL FAIL}
      *                          or throws an exception itself
      */
     ConsumerRecord<Object, Object> deserialize(final ProcessorContext<?, ?> 
processorContext,
@@ -100,11 +101,11 @@ public class RecordDeserializer {
             rawRecord.value()
         );
 
-        final DeserializationHandlerResponse response;
+        final DeserializationExceptionHandler.Response response;
         try {
             response = Objects.requireNonNull(
-                deserializationExceptionHandler.handle(errorHandlerContext, 
rawRecord, deserializationException),
-                "Invalid DeserializationExceptionHandler response."
+                
deserializationExceptionHandler.handleError(errorHandlerContext, rawRecord, 
deserializationException),
+                "Invalid DeserializationExceptionHandler response."
             );
         } catch (final Exception fatalUserException) {
             // while Java distinguishes checked vs unchecked exceptions, other 
languages
@@ -118,7 +119,21 @@ public class RecordDeserializer {
             throw new StreamsException("Fatal user code error in 
deserialization error callback", fatalUserException);
         }
 
-        if (response == DeserializationHandlerResponse.FAIL) {
+        final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords = 
response.deadLetterQueueRecords();
+        if (!deadLetterQueueRecords.isEmpty()) {
+            final RecordCollector collector = ((RecordCollector.Supplier) 
processorContext).recordCollector();
+            for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord : 
deadLetterQueueRecords) {
+                collector.send(
+                        deadLetterQueueRecord.key(),
+                        deadLetterQueueRecord.value(),
+                        sourceNodeName,
+                        (InternalProcessorContext) processorContext,
+                        deadLetterQueueRecord
+                );
+            }
+        }
+
+        if (response.result() == DeserializationExceptionHandler.Result.FAIL) {
             throw new StreamsException("Deserialization exception handler is 
set to fail upon" +
                 " a deserialization error. If you would rather have the 
streaming pipeline" +
                 " continue after a deserialization error, please set the " +
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 82e9c8d7fb1..42b57e46aa4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -945,10 +946,10 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                 recordContext.sourceRawValue()
             );
 
-            final ProcessingExceptionHandler.ProcessingHandlerResponse 
response;
+            final ProcessingExceptionHandler.Response 
processingExceptionResponse;
             try {
-                response = Objects.requireNonNull(
-                    processingExceptionHandler.handle(errorHandlerContext, 
null, processingException),
+                processingExceptionResponse = Objects.requireNonNull(
+                    
processingExceptionHandler.handleError(errorHandlerContext, null, 
processingException),
                     "Invalid ProcessingExceptionHandler response."
                 );
             } catch (final Exception fatalUserException) {
@@ -963,7 +964,20 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                 throw new FailedProcessingException("Fatal user code error in 
processing error callback", node.name(), fatalUserException);
             }
 
-            if (response == 
ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
+            final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords 
= processingExceptionResponse.deadLetterQueueRecords();
+            if (!deadLetterQueueRecords.isEmpty()) {
+                final RecordCollector collector = ((RecordCollector.Supplier) 
processorContext).recordCollector();
+                for (final ProducerRecord<byte[], byte[]> 
deadLetterQueueRecord : deadLetterQueueRecords) {
+                    collector.send(
+                            deadLetterQueueRecord.key(),
+                            deadLetterQueueRecord.value(),
+                            node.name(),
+                            processorContext,
+                            deadLetterQueueRecord);
+                }
+            }
+
+            if (processingExceptionResponse.result() == 
ProcessingExceptionHandler.Result.FAIL) {
                 log.error("Processing exception handler is set to fail upon" +
                         " a processing error. If you would rather have the 
streaming pipeline" +
                         " continue after a processing error, please set the " +
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 6505cde08ed..bd8002782d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -1683,6 +1683,12 @@ public class StreamsConfigTest {
             "Please set group.protocol=classic or remove group.instance.id 
from the configuration."));
     }
 
+    @Test
+    public void shouldSetDefaultDeadLetterQueue() {
+        final StreamsConfig config = new StreamsConfig(props);
+        
assertNull(config.getString(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
+    }
+
     static class MisconfiguredSerde implements Serde<Object> {
         @Override
         public void configure(final Map<String, ?>  configs, final boolean 
isKey) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java
new file mode 100644
index 00000000000..915f3a3f650
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
+import org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@ExtendWith(MockitoExtension.class)
+public class ExceptionHandlerUtilsTest {
+    @Test
+    public void checkDeadLetterQueueRecords() {
+        final StringSerializer stringSerializer = new StringSerializer();
+        final StringDeserializer stringDeserializer = new StringDeserializer();
+        final MockRecordCollector collector = new MockRecordCollector();
+        final String key = "key";
+        final String value = "value";
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = new InternalMockProcessorContext<>(
+                new StateSerdes<>("sink", Serdes.ByteArray(), 
Serdes.ByteArray()),
+                collector
+        );
+        internalProcessorContext.setRecordContext(new ProcessorRecordContext(
+                1L,
+                2,
+                3,
+                "source",
+                new RecordHeaders(Collections.singletonList(
+                        new RecordHeader("sourceHeader", 
stringSerializer.serialize(null, "hello world")))),
+                key.getBytes(),
+                value.getBytes()
+        ));
+        final ErrorHandlerContext errorHandlerContext = 
getErrorHandlerContext(internalProcessorContext);
+
+        final NullPointerException exception = new 
NullPointerException("Oopsie!");
+        final Iterable<ProducerRecord<byte[], byte[]>> dlqRecords = 
ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords("dlq", 
errorHandlerContext.sourceRawKey(), errorHandlerContext.sourceRawValue(), 
errorHandlerContext, exception);
+        final Iterator<ProducerRecord<byte[], byte[]>> iterator = 
dlqRecords.iterator();
+
+        assertTrue(iterator.hasNext());
+        final ProducerRecord<byte[], byte[]> dlqRecord = iterator.next();
+        final Headers headers = dlqRecord.headers();
+        assertFalse(iterator.hasNext()); // There should be only one record
+
+        assertEquals("dlq", dlqRecord.topic());
+        assertEquals(errorHandlerContext.timestamp(), dlqRecord.timestamp());
+        assertEquals(1, dlqRecord.timestamp());
+        assertEquals(key, new String(dlqRecord.key()));
+        assertEquals(value, new String(dlqRecord.value()));
+        assertEquals(exception.toString(), 
stringDeserializer.deserialize(null, 
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_NAME).value()));
+        assertEquals(exception.getMessage(), 
stringDeserializer.deserialize(null, 
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
+        assertEquals("source", stringDeserializer.deserialize(null, 
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_TOPIC_NAME).value()));
+        assertEquals("3", stringDeserializer.deserialize(null, 
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_PARTITION_NAME).value()));
+        assertEquals("2", stringDeserializer.deserialize(null, 
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_OFFSET_NAME).value()));
+    }
+
+    @Test
+    public void doNotBuildDeadLetterQueueRecordsIfNotConfigured() {
+        final NullPointerException exception = new 
NullPointerException("Oopsie!");
+        final Iterable<ProducerRecord<byte[], byte[]>> dlqRecords = 
ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords(null, null, null, null, 
exception);
+        final Iterator<ProducerRecord<byte[], byte[]>> iterator = 
dlqRecords.iterator();
+
+        assertFalse(iterator.hasNext());
+    }
+
+    private static DefaultErrorHandlerContext getErrorHandlerContext(final 
InternalProcessorContext<Object, Object> internalProcessorContext) {
+        return new DefaultErrorHandlerContext(
+                null,
+                internalProcessorContext.topic(),
+                internalProcessorContext.partition(),
+                internalProcessorContext.offset(),
+                internalProcessorContext.headers(),
+                internalProcessorContext.currentNode().name(),
+                internalProcessorContext.taskId(),
+                internalProcessorContext.timestamp(),
+                internalProcessorContext.recordContext().sourceRawKey(),
+                internalProcessorContext.recordContext().sourceRawValue());
+    }
+}
\ No newline at end of file
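
A hypothetical consumer-side sketch reading the error metadata headers
verified above (broker address, group id, and topic name are placeholders;
the headers are only guaranteed on records built by ExceptionHandlerUtils):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class DlqHeaderReaderSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "dlq-inspector");           // placeholder
            try (KafkaConsumer<byte[], byte[]> consumer =
                     new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
                 StringDeserializer stringDeserializer = new StringDeserializer()) {
                consumer.subscribe(Collections.singletonList("my-dlq"));           // placeholder topic
                for (final ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
                    // __streams.errors.* headers carry the exception, source topic, partition, and offset.
                    System.out.printf("exception=%s source-topic=%s offset=%s%n",
                        stringDeserializer.deserialize(null,
                            record.headers().lastHeader("__streams.errors.exception").value()),
                        stringDeserializer.deserialize(null,
                            record.headers().lastHeader("__streams.errors.topic").value()),
                        stringDeserializer.deserialize(null,
                            record.headers().lastHeader("__streams.errors.offset").value()));
                }
            }
        }
    }
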
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index e9669ac39f4..6b7c06580c0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
@@ -29,6 +30,8 @@ import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.ErrorHandlerContext;
+import 
org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
@@ -39,7 +42,9 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
 import org.apache.kafka.test.StreamsTestUtils;
 
 import org.junit.jupiter.api.Test;
@@ -52,10 +57,13 @@ import org.mockito.quality.Strictness;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import static 
org.apache.kafka.streams.errors.ProcessingExceptionHandler.Response;
+import static 
org.apache.kafka.streams.errors.ProcessingExceptionHandler.Result;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -103,7 +111,7 @@ public class ProcessorNodeTest {
             new ProcessorNode<>(NAME, new 
IgnoredInternalExceptionsProcessor(), Collections.emptySet());
 
         final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
-        node.init(internalProcessorContext, new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL,
 internalProcessorContext, false));
+        node.init(internalProcessorContext, new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.Response.fail(), 
internalProcessorContext, false));
 
         final FailedProcessingException failedProcessingException = 
assertThrows(FailedProcessingException.class,
             () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
@@ -120,7 +128,7 @@ public class ProcessorNodeTest {
             new ProcessorNode<>(NAME, new 
IgnoredInternalExceptionsProcessor(), Collections.emptySet());
 
         final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
-        node.init(internalProcessorContext, new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE,
 internalProcessorContext, false));
+        node.init(internalProcessorContext, new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.Response.resume(), 
internalProcessorContext, false));
 
         assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, 
TIMESTAMP)));
     }
@@ -147,7 +155,7 @@ public class ProcessorNodeTest {
 
         assertEquals(ignoredExceptionCause, 
runtimeException.getCause().getClass());
         assertEquals(ignoredExceptionCauseMessage, 
runtimeException.getCause().getMessage());
-        verify(processingExceptionHandler, never()).handle(any(), any(), 
any());
+        verify(processingExceptionHandler, never()).handleError(any(), any(), 
any());
     }
 
     @Test
@@ -156,7 +164,7 @@ public class ProcessorNodeTest {
                 new ProcessorNode<>(NAME, new 
IgnoredInternalExceptionsProcessor(), Collections.emptySet());
 
         final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
-        node.init(internalProcessorContext, new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE,
 internalProcessorContext, true));
+        node.init(internalProcessorContext, new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.Response.resume(), 
internalProcessorContext, true));
 
         final FailedProcessingException failedProcessingException = 
assertThrows(FailedProcessingException.class,
             () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
@@ -166,6 +174,58 @@ public class ProcessorNodeTest {
         assertEquals(NAME, 
failedProcessingException.failedProcessorNodeName());
     }
 
+
+    @Test
+    public void 
shouldBuildDeadLetterQueueRecordsInDefaultProcessingExceptionHandler() {
+        final ProcessorNode<Object, Object, Object, Object> node = new 
ProcessorNode<>("processor",
+                (Processor<Object, Object, Object, Object>) record -> {
+                    throw new NullPointerException("Oopsie!");
+                }, Collections.emptySet());
+
+        final MockRecordCollector collector = new MockRecordCollector();
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext =
+                new InternalMockProcessorContext<>(
+                        new StateSerdes<>("sink", Serdes.ByteArray(), 
Serdes.ByteArray()),
+                        collector
+                );
+        final ProcessingExceptionHandler processingExceptionHandler = new 
LogAndFailProcessingExceptionHandler();
+        
processingExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
 "dlq"));
+        node.init(internalProcessorContext, processingExceptionHandler);
+
+        assertThrows(RuntimeException.class,
+                () -> node.process(new Record<>("hello", "world", 1L)));
+
+        assertEquals(1, collector.collected().size());
+        assertEquals("dlq", collector.collected().get(0).topic());
+        assertEquals("sourceKey", new String((byte[]) 
collector.collected().get(0).key()));
+        assertEquals("sourceValue", new String((byte[]) 
collector.collected().get(0).value()));
+    }
+
+    @Test
+    public void 
shouldBuildDeadLetterQueueRecordsInLogAndContinueProcessingExceptionHandler() {
+        final ProcessorNode<Object, Object, Object, Object> node = new 
ProcessorNode<>("processor",
+                (Processor<Object, Object, Object, Object>) record -> {
+                    throw new NullPointerException("Oopsie!");
+                }, Collections.emptySet());
+
+        final MockRecordCollector collector = new MockRecordCollector();
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext =
+                new InternalMockProcessorContext<>(
+                        new StateSerdes<>("sink", Serdes.ByteArray(), 
Serdes.ByteArray()),
+                        collector
+                );
+        final ProcessingExceptionHandler processingExceptionHandler = new 
LogAndContinueProcessingExceptionHandler();
+        
processingExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
 "dlq"));
+        node.init(internalProcessorContext, processingExceptionHandler);
+
+        node.process(new Record<>("hello", "world", 0L));
+
+        assertEquals(1, collector.collected().size());
+        assertEquals("dlq", collector.collected().get(0).topic());
+        assertEquals("sourceKey", new String((byte[]) 
collector.collected().get(0).key()));
+        assertEquals("sourceValue", new String((byte[]) 
collector.collected().get(0).value()));
+    }
+
     private static class ExceptionalProcessor implements Processor<Object, 
Object, Object, Object> {
         @Override
         public void init(final ProcessorContext<Object, Object> context) {
@@ -318,6 +378,64 @@ public class ProcessorNodeTest {
         assertTrue(se.getMessage().contains("pname"));
     }
 
+    @Test
+    void shouldFailWithDeadLetterQueueRecords() {
+        final ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+        final List<ProducerRecord<byte[], byte[]>> records = 
Collections.singletonList(record);
+
+        final Response response = Response.fail(records);
+
+        assertEquals(Result.FAIL, response.result());
+        assertEquals(1, response.deadLetterQueueRecords().size());
+        assertEquals(record, response.deadLetterQueueRecords().get(0));
+    }
+
+    @Test
+    void shouldFailWithoutDeadLetterQueueRecords() {
+        final Response response = Response.fail();
+
+        assertEquals(Result.FAIL, response.result());
+        assertTrue(response.deadLetterQueueRecords().isEmpty());
+    }
+
+    @Test
+    void shouldResumeWithDeadLetterQueueRecords() {
+        final ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+        final List<ProducerRecord<byte[], byte[]>> records = 
Collections.singletonList(record);
+
+        final Response response = Response.resume(records);
+
+        assertEquals(Result.RESUME, response.result());
+        assertEquals(1, response.deadLetterQueueRecords().size());
+        assertEquals(record, response.deadLetterQueueRecords().get(0));
+    }
+
+    @Test
+    void shouldResumeWithoutDeadLetterQueueRecords() {
+        final Response response = Response.resume();
+
+        assertEquals(Result.RESUME, response.result());
+        assertTrue(response.deadLetterQueueRecords().isEmpty());
+    }
+
+
+    @Test
+    void shouldNotBeModifiable() {
+        final ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+        final List<ProducerRecord<byte[], byte[]>> records = 
Collections.singletonList(record);
+
+        final Response response = Response.fail(records);
+
+        assertThrows(UnsupportedOperationException.class, () -> 
response.deadLetterQueueRecords().add(record));
+    }
+
+    @Test
+    void shouldReturnEmptyList() {
+        final Response response = Response.fail();
+
+        assertTrue(response.deadLetterQueueRecords().isEmpty());
+    }
+
     @SuppressWarnings("unchecked")
     private InternalProcessorContext<Object, Object> 
mockInternalProcessorContext() {
         final InternalProcessorContext<Object, Object> 
internalProcessorContext = mock(InternalProcessorContext.class, 
withSettings().strictness(Strictness.LENIENT));
@@ -342,12 +460,12 @@ public class ProcessorNodeTest {
     }
 
     public static class ProcessingExceptionHandlerMock implements 
ProcessingExceptionHandler {
-        private final ProcessingExceptionHandler.ProcessingHandlerResponse 
response;
+        private final Response response;
         private final InternalProcessorContext<Object, Object> 
internalProcessorContext;
 
         private final boolean shouldThrowException;
 
-        public ProcessingExceptionHandlerMock(final 
ProcessingExceptionHandler.ProcessingHandlerResponse response,
+        public ProcessingExceptionHandlerMock(final Response response,
                                               final 
InternalProcessorContext<Object, Object> internalProcessorContext,
                                               final boolean 
shouldThrowException) {
             this.response = response;
@@ -356,7 +474,7 @@ public class ProcessorNodeTest {
         }
 
         @Override
-        public ProcessingExceptionHandler.ProcessingHandlerResponse 
handle(final ErrorHandlerContext context, final Record<?, ?> record, final 
Exception exception) {
+        public Response handleError(final ErrorHandlerContext context, final 
Record<?, ?> record, final Exception exception) {
             assertEquals(internalProcessorContext.topic(), context.topic());
             assertEquals(internalProcessorContext.partition(), 
context.partition());
             assertEquals(internalProcessorContext.offset(), context.offset());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index ebc92c65184..64564278a18 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -47,10 +47,10 @@ import 
org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.ErrorHandlerContext;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
-import 
org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
 import 
org.apache.kafka.streams.errors.ProductionExceptionHandler.SerializationExceptionOrigin;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
@@ -89,6 +89,8 @@ import static java.util.Collections.emptySet;
 import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.streams.errors.ProductionExceptionHandler.Response;
+import static 
org.apache.kafka.streams.errors.ProductionExceptionHandler.Result;
 import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
 import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
 import static 
org.apache.kafka.streams.processor.internals.ClientUtils.producerRecordSizeInBytes;
@@ -1201,7 +1203,7 @@ public class RecordCollectorTest {
             logContext,
             taskId,
             getExceptionalStreamsProducerOnSend(exception),
-            new 
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)),
+            new 
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume())),
             streamsMetrics,
             topology
         );
@@ -1228,7 +1230,7 @@ public class RecordCollectorTest {
             logContext,
             taskId,
             getExceptionalStreamsProducerOnSend(exception),
-            new 
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)),
+            new 
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume())),
             streamsMetrics,
             topology
         );
@@ -1252,7 +1254,7 @@ public class RecordCollectorTest {
             logContext,
             taskId,
             getExceptionalStreamsProducerOnSend(exception),
-            new 
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)),
+            new 
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume())),
             streamsMetrics,
             topology
         );
@@ -1276,7 +1278,7 @@ public class RecordCollectorTest {
             taskId,
             getExceptionalStreamsProducerOnSend(new 
RuntimeException("KABOOM!")),
             new ProductionExceptionHandlerMock(
-                Optional.of(ProductionExceptionHandlerResponse.CONTINUE),
+                Optional.of(ProductionExceptionHandler.Response.resume()),
                 context,
                 sinkNodeName,
                 taskId
@@ -1347,7 +1349,7 @@ public class RecordCollectorTest {
             taskId,
             getExceptionalStreamsProducerOnSend(exception),
             new ProductionExceptionHandlerMock(
-                Optional.of(ProductionExceptionHandlerResponse.FAIL),
+                Optional.of(ProductionExceptionHandler.Response.fail()),
                 context,
                 sinkNodeName,
                 taskId
@@ -1377,7 +1379,7 @@ public class RecordCollectorTest {
             taskId,
             getExceptionalStreamsProducerOnSend(exception),
             new ProductionExceptionHandlerMock(
-                Optional.of(ProductionExceptionHandlerResponse.CONTINUE),
+                Optional.of(ProductionExceptionHandler.Response.resume()),
                 context,
                 sinkNodeName,
                 taskId
@@ -1400,7 +1402,7 @@ public class RecordCollectorTest {
                 taskId,
                 getExceptionalStreamsProducerOnSend(exception),
                 new ProductionExceptionHandlerMock(
-                    Optional.of(ProductionExceptionHandlerResponse.RETRY),
+                    Optional.of(ProductionExceptionHandler.Response.retry()),
                     context,
                     sinkNodeName,
                     taskId
@@ -1535,7 +1537,7 @@ public class RecordCollectorTest {
     public void shouldDropRecordExceptionUsingAlwaysContinueExceptionHandler() 
{
         try (final ErrorStringSerializer errorSerializer = new 
ErrorStringSerializer()) {
             final RecordCollector collector = newRecordCollector(new 
ProductionExceptionHandlerMock(
-                Optional.of(ProductionExceptionHandlerResponse.CONTINUE),
+                Optional.of(ProductionExceptionHandler.Response.resume()),
                 context,
                 sinkNodeName,
                 taskId,
@@ -1564,7 +1566,7 @@ public class RecordCollectorTest {
     public void 
shouldThrowStreamsExceptionWhenValueSerializationFailedAndProductionExceptionHandlerRepliesWithFail()
 {
         try (final ErrorStringSerializer errorSerializer = new 
ErrorStringSerializer()) {
             final RecordCollector collector = newRecordCollector(new 
ProductionExceptionHandlerMock(
-                Optional.of(ProductionExceptionHandlerResponse.FAIL),
+                Optional.of(ProductionExceptionHandler.Response.fail()),
                 context,
                 sinkNodeName,
                 taskId,
@@ -1585,7 +1587,7 @@ public class RecordCollectorTest {
     public void 
shouldThrowStreamsExceptionWhenKeySerializationFailedAndProductionExceptionHandlerRepliesWithFail()
 {
         try (final ErrorStringSerializer errorSerializer = new 
ErrorStringSerializer()) {
             final RecordCollector collector = newRecordCollector(new 
ProductionExceptionHandlerMock(
-                Optional.of(ProductionExceptionHandlerResponse.FAIL),
+                Optional.of(ProductionExceptionHandler.Response.fail()),
                 context,
                 sinkNodeName,
                 taskId,
@@ -1795,7 +1797,7 @@ public class RecordCollectorTest {
     public void shouldNotCallProductionExceptionHandlerOnClassCastException() {
         try (final ErrorStringSerializer errorSerializer = new 
ErrorStringSerializer()) {
             final RecordCollector collector = newRecordCollector(
-                new 
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE))
+                new 
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume()))
             );
             collector.initialize();
 
@@ -1834,6 +1836,58 @@ public class RecordCollectorTest {
         collector.flush(); // need to call flush() to check for internal 
exceptions
     }
 
+    @Test
+    public void 
shouldBuildDeadLetterQueueRecordsInDefaultExceptionHandlerDuringSerialization()
 {
+        try (final ErrorStringSerializer errorSerializer = new 
ErrorStringSerializer()) {
+            final DefaultProductionExceptionHandler productionExceptionHandler 
= new DefaultProductionExceptionHandler();
+            productionExceptionHandler.configure(Collections.singletonMap(
+                StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
+                "dlq"
+            ));
+            final RecordCollector collector = 
newRecordCollector(productionExceptionHandler);
+            collector.initialize();
+
+            assertThat(mockProducer.history().isEmpty(), equalTo(true));
+            assertThrows(
+                StreamsException.class,
+                () ->
+                    collector.send(topic, "hello", "world", null, 0, null, 
errorSerializer, stringSerializer, sinkNodeName, context)
+            );
+
+            assertEquals(1, mockProducer.history().size());
+            assertEquals("dlq", mockProducer.history().get(0).topic());
+        }
+    }
+
+
+    @Test
+    public void shouldBuildDeadLetterQueueRecordsInDefaultExceptionHandler() {
+        final KafkaException exception = new KafkaException("KABOOM!");
+        final StreamsProducer streamProducer = 
getExceptionalStreamsProducerOnSend(exception);
+        final MockProducer<byte[], byte[]> mockProducer = 
(MockProducer<byte[], byte[]>) streamProducer.kafkaProducer();
+        final DefaultProductionExceptionHandler productionExceptionHandler = 
new DefaultProductionExceptionHandler();
+        productionExceptionHandler.configure(Collections.singletonMap(
+            StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
+            "dlq"
+        ));
+        final RecordCollector collector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            streamProducer,
+            productionExceptionHandler,
+            streamsMetrics,
+            topology
+        );
+
+        collector.initialize();
+
+        collector.send(topic, "hello", "world", null, 0, null, 
stringSerializer, stringSerializer, sinkNodeName, context);
+        assertThrows(StreamsException.class, collector::flush);
+
+        assertEquals(1, mockProducer.history().size());
+        assertEquals("dlq", mockProducer.history().get(0).topic());
+    }
+
     @Test
     public void shouldNotSendIfSendOfOtherTaskFailedInCallback() {
         final TaskId taskId1 = new TaskId(0, 0);
@@ -1954,6 +2008,117 @@ public class RecordCollectorTest {
         }
     }
 
+    @Test
+    public void shouldCallOldImplementationExceptionHandler() {
+        final KafkaException exception = new KafkaException("KABOOM!");
+        final StreamsProducer streamProducer = 
getExceptionalStreamsProducerOnSend(exception);
+        final OldProductionExceptionHandlerImplementation 
productionExceptionHandler = new OldProductionExceptionHandlerImplementation();
+
+        final RecordCollector collector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            streamProducer,
+            productionExceptionHandler,
+            streamsMetrics,
+            topology
+        );
+
+        collector.initialize();
+
+        collector.send(topic, "hello", "world", null, 0, null, 
stringSerializer, stringSerializer, sinkNodeName, context);
+        final Exception thrown = assertThrows(StreamsException.class, 
collector::flush);
+
+        assertEquals(exception, thrown.getCause());
+    }
+
+    @Test
+    public void shouldCallOldImplementationWithRecordContextExceptionHandler() 
{
+        final KafkaException exception = new KafkaException("KABOOM!");
+        final StreamsProducer streamProducer = 
getExceptionalStreamsProducerOnSend(exception);
+        final OldProductionExceptionHandlerWithRecordContextImplementation 
productionExceptionHandler = new 
OldProductionExceptionHandlerWithRecordContextImplementation();
+
+        final RecordCollector collector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            streamProducer,
+            productionExceptionHandler,
+            streamsMetrics,
+            topology
+        );
+
+        collector.initialize();
+
+        collector.send(topic, "hello", "world", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
+        final Exception thrown = assertThrows(StreamsException.class, collector::flush);
+
+        assertEquals(exception, thrown.getCause());
+    }
+
+    @Test
+    void shouldFailWithDeadLetterQueueRecords() {
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+        final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record);
+
+        final ProductionExceptionHandler.Response response = ProductionExceptionHandler.Response.fail(records);
+
+        assertEquals(Result.FAIL, response.result());
+        assertEquals(1, response.deadLetterQueueRecords().size());
+        assertEquals(record, response.deadLetterQueueRecords().get(0));
+    }
+
+    @Test
+    void shouldFailWithoutDeadLetterQueueRecords() {
+        final ProductionExceptionHandler.Response response = 
ProductionExceptionHandler.Response.fail();
+
+        assertEquals(Result.FAIL, response.result());
+        assertTrue(response.deadLetterQueueRecords().isEmpty());
+    }
+
+    @Test
+    void shouldResumeWithDeadLetterQueueRecords() {
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+        final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record);
+
+        final Response response = Response.resume(records);
+
+        assertEquals(Result.RESUME, response.result());
+        assertEquals(1, response.deadLetterQueueRecords().size());
+        assertEquals(record, response.deadLetterQueueRecords().get(0));
+    }
+
+    @Test
+    void shouldResumeWithoutDeadLetterQueueRecords() {
+        final Response response = Response.resume();
+
+        assertEquals(Result.RESUME, response.result());
+        assertTrue(response.deadLetterQueueRecords().isEmpty());
+    }
+
+    @Test
+    void shouldRetryWithoutDeadLetterQueueRecords() {
+        final Response response = Response.retry();
+
+        assertEquals(Result.RETRY, response.result());
+        assertTrue(response.deadLetterQueueRecords().isEmpty());
+    }
+
+    @Test
+    void shouldNotBeModifiable() {
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+        final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record);
+
+        final Response response = Response.fail(records);
+
+        assertThrows(UnsupportedOperationException.class, () -> response.deadLetterQueueRecords().add(record));
+    }
+
+    @Test
+    void shouldReturnsEmptyList() {
+        final Response response = Response.fail();
+
+        assertTrue(response.deadLetterQueueRecords().isEmpty());
+    }
+
     private RecordCollector newRecordCollector(final ProductionExceptionHandler productionExceptionHandler) {
         return new RecordCollectorImpl(
             logContext,
@@ -1978,8 +2142,12 @@ public class RecordCollectorTest {
             new MockProducer<>(cluster, true, null, byteArraySerializer, byteArraySerializer) {
                 @Override
                 public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
-                    callback.onCompletion(null, exception);
-                    return null;
+                    if (record.topic().equals("dlq")) {
+                        return super.send(record, callback);
+                    } else {
+                        callback.onCompletion(null, exception);
+                        return null;
+                    }
                 }
             },
             AT_LEAST_ONCE,
@@ -2023,7 +2191,7 @@ public class RecordCollectorTest {
     }
 
     public static class ProductionExceptionHandlerMock implements ProductionExceptionHandler {
-        private final Optional<ProductionExceptionHandlerResponse> response;
+        private final Optional<Response> response;
         private boolean shouldThrowException;
         private InternalProcessorContext<Void, Void> expectedContext;
         private String expectedProcessorNodeId;
@@ -2040,11 +2208,11 @@ public class RecordCollectorTest {
             this.expectedSerializationExceptionOrigin = null;
         }
 
-        public ProductionExceptionHandlerMock(final Optional<ProductionExceptionHandlerResponse> response) {
+        public ProductionExceptionHandlerMock(final Optional<Response> response) {
             this.response = response;
         }
 
-        public ProductionExceptionHandlerMock(final Optional<ProductionExceptionHandlerResponse> response,
+        public ProductionExceptionHandlerMock(final Optional<Response> response,
                                               final InternalProcessorContext<Void, Void> context,
                                               final String processorNodeId,
                                               final TaskId taskId) {
@@ -2064,7 +2232,7 @@ public class RecordCollectorTest {
             this.shouldThrowException = shouldThrowException;
         }
 
-        public ProductionExceptionHandlerMock(final Optional<ProductionExceptionHandlerResponse> response,
+        public ProductionExceptionHandlerMock(final Optional<Response> response,
                                               final InternalProcessorContext<Void, Void> context,
                                               final String processorNodeId,
                                               final TaskId taskId,
@@ -2075,9 +2243,9 @@ public class RecordCollectorTest {
         }
 
         @Override
-        public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
-                                                          final ProducerRecord<byte[], byte[]> record,
-                                                          final Exception exception) {
+        public Response handleError(final ErrorHandlerContext context,
+                                    final ProducerRecord<byte[], byte[]> record,
+                                    final Exception exception) {
             assertInputs(context, exception);
             if (shouldThrowException) {
                 throw new RuntimeException("CRASH");
@@ -2087,10 +2255,10 @@ public class RecordCollectorTest {
 
         @SuppressWarnings("rawtypes")
         @Override
-        public ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
-                                                                                final ProducerRecord record,
-                                                                                final Exception exception,
-                                                                                final SerializationExceptionOrigin origin) {
+        public Response handleSerializationError(final ErrorHandlerContext context,
+                                                  final ProducerRecord record,
+                                                  final Exception exception,
+                                                  final SerializationExceptionOrigin origin) {
             assertInputs(context, exception);
             assertEquals(expectedSerializationExceptionOrigin, origin);
             if (shouldThrowException) {
@@ -2115,4 +2283,33 @@ public class RecordCollectorTest {
             assertEquals("KABOOM!", exception.getMessage());
         }
     }
+
+    public static class OldProductionExceptionHandlerImplementation implements ProductionExceptionHandler {
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
+                                                          final Exception exception) {
+            return ProductionExceptionHandlerResponse.FAIL;
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs) {
+        }
+    }
+
+    public static class OldProductionExceptionHandlerWithRecordContextImplementation implements ProductionExceptionHandler {
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
+                                                          final ProducerRecord<byte[], byte[]> record,
+                                                          final Exception exception) {
+            return ProductionExceptionHandlerResponse.FAIL;
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs) {
+        }
+    }
 }
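
Note on the new ProductionExceptionHandler API exercised above: handleError returns a Response built via the fail()/resume()/retry() factories, and the Response can optionally carry a list of ProducerRecords to be forwarded to the dead letter queue. Below is a minimal sketch of a custom handler on top of that API; the class name and the "my-dlq" topic are illustrative, not part of this commit, and the default implementations of the remaining handler methods are assumed to suffice:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.streams.errors.ErrorHandlerContext;
    import org.apache.kafka.streams.errors.ProductionExceptionHandler;

    public class RouteToDlqProductionExceptionHandler implements ProductionExceptionHandler {

        @Override
        public Response handleError(final ErrorHandlerContext context,
                                    final ProducerRecord<byte[], byte[]> record,
                                    final Exception exception) {
            // Keep processing, and hand the failed record back to Streams so it is
            // forwarded to the (illustrative) "my-dlq" topic.
            return Response.resume(Collections.singletonList(
                    new ProducerRecord<>("my-dlq", record.key(), record.value())));
        }

        @Override
        public void configure(final Map<String, ?> configs) {
            // no configuration needed for this sketch
        }
    }
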
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index aa3bb57c7a6..db726d78dcc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -17,32 +17,44 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
-import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse;
 import org.apache.kafka.streams.errors.ErrorHandlerContext;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
+import static org.apache.kafka.streams.errors.DeserializationExceptionHandler.Response;
+import static org.apache.kafka.streams.errors.DeserializationExceptionHandler.Result;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class RecordDeserializerTest {
     private final String sourceNodeName = "source-node";
@@ -108,7 +120,7 @@ public class RecordDeserializerTest {
                             "value"
                     ),
                     new DeserializationExceptionHandlerMock(
-                            Optional.of(DeserializationHandlerResponse.FAIL),
+                            Optional.of(DeserializationExceptionHandler.Response.fail()),
                             rawRecord,
                             sourceNodeName,
                             taskId
@@ -147,7 +159,7 @@ public class RecordDeserializerTest {
                             "value"
                     ),
                     new DeserializationExceptionHandlerMock(
-                            Optional.of(DeserializationHandlerResponse.CONTINUE),
+                            Optional.of(DeserializationExceptionHandler.Response.resume()),
                             rawRecord,
                             sourceNodeName,
                             taskId
@@ -188,7 +200,7 @@ public class RecordDeserializerTest {
             );
             assertEquals("Fatal user code error in deserialization error 
callback", exception.getMessage());
             assertInstanceOf(NullPointerException.class, exception.getCause());
-            assertEquals("Invalid DeserializationExceptionHandler response.", 
exception.getCause().getMessage());
+            assertEquals("Invalid DeserializationExceptionResponse response.", 
exception.getCause().getMessage());
         }
     }
 
@@ -222,6 +234,144 @@ public class RecordDeserializerTest {
         }
     }
 
+
+    @Test
+    public void shouldBuildDeadLetterQueueRecordsInDefaultDeserializationException() {
+        try (Metrics metrics = new Metrics()) {
+            final MockRecordCollector collector = new MockRecordCollector();
+            final InternalProcessorContext<Object, Object> internalProcessorContext =
+                    new InternalMockProcessorContext<>(
+                            new StateSerdes<>("sink", Serdes.ByteArray(), Serdes.ByteArray()),
+                            collector
+                    );
+            final DeserializationExceptionHandler deserializationExceptionHandler = new LogAndFailExceptionHandler();
+            deserializationExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "dlq"));
+
+            assertThrows(StreamsException.class, () -> RecordDeserializer.handleDeserializationFailure(
+                    deserializationExceptionHandler,
+                    internalProcessorContext,
+                    new RuntimeException(new NullPointerException("Oopsie")),
+                    new ConsumerRecord<>("source",
+                            0,
+                            0,
+                            123,
+                            TimestampType.CREATE_TIME,
+                            -1,
+                            -1,
+                            "hello".getBytes(StandardCharsets.UTF_8),
+                            "world".getBytes(StandardCharsets.UTF_8),
+                            new RecordHeaders(),
+                            Optional.empty()),
+                    new LogContext().logger(this.getClass()),
+                    metrics.sensor("dropped-records"),
+                    "sourceNode"
+            ));
+
+            assertEquals(1, collector.collected().size());
+            assertEquals("dlq", collector.collected().get(0).topic());
+            assertEquals("hello", new String((byte[]) 
collector.collected().get(0).key()));
+            assertEquals("world", new String((byte[]) 
collector.collected().get(0).value()));
+        }
+    }
+
+
+    @Test
+    public void shouldBuildDeadLetterQueueRecordsInLogAndContinueDeserializationException() {
+        try (Metrics metrics = new Metrics()) {
+            final MockRecordCollector collector = new MockRecordCollector();
+            final InternalProcessorContext<Object, Object> internalProcessorContext =
+                    new InternalMockProcessorContext<>(
+                            new StateSerdes<>("sink", Serdes.ByteArray(), Serdes.ByteArray()),
+                            collector
+                    );
+            final DeserializationExceptionHandler deserializationExceptionHandler = new LogAndContinueExceptionHandler();
+            deserializationExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "dlq"));
+
+            RecordDeserializer.handleDeserializationFailure(
+                    deserializationExceptionHandler,
+                    internalProcessorContext,
+                    new RuntimeException(new NullPointerException("Oopsie")),
+                    new ConsumerRecord<>("source",
+                            0,
+                            0,
+                            123,
+                            TimestampType.CREATE_TIME,
+                            -1,
+                            -1,
+                            "hello".getBytes(StandardCharsets.UTF_8),
+                            "world".getBytes(StandardCharsets.UTF_8),
+                            new RecordHeaders(),
+                            Optional.empty()),
+                    new LogContext().logger(this.getClass()),
+                    metrics.sensor("dropped-records"),
+                    "sourceNode"
+            );
+
+            assertEquals(1, collector.collected().size());
+            assertEquals("dlq", collector.collected().get(0).topic());
+            assertEquals("hello", new String((byte[]) 
collector.collected().get(0).key()));
+            assertEquals("world", new String((byte[]) 
collector.collected().get(0).value()));
+        }
+    }
+
+    @Test
+    void shouldFailWithDeadLetterQueueRecords() {
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+        final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record);
+
+        final Response response = Response.fail(records);
+
+        assertEquals(Result.FAIL, response.result());
+        assertEquals(1, response.deadLetterQueueRecords().size());
+        assertEquals(record, response.deadLetterQueueRecords().get(0));
+    }
+
+    @Test
+    void shouldFailWithoutDeadLetterQueueRecords() {
+        final Response response = DeserializationExceptionHandler.Response.fail();
+
+        assertEquals(Result.FAIL, response.result());
+        assertTrue(response.deadLetterQueueRecords().isEmpty());
+    }
+
+    @Test
+    void shouldResumeWithDeadLetterQueueRecords() {
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+        final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record);
+
+        final Response response = Response.resume(records);
+
+        assertEquals(Result.RESUME, response.result());
+        assertEquals(1, response.deadLetterQueueRecords().size());
+        assertEquals(record, response.deadLetterQueueRecords().get(0));
+    }
+
+    @Test
+    void shouldResumeWithoutDeadLetterQueueRecords() {
+        final Response response = Response.resume();
+
+        assertEquals(Result.RESUME, response.result());
+        assertTrue(response.deadLetterQueueRecords().isEmpty());
+    }
+
+
+    @Test
+    void shouldNotBeModifiable() {
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+        final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record);
+
+        final Response response = Response.fail(records);
+
+        assertThrows(UnsupportedOperationException.class, () -> response.deadLetterQueueRecords().add(record));
+    }
+
+    @Test
+    void shouldReturnsEmptyList() {
+        final Response response = Response.fail();
+
+        assertTrue(response.deadLetterQueueRecords().isEmpty());
+    }
+
     static class TheSourceNode extends SourceNode<Object, Object> {
         private final boolean keyThrowsException;
         private final boolean valueThrowsException;
@@ -258,12 +408,12 @@ public class RecordDeserializerTest {
     }
 
     public static class DeserializationExceptionHandlerMock implements DeserializationExceptionHandler {
-        private final Optional<DeserializationHandlerResponse> response;
+        private final Optional<Response> response;
         private final ConsumerRecord<byte[], byte[]> expectedRecord;
         private final String expectedProcessorNodeId;
         private final TaskId expectedTaskId;
 
-        public DeserializationExceptionHandlerMock(final Optional<DeserializationHandlerResponse> response,
+        public DeserializationExceptionHandlerMock(final Optional<Response> response,
                                                    final ConsumerRecord<byte[], byte[]> record,
                                                    final String processorNodeId,
                                                    final TaskId taskId) {
@@ -274,9 +424,9 @@ public class RecordDeserializerTest {
         }
 
         @Override
-        public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
-                                                      final ConsumerRecord<byte[], byte[]> record,
-                                                      final Exception exception) {
+        public Response handleError(final ErrorHandlerContext context,
+                                    final ConsumerRecord<byte[], byte[]> record,
+                                    final Exception exception) {
             assertEquals(expectedRecord.topic(), context.topic());
             assertEquals(expectedRecord.partition(), context.partition());
             assertEquals(expectedRecord.offset(), context.offset());
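
The two shouldBuildDeadLetterQueueRecords… tests above show that the built-in deserialization handlers start producing dead letter queue records once StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG is set. A sketch of the corresponding application-level configuration follows; the application id, bootstrap servers, and the "dlq" topic name are illustrative:

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

    public final class DlqStreamsConfigExample {
        public static Properties streamsProperties() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dlq-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Send records that fail deserialization to the "dlq" topic and keep processing.
            props.put(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "dlq");
            props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    LogAndContinueExceptionHandler.class.getName());
            return props;
        }
    }
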
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 701f38eda0c..385719530b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -3032,7 +3032,7 @@ public class StreamTaskTest {
 
     public static class CrashingProcessingExceptionHandler implements ProcessingExceptionHandler {
         @Override
-        public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
+        public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
             throw new RuntimeException("KABOOM from ProcessingExceptionHandlerMock!");
         }
 
@@ -3044,7 +3044,7 @@ public class StreamTaskTest {
 
     public static class NullProcessingExceptionHandler implements ProcessingExceptionHandler {
         @Override
-        public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
+        public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
             return null;
         }
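
The StreamTaskTest changes above only adapt the test handlers to the renamed callback: handle returning a ProcessingHandlerResponse becomes handleError returning a Response. A sketch of a custom processing exception handler against the new signature; the class name is illustrative, and the Response factories are assumed to mirror the fail()/resume() pattern shown for the other handlers in this diff:

    import java.util.Map;

    import org.apache.kafka.streams.errors.ErrorHandlerContext;
    import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
    import org.apache.kafka.streams.processor.api.Record;

    public class ContinueOnProcessingErrorHandler implements ProcessingExceptionHandler {

        @Override
        public Response handleError(final ErrorHandlerContext context,
                                    final Record<?, ?> record,
                                    final Exception exception) {
            // Drop the failed record and keep the task running.
            return Response.resume();
        }

        @Override
        public void configure(final Map<String, ?> configs) {
            // no configuration needed for this sketch
        }
    }
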
 
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
index 8a7f5434963..ed6898b3713 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
@@ -81,6 +81,23 @@ public class MockRecordCollector implements RecordCollector {
         );
     }
 
+    @Override
+    public <K, V> void send(final K key,
+                            final V value,
+                            final String processorNodeId,
+                            final InternalProcessorContext<?, ?> context,
+                            final ProducerRecord<byte[], byte[]> serializedRecord) {
+        // Building a new ProducerRecord for key & value type conversion
+        collected.add(new ProducerRecord<>(
+                serializedRecord.topic(),
+                serializedRecord.partition(),
+                serializedRecord.timestamp(),
+                key,
+                value,
+                serializedRecord.headers())
+        );
+    }
+
     @Override
     public void initialize() {}
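
The new MockRecordCollector.send overload above captures already-serialized records (for example, dead letter queue records built by a handler) alongside the regular test output, rebuilding a ProducerRecord with the typed key and value. A small usage sketch from a test's point of view; the topic name and payloads are illustrative:

    import static org.junit.jupiter.api.Assertions.assertEquals;

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.test.MockRecordCollector;
    import org.junit.jupiter.api.Test;

    public class MockRecordCollectorDlqUsageTest {

        @Test
        public void shouldCaptureSerializedDlqRecord() {
            final MockRecordCollector collector = new MockRecordCollector();
            // Forward a pre-serialized record; the mock stores it with the typed key and value.
            collector.send("hello", "world", "sinkNode", null,
                    new ProducerRecord<>("dlq",
                            "hello".getBytes(StandardCharsets.UTF_8),
                            "world".getBytes(StandardCharsets.UTF_8)));
            assertEquals(1, collector.collected().size());
            assertEquals("dlq", collector.collected().get(0).topic());
            assertEquals("hello", collector.collected().get(0).key());
        }
    }
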
 
