This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new cdc2d957edb KAFKA-16505: Adding dead letter queue in Kafka Streams (#17942) cdc2d957edb is described below commit cdc2d957edb2551b4ace30acfc72f6853f00790b Author: Gasparina Damien <d.gaspar...@gmail.com> AuthorDate: Mon Jul 21 15:54:40 2025 +0200 KAFKA-16505: Adding dead letter queue in Kafka Streams (#17942) Implements KIP-1034 to add support of Dead Letter Queue in Kafka Streams. Reviewers: Lucas Brutschy <lbruts...@confluent.io>, Bruno Cadonna <cado...@apache.org> Co-authored-by: Sebastien Viale <sebastien.vi...@michelin.com> --- .../ProcessingExceptionHandlerIntegrationTest.java | 10 +- .../SwallowUnknownTopicErrorIntegrationTest.java | 10 +- .../org/apache/kafka/streams/StreamsConfig.java | 10 + .../errors/DefaultProductionExceptionHandler.java | 36 +-- .../errors/DeserializationExceptionHandler.java | 156 +++++++++++++ .../errors/LogAndContinueExceptionHandler.java | 39 +--- .../LogAndContinueProcessingExceptionHandler.java | 14 +- .../streams/errors/LogAndFailExceptionHandler.java | 39 +--- .../LogAndFailProcessingExceptionHandler.java | 13 +- .../streams/errors/ProcessingExceptionHandler.java | 162 +++++++++++++- .../streams/errors/ProductionExceptionHandler.java | 217 +++++++++++++++++- .../errors/internals/ExceptionHandlerUtils.java | 101 +++++++++ .../streams/processor/internals/ProcessorNode.java | 23 +- .../processor/internals/RecordCollector.java | 7 + .../processor/internals/RecordCollectorImpl.java | 74 ++++-- .../processor/internals/RecordDeserializer.java | 27 ++- .../streams/processor/internals/StreamTask.java | 22 +- .../apache/kafka/streams/StreamsConfigTest.java | 5 + .../streams/errors/ExceptionHandlerUtilsTest.java | 114 ++++++++++ .../processor/internals/ProcessorNodeTest.java | 132 ++++++++++- .../processor/internals/RecordCollectorTest.java | 247 ++++++++++++++++++--- .../internals/RecordDeserializerTest.java | 168 +++++++++++++- 
.../processor/internals/StreamTaskTest.java | 4 +- .../org/apache/kafka/test/MockRecordCollector.java | 17 ++ 24 files changed, 1477 insertions(+), 170 deletions(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java index 38711093ff8..4406f292205 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java @@ -357,7 +357,7 @@ public class ProcessingExceptionHandlerIntegrationTest { final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH)); assertEquals("Fatal user code error in processing error callback", e.getMessage()); assertInstanceOf(NullPointerException.class, e.getCause()); - assertEquals("Invalid ProductionExceptionHandler response.", e.getCause().getMessage()); + assertEquals("Invalid ProcessingExceptionHandler response.", e.getCause().getMessage()); assertFalse(isExecuted.get()); } } @@ -524,7 +524,7 @@ public class ProcessingExceptionHandlerIntegrationTest { public static class ContinueProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler { @Override - public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { + public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { if (((String) record.key()).contains("FATAL")) { throw new RuntimeException("KABOOM!"); } @@ -532,7 +532,7 @@ public class ProcessingExceptionHandlerIntegrationTest { return null; } 
assertProcessingExceptionHandlerInputs(context, record, exception); - return ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE; + return Response.resume(); } @Override @@ -543,9 +543,9 @@ public class ProcessingExceptionHandlerIntegrationTest { public static class FailProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler { @Override - public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { + public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { assertProcessingExceptionHandlerInputs(context, record, exception); - return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL; + return Response.fail(); } @Override diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java index a82e832e21c..0535dd2465e 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java @@ -156,15 +156,15 @@ public class SwallowUnknownTopicErrorIntegrationTest { public void configure(final Map<String, ?> configs) { } @Override - public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, - final ProducerRecord<byte[], byte[]> record, - final Exception exception) { + public Response handleError(final ErrorHandlerContext context, + final ProducerRecord<byte[], byte[]> record, + final Exception exception) { if (exception instanceof TimeoutException && exception.getCause() != null && exception.getCause() instanceof UnknownTopicOrPartitionException) { - return 
ProductionExceptionHandlerResponse.CONTINUE; + return Response.resume(); } - return ProductionExceptionHandler.super.handle(context, record, exception); + return ProductionExceptionHandler.super.handleError(context, record, exception); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 5e1eaff1162..6364e7e9126 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -619,6 +619,11 @@ public class StreamsConfig extends AbstractConfig { "support \"classic\" or \"streams\". If \"streams\" is specified, then the streams rebalance protocol will be " + "used. Otherwise, the classic group protocol will be used."; + public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = "errors.dead.letter.queue.topic.name"; + + private static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "If not null, the default exception handler will build and send a Dead Letter Queue record to the topic with the provided name if an error occurs.\n" + + "If a custom deserialization/production or processing exception handler is set, this parameter is ignored for this handler."; + /** {@code log.summary.interval.ms} */ public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms"; private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output interval in milliseconds for logging summary information.\n" + @@ -991,6 +996,11 @@ public class StreamsConfig extends AbstractConfig { LogAndFailExceptionHandler.class.getName(), Importance.MEDIUM, DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) + .define(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, + Type.STRING, + null, + Importance.MEDIUM, + ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC) .define(MAX_TASK_IDLE_MS_CONFIG, Type.LONG, 0L, diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java index 5994326770c..3e9eb2fba86 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java @@ -18,38 +18,42 @@ package org.apache.kafka.streams.errors; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.streams.StreamsConfig; import java.util.Map; +import static org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords; + /** * {@code ProductionExceptionHandler} that always instructs streams to fail when an exception * happens while attempting to produce result records. */ public class DefaultProductionExceptionHandler implements ProductionExceptionHandler { - /** - * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead. - */ - @SuppressWarnings("deprecation") - @Deprecated + + private String deadLetterQueueTopic = null; + @Override - public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record, - final Exception exception) { + public Response handleError(final ErrorHandlerContext context, + final ProducerRecord<byte[], byte[]> record, + final Exception exception) { return exception instanceof RetriableException ? 
- ProductionExceptionHandlerResponse.RETRY : - ProductionExceptionHandlerResponse.FAIL; + Response.retry() : + Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception)); } + @SuppressWarnings("rawtypes") @Override - public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, - final ProducerRecord<byte[], byte[]> record, - final Exception exception) { - return exception instanceof RetriableException ? - ProductionExceptionHandlerResponse.RETRY : - ProductionExceptionHandlerResponse.FAIL; + public Response handleSerializationError(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception, + final SerializationExceptionOrigin origin) { + return Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception)); } + @Override public void configure(final Map<String, ?> configs) { - // ignore + if (configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null) + deadLetterQueueTopic = String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG)); } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java index 0b44e04d791..8c3667c20f4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java @@ -17,10 +17,14 @@ package org.apache.kafka.streams.errors; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Configurable; import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; import 
org.apache.kafka.streams.processor.ProcessorContext; +import java.util.Collections; +import java.util.List; + /** * Interface that specifies how an exception from source node deserialization * (e.g., reading from Kafka) should be handled. @@ -63,16 +67,35 @@ public interface DeserializationExceptionHandler extends Configurable { * The actual exception. * * @return Whether to continue or stop processing. + * + * @deprecated Use {@link #handleError(ErrorHandlerContext, ConsumerRecord, Exception)} instead. */ + @Deprecated default DeserializationHandlerResponse handle(final ErrorHandlerContext context, final ConsumerRecord<byte[], byte[]> record, final Exception exception) { return handle(((DefaultErrorHandlerContext) context).processorContext().orElse(null), record, exception); } + /** + * Inspects a record and the exception received during deserialization. + * + * @param context + * Error handler context. + * @param record + * Record that failed deserialization. + * @param exception + * The actual exception. + * + * @return a {@link Response} object + */ + default Response handleError(final ErrorHandlerContext context, final ConsumerRecord<byte[], byte[]> record, final Exception exception) { + return new Response(Result.from(handle(context, record, exception)), Collections.emptyList()); + } /** * Enumeration that describes the response from the exception handler. */ + @Deprecated enum DeserializationHandlerResponse { /** Continue processing. */ CONTINUE(0, "CONTINUE"), @@ -95,4 +118,137 @@ public interface DeserializationExceptionHandler extends Configurable { } } + /** + * Enumeration that describes the response from the exception handler. + */ + enum Result { + /** Continue processing. */ + RESUME(0, "RESUME"), + /** Fail processing. */ + FAIL(1, "FAIL"); + + /** + * An english description for the used option. This is for debugging only and may change. + */ + public final String name; + + /** + * The permanent and immutable id for the used option. 
This can't change ever. + */ + public final int id; + + Result(final int id, final String name) { + this.id = id; + this.name = name; + } + + /** + * Converts the deprecated enum DeserializationHandlerResponse into the new Result enum. + * + * @param value the old DeserializationHandlerResponse enum value + * @return a {@link Result} enum value + * @throws IllegalArgumentException if the provided value does not map to a valid {@link Result} + */ + private static DeserializationExceptionHandler.Result from(final DeserializationHandlerResponse value) { + switch (value) { + case FAIL: + return Result.FAIL; + case CONTINUE: + return Result.RESUME; + default: + throw new IllegalArgumentException("No Result enum found for old value: " + value); + } + } + } + + /** + * Represents the result of handling a deserialization exception. + * <p> + * The {@code Response} class encapsulates a {@link Result}, + * indicating whether processing should continue or fail, along with an optional list of + * {@link ProducerRecord} instances to be sent to a dead letter queue. + * </p> + */ + class Response { + + private final Result result; + + private final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords; + + /** + * Constructs a new {@code DeserializationExceptionResponse} object. + * + * @param result the result indicating whether processing should continue or fail; + * must not be {@code null}. + * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}. + */ + private Response(final Result result, + final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) { + this.result = result; + this.deadLetterQueueRecords = deadLetterQueueRecords; + } + + /** + * Creates a {@code Response} indicating that processing should fail. + * + * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}. 
+ * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#FAIL} status. + */ + public static Response fail(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) { + return new Response(Result.FAIL, deadLetterQueueRecords); + } + + /** + * Creates a {@code Response} indicating that processing should fail. + * + * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#FAIL} status. + */ + public static Response fail() { + return fail(Collections.emptyList()); + } + + /** + * Creates a {@code Response} indicating that processing should continue. + * + * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}. + * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#RESUME} status. + */ + public static Response resume(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) { + return new Response(Result.RESUME, deadLetterQueueRecords); + } + + /** + * Creates a {@code Response} indicating that processing should continue. + * + * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#RESUME} status. + */ + public static Response resume() { + return resume(Collections.emptyList()); + } + + /** + * Retrieves the deserialization handler result. + * + * @return the {@link Result} indicating whether processing should continue or fail. + */ + public Result result() { + return result; + } + + /** + * Retrieves an unmodifiable list of records to be sent to the dead letter queue. + * <p> + * If the list is {@code null}, an empty list is returned. + * </p> + * + * @return an unmodifiable list of {@link ProducerRecord} instances + * for the dead letter queue, or an empty list if no records are available. 
+ */ + public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() { + if (deadLetterQueueRecords == null) { + return Collections.emptyList(); + } + return Collections.unmodifiableList(deadLetterQueueRecords); + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java index 6de997be986..63972b0840d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java @@ -17,47 +17,27 @@ package org.apache.kafka.streams.errors; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.StreamsConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import static org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords; + /** * Deserialization handler that logs a deserialization exception and then * signals the processing pipeline to continue processing more records. */ public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler { private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class); + private String deadLetterQueueTopic = null; - /** - * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead. 
- */ - @SuppressWarnings("deprecation") - @Deprecated @Override - public DeserializationHandlerResponse handle(final ProcessorContext context, - final ConsumerRecord<byte[], byte[]> record, - final Exception exception) { - - log.warn( - "Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}", - context.taskId(), - record.topic(), - record.partition(), - record.offset(), - exception - ); - - return DeserializationHandlerResponse.CONTINUE; - } - - @Override - public DeserializationHandlerResponse handle(final ErrorHandlerContext context, - final ConsumerRecord<byte[], byte[]> record, - final Exception exception) { - + public Response handleError(final ErrorHandlerContext context, + final ConsumerRecord<byte[], byte[]> record, + final Exception exception) { log.warn( "Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}", context.taskId(), @@ -67,11 +47,12 @@ public class LogAndContinueExceptionHandler implements DeserializationExceptionH exception ); - return DeserializationHandlerResponse.CONTINUE; + return Response.resume(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception)); } @Override public void configure(final Map<String, ?> configs) { - // ignore + if (configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null) + deadLetterQueueTopic = String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG)); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java index c832ab14200..17de09e5f0c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java @@ -16,6 +16,7 @@ 
*/ package org.apache.kafka.streams.errors; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.api.Record; import org.slf4j.Logger; @@ -23,15 +24,20 @@ import org.slf4j.LoggerFactory; import java.util.Map; +import static org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords; + /** * Processing exception handler that logs a processing exception and then * signals the processing pipeline to continue processing more records. */ public class LogAndContinueProcessingExceptionHandler implements ProcessingExceptionHandler { private static final Logger log = LoggerFactory.getLogger(LogAndContinueProcessingExceptionHandler.class); + private String deadLetterQueueTopic = null; @Override - public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { + public Response handleError(final ErrorHandlerContext context, + final Record<?, ?> record, + final Exception exception) { log.warn( "Exception caught during message processing, processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}", context.processorNodeId(), @@ -41,12 +47,12 @@ public class LogAndContinueProcessingExceptionHandler implements ProcessingExcep context.offset(), exception ); - - return ProcessingHandlerResponse.CONTINUE; + return Response.resume(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception)); } @Override public void configure(final Map<String, ?> configs) { - // ignore + if (configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null) + deadLetterQueueTopic = String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG)); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java 
index 20e6b9414de..6fc129b4d78 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java @@ -17,47 +17,27 @@ package org.apache.kafka.streams.errors; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.StreamsConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import static org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords; + /** * Deserialization handler that logs a deserialization exception and then * signals the processing pipeline to stop processing more records and fail. */ public class LogAndFailExceptionHandler implements DeserializationExceptionHandler { private static final Logger log = LoggerFactory.getLogger(LogAndFailExceptionHandler.class); + private String deadLetterQueueTopic = null; - /** - * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead. 
- */ - @SuppressWarnings("deprecation") - @Deprecated @Override - public DeserializationHandlerResponse handle(final ProcessorContext context, - final ConsumerRecord<byte[], byte[]> record, - final Exception exception) { - - log.error( - "Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}", - context.taskId(), - record.topic(), - record.partition(), - record.offset(), - exception - ); - - return DeserializationHandlerResponse.FAIL; - } - - @Override - public DeserializationHandlerResponse handle(final ErrorHandlerContext context, - final ConsumerRecord<byte[], byte[]> record, - final Exception exception) { - + public Response handleError(final ErrorHandlerContext context, + final ConsumerRecord<byte[], byte[]> record, + final Exception exception) { log.error( "Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}", context.taskId(), @@ -67,11 +47,12 @@ public class LogAndFailExceptionHandler implements DeserializationExceptionHandl exception ); - return DeserializationHandlerResponse.FAIL; + return Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception)); } @Override public void configure(final Map<String, ?> configs) { - // ignore + if (configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null) + deadLetterQueueTopic = String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG)); } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java index f592663a6c0..5372d9ad0b6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java @@ -16,6 
+16,7 @@ */ package org.apache.kafka.streams.errors; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.api.Record; import org.slf4j.Logger; @@ -23,15 +24,20 @@ import org.slf4j.LoggerFactory; import java.util.Map; +import static org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords; + /** * Processing exception handler that logs a processing exception and then * signals the processing pipeline to stop processing more records and fail. */ public class LogAndFailProcessingExceptionHandler implements ProcessingExceptionHandler { private static final Logger log = LoggerFactory.getLogger(LogAndFailProcessingExceptionHandler.class); + private String deadLetterQueueTopic = null; @Override - public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { + public Response handleError(final ErrorHandlerContext context, + final Record<?, ?> record, + final Exception exception) { log.error( "Exception caught during message processing, processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}", context.processorNodeId(), @@ -42,11 +48,12 @@ public class LogAndFailProcessingExceptionHandler implements ProcessingException exception ); - return ProcessingHandlerResponse.FAIL; + return Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception)); } @Override public void configure(final Map<String, ?> configs) { - // ignore + if (configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null) + deadLetterQueueTopic = String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG)); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java index 
7dc1b90bc2e..f4c32764877 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java @@ -16,13 +16,18 @@ */ package org.apache.kafka.streams.errors; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Configurable; import org.apache.kafka.streams.processor.api.Record; +import java.util.Collections; +import java.util.List; + /** * An interface that allows user code to inspect a record that has failed processing */ public interface ProcessingExceptionHandler extends Configurable { + /** * Inspect a record and the exception received * @@ -34,9 +39,30 @@ public interface ProcessingExceptionHandler extends Configurable { * The actual exception. * * @return Whether to continue or stop processing. + * @deprecated Use {@link #handleError(ErrorHandlerContext, Record, Exception)} instead. + */ + @Deprecated + default ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { + throw new UnsupportedOperationException(); + }; + + /** + * Inspects a record and the exception received during processing. + * + * @param context + * Processing context metadata. + * @param record + * Record where the exception occurred. + * @param exception + * The actual exception. + * + * @return a {@link Response} object */ - ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception); + default Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { + return new Response(ProcessingExceptionHandler.Result.from(handle(context, record, exception)), Collections.emptyList()); + } + @Deprecated enum ProcessingHandlerResponse { /** Continue processing. 
*/ CONTINUE(1, "CONTINUE"), @@ -58,4 +84,138 @@ public interface ProcessingExceptionHandler extends Configurable { this.name = name; } } + + /** + * Enumeration that describes the response from the exception handler. + */ + enum Result { + /** Resume processing. */ + RESUME(1, "RESUME"), + /** Fail processing. */ + FAIL(2, "FAIL"); + + /** + * An english description for the used option. This is for debugging only and may change. + */ + public final String name; + + /** + * The permanent and immutable id for the used option. This can't change ever. + */ + public final int id; + + Result(final int id, final String name) { + this.id = id; + this.name = name; + } + + /** + * Converts the deprecated enum ProcessingHandlerResponse into the new Result enum. + * + * @param value the old DeserializationHandlerResponse enum value + * @return a {@link ProcessingExceptionHandler.Result} enum value + * @throws IllegalArgumentException if the provided value does not map to a valid {@link ProcessingExceptionHandler.Result} + */ + private static ProcessingExceptionHandler.Result from(final ProcessingHandlerResponse value) { + switch (value) { + case FAIL: + return Result.FAIL; + case CONTINUE: + return Result.RESUME; + default: + throw new IllegalArgumentException("No Result enum found for old value: " + value); + } + } + } + + /** + * Represents the result of handling a processing exception. + * <p> + * The {@code Response} class encapsulates a {@link Result}, + * indicating whether processing should continue or fail, along with an optional list of + * {@link org.apache.kafka.clients.producer.ProducerRecord} instances to be sent to a dead letter queue. + * </p> + */ + class Response { + + private final Result result; + + private final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords; + + /** + * Constructs a new {@code ProcessingExceptionResponse} object. 
+ * + * @param result the result indicating whether processing should continue or fail; + * must not be {@code null}. + * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}. + */ + private Response(final Result result, + final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) { + this.result = result; + this.deadLetterQueueRecords = deadLetterQueueRecords; + } + + /** + * Creates a {@code Response} indicating that processing should fail. + * + * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}. + * @return a {@code Response} with a {@link ProcessingExceptionHandler.Result#FAIL} status. + */ + public static Response fail(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) { + return new Response(Result.FAIL, deadLetterQueueRecords); + } + + /** + * Creates a {@code Response} indicating that processing should fail. + * + * @return a {@code Response} with a {@link ProcessingExceptionHandler.Result#FAIL} status. + */ + public static Response fail() { + return fail(Collections.emptyList()); + } + + /** + * Creates a {@code Response} indicating that processing should continue. + * + * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}. + * @return a {@code Response} with a {@link ProcessingExceptionHandler.Result#RESUME} status. + */ + public static Response resume(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) { + return new Response(Result.RESUME, deadLetterQueueRecords); + } + + /** + * Creates a {@code Response} indicating that processing should continue. + * + * @return a {@code Response} with a {@link ProcessingExceptionHandler.Result#RESUME} status. + */ + public static Response resume() { + return resume(Collections.emptyList()); + } + + /** + * Retrieves the processing handler result. 
+ * + * @return the {@link Result} indicating whether processing should continue or fail. + */ + public Result result() { + return result; + } + + /** + * Retrieves an unmodifiable list of records to be sent to the dead letter queue. + * <p> + * If the list is {@code null}, an empty list is returned. + * </p> + * + * @return an unmodifiable list of {@link ProducerRecord} instances + * for the dead letter queue, or an empty list if no records are available. + */ + public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() { + if (deadLetterQueueRecords == null) { + return Collections.emptyList(); + } + return Collections.unmodifiableList(deadLetterQueueRecords); + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java index ed6b38a5692..717866a9bed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java @@ -19,6 +19,9 @@ package org.apache.kafka.streams.errors; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Configurable; +import java.util.Collections; +import java.util.List; + /** * Interface that specifies how an exception when attempting to produce a result to * Kafka should be handled. @@ -55,13 +58,34 @@ public interface ProductionExceptionHandler extends Configurable { * The exception that occurred during production. * * @return Whether to continue or stop processing, or retry the failed operation. + * @deprecated Use {@link #handleError(ErrorHandlerContext, ProducerRecord, Exception)} instead. 
*/ + @Deprecated default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, final ProducerRecord<byte[], byte[]> record, final Exception exception) { return handle(record, exception); } + /** + * Inspect a record that we attempted to produce, and the exception that resulted + * from attempting to produce it and determine to continue or stop processing. + * + * @param context + * The error handler context metadata. + * @param record + * The record that failed to produce. + * @param exception + * The exception that occurred during production. + * + * @return a {@link Response} object + */ + default Response handleError(final ErrorHandlerContext context, + final ProducerRecord<byte[], byte[]> record, + final Exception exception) { + return new Response(Result.from(handle(context, record, exception)), Collections.emptyList()); + } + /** * Handles serialization exception and determine if the process should continue. The default implementation is to * fail the process. @@ -79,7 +103,7 @@ public interface ProductionExceptionHandler extends Configurable { @Deprecated default ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record, final Exception exception) { - return ProductionExceptionHandlerResponse.FAIL; + return ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL; } /** @@ -96,8 +120,11 @@ public interface ProductionExceptionHandler extends Configurable { * The origin of the serialization exception. * * @return Whether to continue or stop processing, or retry the failed operation. + * + * @deprecated Use {@link #handleSerializationError(ErrorHandlerContext, ProducerRecord, Exception, SerializationExceptionOrigin)} instead. 
*/ @SuppressWarnings("rawtypes") + @Deprecated default ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context, final ProducerRecord record, final Exception exception, @@ -105,6 +132,30 @@ public interface ProductionExceptionHandler extends Configurable { return handleSerializationException(record, exception); } + /** + * Handles serialization exception and determine if the process should continue. The default implementation is to + * fail the process. + * + * @param context + * The error handler context metadata. + * @param record + * The record that failed to serialize. + * @param exception + * The exception that occurred during serialization. + * @param origin + * The origin of the serialization exception. + * + * @return a {@link Response} object + */ + @SuppressWarnings("rawtypes") + default Response handleSerializationError(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception, + final SerializationExceptionOrigin origin) { + return new Response(Result.from(handleSerializationException(context, record, exception, origin)), Collections.emptyList()); + } + + @Deprecated enum ProductionExceptionHandlerResponse { /** Continue processing. * @@ -147,10 +198,174 @@ public interface ProductionExceptionHandler extends Configurable { } } + /** + * Enumeration that describes the response from the exception handler. + */ + enum Result { + /** Resume processing. + * + * <p> For this case, output records which could not be written successfully are lost. + * Use this option only if you can tolerate data loss. + */ + RESUME(0, "RESUME"), + /** Fail processing. + * + * <p> Kafka Streams will raise an exception and the {@code StreamsThread} will fail. + * No offsets (for {@link org.apache.kafka.streams.StreamsConfig#AT_LEAST_ONCE at-least-once}) or transactions + * (for {@link org.apache.kafka.streams.StreamsConfig#EXACTLY_ONCE_V2 exactly-once}) will be committed. 
+ */ + FAIL(1, "FAIL"), + /** Retry the failed operation. + * + * <p> Retrying might imply that a {@link TaskCorruptedException} exception is thrown, and that the retry + * is started from the last committed offset. + * + * <p> <b>NOTE:</b> {@code RETRY} is only a valid return value for + * {@link org.apache.kafka.common.errors.RetriableException retriable exceptions}. + * If {@code RETRY} is returned for a non-retriable exception it will be interpreted as {@link #FAIL}. + */ + RETRY(2, "RETRY"); + + /** + * An english description for the used option. This is for debugging only and may change. + */ + public final String name; + + /** + * The permanent and immutable id for the used option. This can't change ever. + */ + public final int id; + + Result(final int id, final String name) { + this.id = id; + this.name = name; + } + + /** + * Converts the deprecated enum ProductionExceptionHandlerResponse into the new Result enum. + * + * @param value the old ProductionExceptionHandlerResponse enum value + * @return a {@link ProductionExceptionHandler.Result} enum value + * @throws IllegalArgumentException if the provided value does not map to a valid {@link ProductionExceptionHandler.Result} + */ + private static ProductionExceptionHandler.Result from(final ProductionExceptionHandlerResponse value) { + switch (value) { + case FAIL: + return Result.FAIL; + case CONTINUE: + return Result.RESUME; + case RETRY: + return Result.RETRY; + default: + throw new IllegalArgumentException("No Result enum found for old value: " + value); + } + } + } + enum SerializationExceptionOrigin { /** Serialization exception occurred during serialization of the key. */ KEY, /** Serialization exception occurred during serialization of the value. */ VALUE } + + /** + * Represents the result of handling a production exception. 
+ * <p> + * The {@code Response} class encapsulates a {@link Result}, + * indicating whether processing should continue or fail, along with an optional list of + * {@link ProducerRecord} instances to be sent to a dead letter queue. + * </p> + */ + class Response { + + private final Result result; + + private final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords; + + /** + * Constructs a new {@code Response} object. + * + * @param result the result indicating whether processing should continue or fail; + * must not be {@code null}. + * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}. + */ + private Response(final Result result, + final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) { + this.result = result; + this.deadLetterQueueRecords = deadLetterQueueRecords; + } + + /** + * Creates a {@code Response} indicating that processing should fail. + * + * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}. + * @return a {@code Response} with a {@link ProductionExceptionHandler.Result#FAIL} status. + */ + public static Response fail(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) { + return new Response(Result.FAIL, deadLetterQueueRecords); + } + + /** + * Creates a {@code Response} indicating that processing should fail. + * + * @return a {@code Response} with a {@link ProductionExceptionHandler.Result#FAIL} status. + */ + public static Response fail() { + return fail(Collections.emptyList()); + } + + /** + * Creates a {@code Response} indicating that processing should continue. + * + * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}. + * @return a {@code Response} with a {@link ProductionExceptionHandler.Result#RESUME} status. 
+ */ + public static Response resume(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) { + return new Response(Result.RESUME, deadLetterQueueRecords); + } + + /** + * Creates a {@code Response} indicating that processing should continue. + * + * @return a {@code Response} with a {@link ProductionExceptionHandler.Result#RESUME} status. + */ + public static Response resume() { + return resume(Collections.emptyList()); + } + + /** + * Creates a {@code Response} indicating that processing should retry. + * + * @return a {@code Response} with a {@link ProductionExceptionHandler.Result#RETRY} status. + */ + public static Response retry() { + return new Response(Result.RETRY, Collections.emptyList()); + } + + /** + * Retrieves the production exception handler result. + * + * @return the {@link Result} indicating whether processing should continue, fail or retry. + */ + public Result result() { + return result; + } + + /** + * Retrieves an unmodifiable list of records to be sent to the dead letter queue. + * <p> + * If the list is {@code null}, an empty list is returned. + * </p> + * + * @return an unmodifiable list of {@link ProducerRecord} instances + * for the dead letter queue, or an empty list if no records are available. + */ + public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() { + if (deadLetterQueueRecords == null) { + return Collections.emptyList(); + } + return Collections.unmodifiableList(deadLetterQueueRecords); + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java new file mode 100644 index 00000000000..d3fd221cea8 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.errors.internals; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.ErrorHandlerContext; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collections; +import java.util.List; + +/** + * {@code ExceptionHandlerUtils} contains utility methods that can be used by all exception handlers. + */ +public class ExceptionHandlerUtils { + public static final String HEADER_ERRORS_EXCEPTION_NAME = "__streams.errors.exception"; + public static final String HEADER_ERRORS_STACKTRACE_NAME = "__streams.errors.stacktrace"; + public static final String HEADER_ERRORS_EXCEPTION_MESSAGE_NAME = "__streams.errors.message"; + public static final String HEADER_ERRORS_TOPIC_NAME = "__streams.errors.topic"; + public static final String HEADER_ERRORS_PARTITION_NAME = "__streams.errors.partition"; + public static final String HEADER_ERRORS_OFFSET_NAME = "__streams.errors.offset"; + + + public static boolean shouldBuildDeadLetterQueueRecord(final String deadLetterQueueTopicName) { + return deadLetterQueueTopicName
!= null; + } + + /** + * If required, return Dead Letter Queue records for the provided exception + * + * @param key Serialized key for the records + * @param value Serialized value for the records + * @param context ErrorHandlerContext of the exception + * @param exception Thrown exception + * @return A list of Dead Letter Queue records to produce + */ + public static List<ProducerRecord<byte[], byte[]>> maybeBuildDeadLetterQueueRecords(final String deadLetterQueueTopicName, + final byte[] key, + final byte[] value, + final ErrorHandlerContext context, + final Exception exception) { + if (!shouldBuildDeadLetterQueueRecord(deadLetterQueueTopicName)) { + return Collections.emptyList(); + } + + return Collections.singletonList(buildDeadLetterQueueRecord(deadLetterQueueTopicName, key, value, context, exception)); + } + + + /** + * Build dead letter queue record for the provided exception. + * + * @param key Serialized key for the record. + * @param value Serialized value for the record. + * @param context error handler context of the exception. + * @return A dead letter queue record to produce. 
+ */ + public static ProducerRecord<byte[], byte[]> buildDeadLetterQueueRecord(final String deadLetterQueueTopicName, + final byte[] key, + final byte[] value, + final ErrorHandlerContext context, + final Exception e) { + if (deadLetterQueueTopicName == null) { + throw new InvalidConfigurationException(String.format("%s cannot be null while building dead letter queue record", StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG)); + } + final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(deadLetterQueueTopicName, null, context.timestamp(), key, value); + final StringWriter stackTraceStringWriter = new StringWriter(); + final PrintWriter stackTracePrintWriter = new PrintWriter(stackTraceStringWriter); + e.printStackTrace(stackTracePrintWriter); + + try (final StringSerializer stringSerializer = new StringSerializer()) { + producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_NAME, stringSerializer.serialize(null, e.toString())); + producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME, stringSerializer.serialize(null, e.getMessage())); + producerRecord.headers().add(HEADER_ERRORS_STACKTRACE_NAME, stringSerializer.serialize(null, stackTraceStringWriter.toString())); + producerRecord.headers().add(HEADER_ERRORS_TOPIC_NAME, stringSerializer.serialize(null, context.topic())); + producerRecord.headers().add(HEADER_ERRORS_PARTITION_NAME, stringSerializer.serialize(null, String.valueOf(context.partition()))); + producerRecord.headers().add(HEADER_ERRORS_OFFSET_NAME, stringSerializer.serialize(null, String.valueOf(context.offset()))); + } + + return producerRecord; + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 1dddc55ca3c..bbf82ff9033 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.errors.ErrorHandlerContext; import org.apache.kafka.streams.errors.ProcessingExceptionHandler; @@ -220,11 +221,11 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> { internalProcessorContext.recordContext().sourceRawValue() ); - final ProcessingExceptionHandler.ProcessingHandlerResponse response; + final ProcessingExceptionHandler.Response response; try { response = Objects.requireNonNull( - processingExceptionHandler.handle(errorHandlerContext, record, processingException), - "Invalid ProductionExceptionHandler response." + processingExceptionHandler.handleError(errorHandlerContext, record, processingException), + "Invalid ProcessingExceptionHandler response." ); } catch (final Exception fatalUserException) { // while Java distinguishes checked vs unchecked exceptions, other languages @@ -242,7 +243,21 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> { ); } - if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) { + final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords = response.deadLetterQueueRecords(); + if (!deadLetterQueueRecords.isEmpty()) { + final RecordCollector collector = ((RecordCollector.Supplier) internalProcessorContext).recordCollector(); + for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord : deadLetterQueueRecords) { + collector.send( + deadLetterQueueRecord.key(), + deadLetterQueueRecord.value(), + name(), + internalProcessorContext, + deadLetterQueueRecord + ); + } + } + + if (response.result() == ProcessingExceptionHandler.Result.FAIL) { log.error("Processing exception handler is set to fail upon" + " a processing error. 
If you would rather have the streaming pipeline" + " continue after a processing error, please set the " + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java index a48a671d460..2c14200f413 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serializer; @@ -48,6 +49,12 @@ public interface RecordCollector { final InternalProcessorContext<Void, Void> context, final StreamPartitioner<? super K, ? super V> partitioner); + <K, V> void send(K key, + V value, + String processorNodeId, + InternalProcessorContext<?, ?> context, + ProducerRecord<byte[], byte[]> serializedRecord); + /** * Initialize the internal {@link Producer}; note this function should be made idempotent * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 89cbf4d4c7d..c4178733ef3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -41,7 +41,6 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.ProductionExceptionHandler; -import 
org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; @@ -263,6 +262,15 @@ public class RecordCollectorImpl implements RecordCollector { // freeing raw records in the context to reduce memory pressure freeRawInputRecordFromContext(context); + send(key, value, processorNodeId, context, serializedRecord); + } + + public <K, V> void send(final K key, + final V value, + final String processorNodeId, + final InternalProcessorContext<?, ?> context, + final ProducerRecord<byte[], byte[]> serializedRecord) { + streamsProducer.send(serializedRecord, (metadata, exception) -> { try { // if there's already an exception record, skip logging offsets or new exceptions @@ -278,16 +286,16 @@ public class RecordCollectorImpl implements RecordCollector { log.warn("Received offset={} in produce response for {}", metadata.offset(), tp); } - if (!topic.endsWith("-changelog")) { + if (!serializedRecord.topic().endsWith("-changelog")) { // we may not have created a sensor during initialization if the node uses dynamic topic routing, // as all topics are not known up front, so create the sensor for this topic if absent final Sensor topicProducedSensor = producedSensorByTopic.computeIfAbsent( - topic, + serializedRecord.topic(), t -> TopicMetrics.producedSensor( Thread.currentThread().getName(), taskId.toString(), processorNodeId, - topic, + serializedRecord.topic(), context.metrics() ) ); @@ -299,7 +307,7 @@ public class RecordCollectorImpl implements RecordCollector { } } else { recordSendError( - topic, + serializedRecord.topic(), exception, serializedRecord, context, @@ -307,7 +315,7 @@ public class RecordCollectorImpl implements RecordCollector { ); // KAFKA-7510 only put message key and value in TRACE level log so we don't leak data by default - log.trace("Failed 
record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition); + log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, serializedRecord.timestamp(), serializedRecord.topic(), serializedRecord.partition()); } } catch (final RuntimeException fatal) { sendException.set(new StreamsException("Producer.send `Callback` failed", fatal)); @@ -329,16 +337,16 @@ public class RecordCollectorImpl implements RecordCollector { final Integer partition, final Long timestamp, final String processorNodeId, - final InternalProcessorContext<Void, Void> context, + final InternalProcessorContext<?, ?> context, final Exception serializationException) { log.debug(String.format("Error serializing record for topic %s", topic), serializationException); final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers); - final ProductionExceptionHandlerResponse response; + final ProductionExceptionHandler.Response response; try { response = Objects.requireNonNull( - productionExceptionHandler.handleSerializationException( + productionExceptionHandler.handleSerializationError( errorHandlerContext(context, processorNodeId), record, serializationException, @@ -365,7 +373,20 @@ public class RecordCollectorImpl implements RecordCollector { ); } - if (maybeFailResponse(response) == ProductionExceptionHandlerResponse.FAIL) { + final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords = response.deadLetterQueueRecords(); + if (!deadLetterQueueRecords.isEmpty()) { + for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord : deadLetterQueueRecords) { + this.send( + deadLetterQueueRecord.key(), + deadLetterQueueRecord.value(), + processorNodeId, + context, + deadLetterQueueRecord + ); + } + } + + if (maybeFailResponse(response.result()) == ProductionExceptionHandler.Result.FAIL) { throw new StreamsException( String.format( "Unable to serialize record. 
ProducerRecord(topic=[%s], partition=[%d], timestamp=[%d]", @@ -385,7 +406,7 @@ public class RecordCollectorImpl implements RecordCollector { droppedRecordsSensor.record(); } - private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorContext<Void, Void> context, + private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorContext<?, ?> context, final String processorNodeId) { final RecordContext recordContext = context != null ? context.recordContext() : null; @@ -442,7 +463,7 @@ public class RecordCollectorImpl implements RecordCollector { private void recordSendError(final String topic, final Exception productionException, final ProducerRecord<byte[], byte[]> serializedRecord, - final InternalProcessorContext<Void, Void> context, + final InternalProcessorContext<?, ?> context, final String processorNodeId) { String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, taskId, productionException.toString()); @@ -462,10 +483,10 @@ public class RecordCollectorImpl implements RecordCollector { // TransactionAbortedException is only thrown after `abortTransaction()` was called, // so it's only a followup error, and Kafka Streams is already handling the original error } else { - final ProductionExceptionHandlerResponse response; + final ProductionExceptionHandler.Response response; try { response = Objects.requireNonNull( - productionExceptionHandler.handle( + productionExceptionHandler.handleError( errorHandlerContext(context, processorNodeId), serializedRecord, productionException @@ -490,14 +511,27 @@ public class RecordCollectorImpl implements RecordCollector { return; } - if (productionException instanceof RetriableException && response == ProductionExceptionHandlerResponse.RETRY) { + final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords = response.deadLetterQueueRecords(); + if (!deadLetterQueueRecords.isEmpty()) { + for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord : deadLetterQueueRecords) 
{ + this.send( + deadLetterQueueRecord.key(), + deadLetterQueueRecord.value(), + processorNodeId, + context, + deadLetterQueueRecord + ); + } + } + + if (productionException instanceof RetriableException && response.result() == ProductionExceptionHandler.Result.RETRY) { errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " + "or the connection to broker was interrupted sending the request or receiving the response. " + "\nConsider overwriting `max.block.ms` and /or " + "`delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors"; sendException.set(new TaskCorruptedException(Collections.singleton(taskId))); } else { - if (maybeFailResponse(response) == ProductionExceptionHandlerResponse.FAIL) { + if (maybeFailResponse(response.result()) == ProductionExceptionHandler.Result.FAIL) { errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent."; sendException.set(new StreamsException(errorMessage, productionException)); } else { @@ -510,12 +544,12 @@ public class RecordCollectorImpl implements RecordCollector { log.error(errorMessage, productionException); } - private ProductionExceptionHandlerResponse maybeFailResponse(final ProductionExceptionHandlerResponse response) { - if (response == ProductionExceptionHandlerResponse.RETRY) { + private ProductionExceptionHandler.Result maybeFailResponse(final ProductionExceptionHandler.Result result) { + if (result == ProductionExceptionHandler.Result.RETRY) { log.warn("ProductionExceptionHandler returned RETRY for a non-retriable exception. 
Will treat it as FAIL."); - return ProductionExceptionHandlerResponse.FAIL; + return ProductionExceptionHandler.Result.FAIL; } else { - return response; + return result; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java index 153ca2e02f1..88e756d785a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java @@ -17,17 +17,18 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; -import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.slf4j.Logger; +import java.util.List; import java.util.Objects; import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; @@ -50,7 +51,7 @@ public class RecordDeserializer { /** * @throws StreamsException if a deserialization error occurs and the deserialization callback returns - * {@link DeserializationHandlerResponse#FAIL FAIL} + * {@link DeserializationExceptionHandler.Result#FAIL FAIL} * or throws an exception itself */ ConsumerRecord<Object, Object> deserialize(final ProcessorContext<?, ?> processorContext, @@ -100,11 +101,11 @@ public class RecordDeserializer { rawRecord.value() ); - final 
DeserializationHandlerResponse response; + final DeserializationExceptionHandler.Response response; try { response = Objects.requireNonNull( - deserializationExceptionHandler.handle(errorHandlerContext, rawRecord, deserializationException), - "Invalid DeserializationExceptionHandler response." + deserializationExceptionHandler.handleError(errorHandlerContext, rawRecord, deserializationException), + "Invalid DeserializationExceptionResponse response." ); } catch (final Exception fatalUserException) { // while Java distinguishes checked vs unchecked exceptions, other languages @@ -118,7 +119,21 @@ public class RecordDeserializer { throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException); } - if (response == DeserializationHandlerResponse.FAIL) { + final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords = response.deadLetterQueueRecords(); + if (!deadLetterQueueRecords.isEmpty()) { + final RecordCollector collector = ((RecordCollector.Supplier) processorContext).recordCollector(); + for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord : deadLetterQueueRecords) { + collector.send( + deadLetterQueueRecord.key(), + deadLetterQueueRecord.value(), + sourceNodeName, + (InternalProcessorContext) processorContext, + deadLetterQueueRecord + ); + } + } + + if (response.result() == DeserializationExceptionHandler.Result.FAIL) { throw new StreamsException("Deserialization exception handler is set to fail upon" + " a deserialization error. 
If you would rather have the streaming pipeline" + " continue after a deserialization error, please set the " + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 82e9c8d7fb1..42b57e46aa4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; @@ -945,10 +946,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, recordContext.sourceRawValue() ); - final ProcessingExceptionHandler.ProcessingHandlerResponse response; + final ProcessingExceptionHandler.Response processingExceptionResponse; try { - response = Objects.requireNonNull( - processingExceptionHandler.handle(errorHandlerContext, null, processingException), + processingExceptionResponse = Objects.requireNonNull( + processingExceptionHandler.handleError(errorHandlerContext, null, processingException), "Invalid ProcessingExceptionHandler response." 
); } catch (final Exception fatalUserException) { @@ -963,7 +964,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, throw new FailedProcessingException("Fatal user code error in processing error callback", node.name(), fatalUserException); } - if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) { + final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords = processingExceptionResponse.deadLetterQueueRecords(); + if (!deadLetterQueueRecords.isEmpty()) { + final RecordCollector collector = ((RecordCollector.Supplier) processorContext).recordCollector(); + for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord : deadLetterQueueRecords) { + collector.send( + deadLetterQueueRecord.key(), + deadLetterQueueRecord.value(), + node.name(), + processorContext, + deadLetterQueueRecord); + } + } + + if (processingExceptionResponse.result() == ProcessingExceptionHandler.Result.FAIL) { log.error("Processing exception handler is set to fail upon" + " a processing error. 
If you would rather have the streaming pipeline" + " continue after a processing error, please set the " + diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 6505cde08ed..bd8002782d2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -1683,6 +1683,11 @@ public class StreamsConfigTest { "Please set group.protocol=classic or remove group.instance.id from the configuration.")); } + public void shouldSetDefaultDeadLetterQueue() { + final StreamsConfig config = new StreamsConfig(props); + assertNull(config.getString(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG)); + } + static class MisconfiguredSerde implements Serde<Object> { @Override public void configure(final Map<String, ?> configs, final boolean isKey) { diff --git a/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java new file mode 100644 index 00000000000..915f3a3f650 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; +import org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.MockRecordCollector; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.Iterator; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@ExtendWith(MockitoExtension.class) +public class ExceptionHandlerUtilsTest { + @Test + public void checkDeadLetterQueueRecords() { + final StringSerializer stringSerializer = new StringSerializer(); + final StringDeserializer stringDeserializer = 
new StringDeserializer(); + final MockRecordCollector collector = new MockRecordCollector(); + final String key = "key"; + final String value = "value"; + final InternalProcessorContext<Object, Object> internalProcessorContext = new InternalMockProcessorContext<>( + new StateSerdes<>("sink", Serdes.ByteArray(), Serdes.ByteArray()), + collector + ); + internalProcessorContext.setRecordContext(new ProcessorRecordContext( + 1L, + 2, + 3, + "source", + new RecordHeaders(Collections.singletonList( + new RecordHeader("sourceHeader", stringSerializer.serialize(null, "hello world")))), + key.getBytes(), + value.getBytes() + )); + final ErrorHandlerContext errorHandlerContext = getErrorHandlerContext(internalProcessorContext); + + final NullPointerException exception = new NullPointerException("Oopsie!"); + final Iterable<ProducerRecord<byte[], byte[]>> dlqRecords = ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords("dlq", errorHandlerContext.sourceRawKey(), errorHandlerContext.sourceRawValue(), errorHandlerContext, exception); + final Iterator<ProducerRecord<byte[], byte[]>> iterator = dlqRecords.iterator(); + + assertTrue(iterator.hasNext()); + final ProducerRecord<byte[], byte[]> dlqRecord = iterator.next(); + final Headers headers = dlqRecord.headers(); + assertFalse(iterator.hasNext()); // There should be only one record + + assertEquals("dlq", dlqRecord.topic()); + assertEquals(errorHandlerContext.timestamp(), dlqRecord.timestamp()); + assertEquals(1, dlqRecord.timestamp()); + assertEquals(key, new String(dlqRecord.key())); + assertEquals(value, new String(dlqRecord.value())); + assertEquals(exception.toString(), stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_NAME).value())); + assertEquals(exception.getMessage(), stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value())); + assertEquals("source", stringDeserializer.deserialize(null, 
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_TOPIC_NAME).value())); + assertEquals("3", stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_PARTITION_NAME).value())); + assertEquals("2", stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_OFFSET_NAME).value())); + } + + @Test + public void doNotBuildDeadLetterQueueRecordsIfNotConfigured() { + final NullPointerException exception = new NullPointerException("Oopsie!"); + final Iterable<ProducerRecord<byte[], byte[]>> dlqRecords = ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords(null, null, null, null, exception); + final Iterator<ProducerRecord<byte[], byte[]>> iterator = dlqRecords.iterator(); + + assertFalse(iterator.hasNext()); + } + + private static DefaultErrorHandlerContext getErrorHandlerContext(final InternalProcessorContext<Object, Object> internalProcessorContext) { + return new DefaultErrorHandlerContext( + null, + internalProcessorContext.topic(), + internalProcessorContext.partition(), + internalProcessorContext.offset(), + internalProcessorContext.headers(), + internalProcessorContext.currentNode().name(), + internalProcessorContext.taskId(), + internalProcessorContext.timestamp(), + internalProcessorContext.recordContext().sourceRawKey(), + internalProcessorContext.recordContext().sourceRawValue()); + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index e9669ac39f4..6b7c06580c0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.InvalidOffsetException; +import 
org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; @@ -29,6 +30,8 @@ import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.errors.ErrorHandlerContext; +import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler; +import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler; import org.apache.kafka.streams.errors.ProcessingExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; @@ -39,7 +42,9 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.MockRecordCollector; import org.apache.kafka.test.StreamsTestUtils; import org.junit.jupiter.api.Test; @@ -52,10 +57,13 @@ import org.mockito.quality.Strictness; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import static org.apache.kafka.streams.errors.ProcessingExceptionHandler.Response; +import static org.apache.kafka.streams.errors.ProcessingExceptionHandler.Result; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -103,7 +111,7 @@ public class ProcessorNodeTest { new 
ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet()); final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext(); - node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL, internalProcessorContext, false)); + node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.Response.fail(), internalProcessorContext, false)); final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class, () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))); @@ -120,7 +128,7 @@ public class ProcessorNodeTest { new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet()); final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext(); - node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext, false)); + node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.Response.resume(), internalProcessorContext, false)); assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))); } @@ -147,7 +155,7 @@ public class ProcessorNodeTest { assertEquals(ignoredExceptionCause, runtimeException.getCause().getClass()); assertEquals(ignoredExceptionCauseMessage, runtimeException.getCause().getMessage()); - verify(processingExceptionHandler, never()).handle(any(), any(), any()); + verify(processingExceptionHandler, never()).handleError(any(), any(), any()); } @Test @@ -156,7 +164,7 @@ public class ProcessorNodeTest { new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet()); final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext(); - 
node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext, true)); + node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.Response.resume(), internalProcessorContext, true)); final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class, () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))); @@ -166,6 +174,58 @@ public class ProcessorNodeTest { assertEquals(NAME, failedProcessingException.failedProcessorNodeName()); } + + @Test + public void shouldBuildDeadLetterQueueRecordsInDefaultProcessingExceptionHandler() { + final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("processor", + (Processor<Object, Object, Object, Object>) record -> { + throw new NullPointerException("Oopsie!"); + }, Collections.emptySet()); + + final MockRecordCollector collector = new MockRecordCollector(); + final InternalProcessorContext<Object, Object> internalProcessorContext = + new InternalMockProcessorContext<>( + new StateSerdes<>("sink", Serdes.ByteArray(), Serdes.ByteArray()), + collector + ); + final ProcessingExceptionHandler processingExceptionHandler = new LogAndFailProcessingExceptionHandler(); + processingExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "dlq")); + node.init(internalProcessorContext, processingExceptionHandler); + + assertThrows(RuntimeException.class, + () -> node.process(new Record<>("hello", "world", 1L))); + + assertEquals(1, collector.collected().size()); + assertEquals("dlq", collector.collected().get(0).topic()); + assertEquals("sourceKey", new String((byte[]) collector.collected().get(0).key())); + assertEquals("sourceValue", new String((byte[]) collector.collected().get(0).value())); + } + + @Test + public void 
shouldBuildDeadLetterQueueRecordsInLogAndContinueProcessingExceptionHandler() { + final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("processor", + (Processor<Object, Object, Object, Object>) record -> { + throw new NullPointerException("Oopsie!"); + }, Collections.emptySet()); + + final MockRecordCollector collector = new MockRecordCollector(); + final InternalProcessorContext<Object, Object> internalProcessorContext = + new InternalMockProcessorContext<>( + new StateSerdes<>("sink", Serdes.ByteArray(), Serdes.ByteArray()), + collector + ); + final ProcessingExceptionHandler processingExceptionHandler = new LogAndContinueProcessingExceptionHandler(); + processingExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "dlq")); + node.init(internalProcessorContext, processingExceptionHandler); + + node.process(new Record<>("hello", "world", 0L)); + + assertEquals(1, collector.collected().size()); + assertEquals("dlq", collector.collected().get(0).topic()); + assertEquals("sourceKey", new String((byte[]) collector.collected().get(0).key())); + assertEquals("sourceValue", new String((byte[]) collector.collected().get(0).value())); + } + private static class ExceptionalProcessor implements Processor<Object, Object, Object, Object> { @Override public void init(final ProcessorContext<Object, Object> context) { @@ -318,6 +378,64 @@ public class ProcessorNodeTest { assertTrue(se.getMessage().contains("pname")); } + @Test + void shouldFailWithDeadLetterQueueRecords() { + final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{}); + final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record); + + final Response response = Response.fail(records); + + assertEquals(Result.FAIL, response.result()); + assertEquals(1, response.deadLetterQueueRecords().size()); + assertEquals(record, response.deadLetterQueueRecords().get(0)); + } 
+ + @Test + void shouldFailWithoutDeadLetterQueueRecords() { + final Response response = Response.fail(); + + assertEquals(Result.FAIL, response.result()); + assertTrue(response.deadLetterQueueRecords().isEmpty()); + } + + @Test + void shouldResumeWithDeadLetterQueueRecords() { + final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{}); + final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record); + + final Response response = Response.resume(records); + + assertEquals(Result.RESUME, response.result()); + assertEquals(1, response.deadLetterQueueRecords().size()); + assertEquals(record, response.deadLetterQueueRecords().get(0)); + } + + @Test + void shouldResumeWithoutDeadLetterQueueRecords() { + final Response response = Response.resume(); + + assertEquals(Result.RESUME, response.result()); + assertTrue(response.deadLetterQueueRecords().isEmpty()); + } + + + @Test + void shouldNotBeModifiable() { + final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{}); + final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record); + + final Response response = Response.fail(records); + + assertThrows(UnsupportedOperationException.class, () -> response.deadLetterQueueRecords().add(record)); + } + + @Test + void shouldReturnsEmptyList() { + final Response response = Response.fail(); + + assertTrue(response.deadLetterQueueRecords().isEmpty()); + } + @SuppressWarnings("unchecked") private InternalProcessorContext<Object, Object> mockInternalProcessorContext() { final InternalProcessorContext<Object, Object> internalProcessorContext = mock(InternalProcessorContext.class, withSettings().strictness(Strictness.LENIENT)); @@ -342,12 +460,12 @@ public class ProcessorNodeTest { } public static class ProcessingExceptionHandlerMock implements ProcessingExceptionHandler { - private final ProcessingExceptionHandler.ProcessingHandlerResponse 
response; + private final Response response; private final InternalProcessorContext<Object, Object> internalProcessorContext; private final boolean shouldThrowException; - public ProcessingExceptionHandlerMock(final ProcessingExceptionHandler.ProcessingHandlerResponse response, + public ProcessingExceptionHandlerMock(final Response response, final InternalProcessorContext<Object, Object> internalProcessorContext, final boolean shouldThrowException) { this.response = response; @@ -356,7 +474,7 @@ public class ProcessorNodeTest { } @Override - public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { + public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { assertEquals(internalProcessorContext.topic(), context.topic()); assertEquals(internalProcessorContext.partition(), context.partition()); assertEquals(internalProcessorContext.offset(), context.offset()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index ebc92c65184..64564278a18 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -47,10 +47,10 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.ErrorHandlerContext; import org.apache.kafka.streams.errors.ProductionExceptionHandler; -import 
org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse; import org.apache.kafka.streams.errors.ProductionExceptionHandler.SerializationExceptionOrigin; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; @@ -89,6 +89,8 @@ import static java.util.Collections.emptySet; import static java.util.Collections.singletonMap; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.errors.ProductionExceptionHandler.Response; +import static org.apache.kafka.streams.errors.ProductionExceptionHandler.Result; import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE; import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2; import static org.apache.kafka.streams.processor.internals.ClientUtils.producerRecordSizeInBytes; @@ -1201,7 +1203,7 @@ public class RecordCollectorTest { logContext, taskId, getExceptionalStreamsProducerOnSend(exception), - new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)), + new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume())), streamsMetrics, topology ); @@ -1228,7 +1230,7 @@ public class RecordCollectorTest { logContext, taskId, getExceptionalStreamsProducerOnSend(exception), - new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)), + new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume())), streamsMetrics, topology ); @@ -1252,7 +1254,7 @@ public class RecordCollectorTest { logContext, taskId, getExceptionalStreamsProducerOnSend(exception), - new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)), + new 
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume())), streamsMetrics, topology ); @@ -1276,7 +1278,7 @@ public class RecordCollectorTest { taskId, getExceptionalStreamsProducerOnSend(new RuntimeException("KABOOM!")), new ProductionExceptionHandlerMock( - Optional.of(ProductionExceptionHandlerResponse.CONTINUE), + Optional.of(ProductionExceptionHandler.Response.resume()), context, sinkNodeName, taskId @@ -1347,7 +1349,7 @@ public class RecordCollectorTest { taskId, getExceptionalStreamsProducerOnSend(exception), new ProductionExceptionHandlerMock( - Optional.of(ProductionExceptionHandlerResponse.FAIL), + Optional.of(ProductionExceptionHandler.Response.fail()), context, sinkNodeName, taskId @@ -1377,7 +1379,7 @@ public class RecordCollectorTest { taskId, getExceptionalStreamsProducerOnSend(exception), new ProductionExceptionHandlerMock( - Optional.of(ProductionExceptionHandlerResponse.CONTINUE), + Optional.of(ProductionExceptionHandler.Response.resume()), context, sinkNodeName, taskId @@ -1400,7 +1402,7 @@ public class RecordCollectorTest { taskId, getExceptionalStreamsProducerOnSend(exception), new ProductionExceptionHandlerMock( - Optional.of(ProductionExceptionHandlerResponse.RETRY), + Optional.of(ProductionExceptionHandler.Response.retry()), context, sinkNodeName, taskId @@ -1535,7 +1537,7 @@ public class RecordCollectorTest { public void shouldDropRecordExceptionUsingAlwaysContinueExceptionHandler() { try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock( - Optional.of(ProductionExceptionHandlerResponse.CONTINUE), + Optional.of(ProductionExceptionHandler.Response.resume()), context, sinkNodeName, taskId, @@ -1564,7 +1566,7 @@ public class RecordCollectorTest { public void shouldThrowStreamsExceptionWhenValueSerializationFailedAndProductionExceptionHandlerRepliesWithFail() { try (final ErrorStringSerializer 
errorSerializer = new ErrorStringSerializer()) { final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock( - Optional.of(ProductionExceptionHandlerResponse.FAIL), + Optional.of(ProductionExceptionHandler.Response.fail()), context, sinkNodeName, taskId, @@ -1585,7 +1587,7 @@ public class RecordCollectorTest { public void shouldThrowStreamsExceptionWhenKeySerializationFailedAndProductionExceptionHandlerRepliesWithFail() { try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock( - Optional.of(ProductionExceptionHandlerResponse.FAIL), + Optional.of(ProductionExceptionHandler.Response.fail()), context, sinkNodeName, taskId, @@ -1795,7 +1797,7 @@ public class RecordCollectorTest { public void shouldNotCallProductionExceptionHandlerOnClassCastException() { try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { final RecordCollector collector = newRecordCollector( - new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)) + new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume())) ); collector.initialize(); @@ -1834,6 +1836,58 @@ public class RecordCollectorTest { collector.flush(); // need to call flush() to check for internal exceptions } + @Test + public void shouldBuildDeadLetterQueueRecordsInDefaultExceptionHandlerDuringDeserialization() { + try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { + final DefaultProductionExceptionHandler productionExceptionHandler = new DefaultProductionExceptionHandler(); + productionExceptionHandler.configure(Collections.singletonMap( + StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, + "dlq" + )); + final RecordCollector collector = newRecordCollector(productionExceptionHandler); + collector.initialize(); + + assertThat(mockProducer.history().isEmpty(), 
equalTo(true)); + assertThrows( + StreamsException.class, + () -> + collector.send(topic, "hello", "world", null, 0, null, errorSerializer, stringSerializer, sinkNodeName, context) + ); + + assertEquals(1, mockProducer.history().size()); + assertEquals("dlq", mockProducer.history().get(0).topic()); + } + } + + + @Test + public void shouldBuildDeadLetterQueueRecordsInDefaultExceptionHandler() { + final KafkaException exception = new KafkaException("KABOOM!"); + final StreamsProducer streamProducer = getExceptionalStreamsProducerOnSend(exception); + final MockProducer<byte[], byte[]> mockProducer = (MockProducer<byte[], byte[]>) streamProducer.kafkaProducer(); + final DefaultProductionExceptionHandler productionExceptionHandler = new DefaultProductionExceptionHandler(); + productionExceptionHandler.configure(Collections.singletonMap( + StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, + "dlq" + )); + final RecordCollector collector = new RecordCollectorImpl( + logContext, + taskId, + streamProducer, + productionExceptionHandler, + streamsMetrics, + topology + ); + + collector.initialize(); + + collector.send(topic, "hello", "world", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context); + assertThrows(StreamsException.class, collector::flush); + + assertEquals(1, mockProducer.history().size()); + assertEquals("dlq", mockProducer.history().get(0).topic()); + } + @Test public void shouldNotSendIfSendOfOtherTaskFailedInCallback() { final TaskId taskId1 = new TaskId(0, 0); @@ -1954,6 +2008,116 @@ public class RecordCollectorTest { } } + public void shouldCallOldImplementationExceptionHandler() { + final KafkaException exception = new KafkaException("KABOOM!"); + final StreamsProducer streamProducer = getExceptionalStreamsProducerOnSend(exception); + final OldProductionExceptionHandlerImplementation productionExceptionHandler = new OldProductionExceptionHandlerImplementation(); + + final RecordCollector collector = new RecordCollectorImpl( + 
logContext, + taskId, + streamProducer, + productionExceptionHandler, + streamsMetrics, + topology + ); + + collector.initialize(); + + collector.send(topic, "hello", "world", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context); + final Exception thrown = assertThrows(StreamsException.class, collector::flush); + + assertEquals(exception, thrown.getCause()); + } + + @Test + public void shouldCallOldImplementationWithRecordContextExceptionHandler() { + final KafkaException exception = new KafkaException("KABOOM!"); + final StreamsProducer streamProducer = getExceptionalStreamsProducerOnSend(exception); + final OldProductionExceptionHandlerWithRecordContextImplementation productionExceptionHandler = new OldProductionExceptionHandlerWithRecordContextImplementation(); + + final RecordCollector collector = new RecordCollectorImpl( + logContext, + taskId, + streamProducer, + productionExceptionHandler, + streamsMetrics, + topology + ); + + collector.initialize(); + + collector.send(topic, "hello", "world", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context); + final Exception thrown = assertThrows(StreamsException.class, collector::flush); + + assertEquals(exception, thrown.getCause()); + } + + @Test + void shouldFailWithDeadLetterQueueRecords() { + final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{}); + final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record); + + final ProductionExceptionHandler.Response response = ProductionExceptionHandler.Response.fail(records); + + assertEquals(Result.FAIL, response.result()); + assertEquals(1, response.deadLetterQueueRecords().size()); + assertEquals(record, response.deadLetterQueueRecords().get(0)); + } + + @Test + void shouldFailWithoutDeadLetterQueueRecords() { + final ProductionExceptionHandler.Response response = ProductionExceptionHandler.Response.fail(); + + assertEquals(Result.FAIL, 
response.result()); + assertTrue(response.deadLetterQueueRecords().isEmpty()); + } + + @Test + void shouldResumeWithDeadLetterQueueRecords() { + final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{}); + final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record); + + final Response response = Response.resume(records); + + assertEquals(Result.RESUME, response.result()); + assertEquals(1, response.deadLetterQueueRecords().size()); + assertEquals(record, response.deadLetterQueueRecords().get(0)); + } + + @Test + void shouldResumeWithoutDeadLetterQueueRecords() { + final Response response = Response.resume(); + + assertEquals(Result.RESUME, response.result()); + assertTrue(response.deadLetterQueueRecords().isEmpty()); + } + + @Test + void shouldRetryWithoutDeadLetterQueueRecords() { + final Response response = Response.retry(); + + assertEquals(Result.RETRY, response.result()); + assertTrue(response.deadLetterQueueRecords().isEmpty()); + } + + @Test + void shouldNotBeModifiable() { + final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{}); + final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record); + + final Response response = Response.fail(records); + + assertThrows(UnsupportedOperationException.class, () -> response.deadLetterQueueRecords().add(record)); + } + + @Test + void shouldReturnsEmptyList() { + final Response response = Response.fail(); + + assertTrue(response.deadLetterQueueRecords().isEmpty()); + } + private RecordCollector newRecordCollector(final ProductionExceptionHandler productionExceptionHandler) { return new RecordCollectorImpl( logContext, @@ -1978,8 +2142,12 @@ public class RecordCollectorTest { new MockProducer<>(cluster, true, null, byteArraySerializer, byteArraySerializer) { @Override public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback 
callback) { - callback.onCompletion(null, exception); - return null; + if (record.topic().equals("dlq")) { + return super.send(record, callback); + } else { + callback.onCompletion(null, exception); + return null; + } } }, AT_LEAST_ONCE, @@ -2023,7 +2191,7 @@ public class RecordCollectorTest { } public static class ProductionExceptionHandlerMock implements ProductionExceptionHandler { - private final Optional<ProductionExceptionHandlerResponse> response; + private final Optional<Response> response; private boolean shouldThrowException; private InternalProcessorContext<Void, Void> expectedContext; private String expectedProcessorNodeId; @@ -2040,11 +2208,11 @@ public class RecordCollectorTest { this.expectedSerializationExceptionOrigin = null; } - public ProductionExceptionHandlerMock(final Optional<ProductionExceptionHandlerResponse> response) { + public ProductionExceptionHandlerMock(final Optional<Response> response) { this.response = response; } - public ProductionExceptionHandlerMock(final Optional<ProductionExceptionHandlerResponse> response, + public ProductionExceptionHandlerMock(final Optional<Response> response, final InternalProcessorContext<Void, Void> context, final String processorNodeId, final TaskId taskId) { @@ -2064,7 +2232,7 @@ public class RecordCollectorTest { this.shouldThrowException = shouldThrowException; } - public ProductionExceptionHandlerMock(final Optional<ProductionExceptionHandlerResponse> response, + public ProductionExceptionHandlerMock(final Optional<Response> response, final InternalProcessorContext<Void, Void> context, final String processorNodeId, final TaskId taskId, @@ -2075,9 +2243,9 @@ public class RecordCollectorTest { } @Override - public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, - final ProducerRecord<byte[], byte[]> record, - final Exception exception) { + public Response handleError(final ErrorHandlerContext context, + final ProducerRecord<byte[], byte[]> record, + final Exception 
exception) { assertInputs(context, exception); if (shouldThrowException) { throw new RuntimeException("CRASH"); @@ -2087,10 +2255,10 @@ public class RecordCollectorTest { @SuppressWarnings("rawtypes") @Override - public ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context, - final ProducerRecord record, - final Exception exception, - final SerializationExceptionOrigin origin) { + public Response handleSerializationError(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception, + final SerializationExceptionOrigin origin) { assertInputs(context, exception); assertEquals(expectedSerializationExceptionOrigin, origin); if (shouldThrowException) { @@ -2115,4 +2283,33 @@ public class RecordCollectorTest { assertEquals("KABOOM!", exception.getMessage()); } } + + public static class OldProductionExceptionHandlerImplementation implements ProductionExceptionHandler { + + @SuppressWarnings("deprecation") + @Override + public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record, + final Exception exception) { + return ProductionExceptionHandlerResponse.FAIL; + } + + @Override + public void configure(final Map<String, ?> configs) { + } + } + + public static class OldProductionExceptionHandlerWithRecordContextImplementation implements ProductionExceptionHandler { + + @SuppressWarnings("deprecation") + @Override + public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, + final ProducerRecord<byte[], byte[]> record, + final Exception exception) { + return ProductionExceptionHandlerResponse.FAIL; + } + + @Override + public void configure(final Map<String, ?> configs) { + } + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java index aa3bb57c7a6..db726d78dcc 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java @@ -17,32 +17,44 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; -import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse; import org.apache.kafka.streams.errors.ErrorHandlerContext; +import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; +import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.MockRecordCollector; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; +import static org.apache.kafka.streams.errors.DeserializationExceptionHandler.Response; +import static 
org.apache.kafka.streams.errors.DeserializationExceptionHandler.Result; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class RecordDeserializerTest { private final String sourceNodeName = "source-node"; @@ -108,7 +120,7 @@ public class RecordDeserializerTest { "value" ), new DeserializationExceptionHandlerMock( - Optional.of(DeserializationHandlerResponse.FAIL), + Optional.of(DeserializationExceptionHandler.Response.fail()), rawRecord, sourceNodeName, taskId @@ -147,7 +159,7 @@ public class RecordDeserializerTest { "value" ), new DeserializationExceptionHandlerMock( - Optional.of(DeserializationHandlerResponse.CONTINUE), + Optional.of(DeserializationExceptionHandler.Response.resume()), rawRecord, sourceNodeName, taskId @@ -188,7 +200,7 @@ public class RecordDeserializerTest { ); assertEquals("Fatal user code error in deserialization error callback", exception.getMessage()); assertInstanceOf(NullPointerException.class, exception.getCause()); - assertEquals("Invalid DeserializationExceptionHandler response.", exception.getCause().getMessage()); + assertEquals("Invalid DeserializationExceptionResponse response.", exception.getCause().getMessage()); } } @@ -222,6 +234,144 @@ public class RecordDeserializerTest { } } + + @Test + public void shouldBuildDeadLetterQueueRecordsInDefaultDeserializationException() { + try (Metrics metrics = new Metrics()) { + final MockRecordCollector collector = new MockRecordCollector(); + final InternalProcessorContext<Object, Object> internalProcessorContext = + new InternalMockProcessorContext<>( + new StateSerdes<>("sink", Serdes.ByteArray(), Serdes.ByteArray()), + collector + ); + final DeserializationExceptionHandler deserializationExceptionHandler = new 
LogAndFailExceptionHandler(); + deserializationExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "dlq")); + + assertThrows(StreamsException.class, () -> RecordDeserializer.handleDeserializationFailure( + deserializationExceptionHandler, + internalProcessorContext, + new RuntimeException(new NullPointerException("Oopsie")), + new ConsumerRecord<>("source", + 0, + 0, + 123, + TimestampType.CREATE_TIME, + -1, + -1, + "hello".getBytes(StandardCharsets.UTF_8), + "world".getBytes(StandardCharsets.UTF_8), + new RecordHeaders(), + Optional.empty()), + new LogContext().logger(this.getClass()), + metrics.sensor("dropped-records"), + "sourceNode" + )); + + assertEquals(1, collector.collected().size()); + assertEquals("dlq", collector.collected().get(0).topic()); + assertEquals("hello", new String((byte[]) collector.collected().get(0).key())); + assertEquals("world", new String((byte[]) collector.collected().get(0).value())); + } + } + + + @Test + public void shouldBuildDeadLetterQueueRecordsInLogAndContinueDeserializationException() { + try (Metrics metrics = new Metrics()) { + final MockRecordCollector collector = new MockRecordCollector(); + final InternalProcessorContext<Object, Object> internalProcessorContext = + new InternalMockProcessorContext<>( + new StateSerdes<>("sink", Serdes.ByteArray(), Serdes.ByteArray()), + collector + ); + final DeserializationExceptionHandler deserializationExceptionHandler = new LogAndContinueExceptionHandler(); + deserializationExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "dlq")); + + RecordDeserializer.handleDeserializationFailure( + deserializationExceptionHandler, + internalProcessorContext, + new RuntimeException(new NullPointerException("Oopsie")), + new ConsumerRecord<>("source", + 0, + 0, + 123, + TimestampType.CREATE_TIME, + -1, + -1, + "hello".getBytes(StandardCharsets.UTF_8), + 
"world".getBytes(StandardCharsets.UTF_8), + new RecordHeaders(), + Optional.empty()), + new LogContext().logger(this.getClass()), + metrics.sensor("dropped-records"), + "sourceNode" + ); + + assertEquals(1, collector.collected().size()); + assertEquals("dlq", collector.collected().get(0).topic()); + assertEquals("hello", new String((byte[]) collector.collected().get(0).key())); + assertEquals("world", new String((byte[]) collector.collected().get(0).value())); + } + } + + @Test + void shouldFailWithDeadLetterQueueRecords() { + final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{}); + final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record); + + final Response response = Response.fail(records); + + assertEquals(Result.FAIL, response.result()); + assertEquals(1, response.deadLetterQueueRecords().size()); + assertEquals(record, response.deadLetterQueueRecords().get(0)); + } + + @Test + void shouldFailWithoutDeadLetterQueueRecords() { + final Response response = DeserializationExceptionHandler.Response.fail(); + + assertEquals(Result.FAIL, response.result()); + assertTrue(response.deadLetterQueueRecords().isEmpty()); + } + + @Test + void shouldResumeWithDeadLetterQueueRecords() { + final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{}); + final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record); + + final Response response = Response.resume(records); + + assertEquals(Result.RESUME, response.result()); + assertEquals(1, response.deadLetterQueueRecords().size()); + assertEquals(record, response.deadLetterQueueRecords().get(0)); + } + + @Test + void shouldResumeWithoutDeadLetterQueueRecords() { + final Response response = Response.resume(); + + assertEquals(Result.RESUME, response.result()); + assertTrue(response.deadLetterQueueRecords().isEmpty()); + } + + + @Test + void shouldNotBeModifiable() { + final 
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{}); + final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record); + + final Response response = Response.fail(records); + + assertThrows(UnsupportedOperationException.class, () -> response.deadLetterQueueRecords().add(record)); + } + + @Test + void shouldReturnsEmptyList() { + final Response response = Response.fail(); + + assertTrue(response.deadLetterQueueRecords().isEmpty()); + } + static class TheSourceNode extends SourceNode<Object, Object> { private final boolean keyThrowsException; private final boolean valueThrowsException; @@ -258,12 +408,12 @@ public class RecordDeserializerTest { } public static class DeserializationExceptionHandlerMock implements DeserializationExceptionHandler { - private final Optional<DeserializationHandlerResponse> response; + private final Optional<Response> response; private final ConsumerRecord<byte[], byte[]> expectedRecord; private final String expectedProcessorNodeId; private final TaskId expectedTaskId; - public DeserializationExceptionHandlerMock(final Optional<DeserializationHandlerResponse> response, + public DeserializationExceptionHandlerMock(final Optional<Response> response, final ConsumerRecord<byte[], byte[]> record, final String processorNodeId, final TaskId taskId) { @@ -274,9 +424,9 @@ public class RecordDeserializerTest { } @Override - public DeserializationHandlerResponse handle(final ErrorHandlerContext context, - final ConsumerRecord<byte[], byte[]> record, - final Exception exception) { + public Response handleError(final ErrorHandlerContext context, + final ConsumerRecord<byte[], byte[]> record, + final Exception exception) { assertEquals(expectedRecord.topic(), context.topic()); assertEquals(expectedRecord.partition(), context.partition()); assertEquals(expectedRecord.offset(), context.offset()); diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 701f38eda0c..385719530b9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -3032,7 +3032,7 @@ public class StreamTaskTest { public static class CrashingProcessingExceptionHandler implements ProcessingExceptionHandler { @Override - public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { + public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { throw new RuntimeException("KABOOM from ProcessingExceptionHandlerMock!"); } @@ -3044,7 +3044,7 @@ public class StreamTaskTest { public static class NullProcessingExceptionHandler implements ProcessingExceptionHandler { @Override - public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { + public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { return null; } diff --git a/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java index 8a7f5434963..ed6898b3713 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java +++ b/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java @@ -81,6 +81,23 @@ public class MockRecordCollector implements RecordCollector { ); } + @Override + public <K, V> void send(final K key, + final V value, + final String processorNodeId, + final InternalProcessorContext<?, ?> context, + final ProducerRecord<byte[], byte[]> serializedRecord) { + // Building a new ProducerRecord for key & value 
type conversion + collected.add(new ProducerRecord<>( + serializedRecord.topic(), + serializedRecord.partition(), + serializedRecord.timestamp(), + key, + value, + serializedRecord.headers()) + ); + } + @Override public void initialize() {}