Ankur Sinha created KAFKA-20604:
-----------------------------------
Summary: KIP-1348: Complete Built-in Exception Handler Symmetry
with LogAndContinueProductionExceptionHandler
Key: KAFKA-20604
URL: https://issues.apache.org/jira/browse/KAFKA-20604
Project: Kafka
Issue Type: New Feature
Components: streams
Reporter: Ankur Sinha
This ticket tracks the implementation of
[KIP-1348|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1348%3A+Complete+Built-in+Exception+Handler+Symmetry+with+LogAndContinueProductionExceptionHandler].
h3. Problem
Kafka Streams ships built-in log-and-continue exception handlers for
deserialization ({{LogAndContinueExceptionHandler}}) and processing
({{LogAndContinueProcessingExceptionHandler}}), but not for
production/serialization errors. Users who want to skip poison-pill records on
the output side must write a custom {{ProductionExceptionHandler}} — something
not required for the other two error types.
Additionally, DLQ records produced by any exception handler are currently sent
through the same {{RecordCollectorImpl.send()}} path as regular records. With a
continue handler configured, this creates:
* An *infinite loop* in the production path (a failed DLQ send triggers the
handler again, which produces another DLQ record, and so on)
* *Silent data loss* in the deserialization/processing paths (the DLQ failure
is swallowed by the continue handler, and the original record is lost)
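The production-path loop can be illustrated with a toy model. This is a minimal sketch, not Kafka code: {{SharedPathCollector}}, its fields, and the {{MAX_HOPS}} guard are hypothetical names invented for the illustration, and the handler is reduced to "record the failure, emit a DLQ record, resume".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Toy model (not Kafka code): DLQ records re-enter the same send path as regular records. */
class SharedPathCollector {
    static final int MAX_HOPS = 5;      // demo-only guard so the sketch terminates
    final Set<String> failingTopics;    // topics whose sends always fail
    final String dlqTopic;
    final List<String> handlerCalls = new ArrayList<>();

    SharedPathCollector(Set<String> failingTopics, String dlqTopic) {
        this.failingTopics = failingTopics;
        this.dlqTopic = dlqTopic;
    }

    /** Single send path shared by regular and DLQ records. */
    void send(String topic, String value, int hop) {
        if (!failingTopics.contains(topic)) {
            return; // delivered, nothing to handle
        }
        if (hop >= MAX_HOPS) {
            return; // without this guard the recursion would never end
        }
        // Continue-style handler: note the failure, emit a DLQ record, resume.
        handlerCalls.add("failed send to " + topic);
        send(dlqTopic, value, hop + 1); // the DLQ record goes back through send()
    }
}

class SharedPathDemo {
    public static void main(String[] args) {
        // Both the output topic and the DLQ topic reject records.
        SharedPathCollector c =
            new SharedPathCollector(Set.of("output", "my-app-dlq"), "my-app-dlq");
        c.send("output", "poison-pill", 0);
        // One handler call per hop until the demo guard stops the recursion.
        System.out.println("handler invocations: " + c.handlerCalls.size());
    }
}
```

With a healthy DLQ topic the handler runs once; when the DLQ topic itself fails, the only thing that stops the model is the artificial hop limit.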
h3. Changes
This KIP adds:
# *{{LogAndContinueProductionExceptionHandler}}* — new built-in handler that
logs at WARN level and returns {{Response.resume()}}. Supports DLQ via
{{errors.dead.letter.queue.topic.name}}.
# *{{LogAndFailProductionExceptionHandler}}* — rename of
{{DefaultProductionExceptionHandler}} for naming consistency.
{{DefaultProductionExceptionHandler}} is kept as a deprecated subclass alias.
# *{{sendDlqRecord()}}* method on {{RecordCollector}} interface — dedicated DLQ
send path that bypasses the production exception handler, preventing the
infinite loop and silent data loss described above. All four DLQ call sites
({{RecordCollectorImpl.handleException()}},
{{RecordCollectorImpl.recordSendError()}},
{{RecordDeserializer.handleDeserializationFailure()}}, and {{StreamTask}}
processing error handler) are updated to use it.
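The effect of a dedicated DLQ path can be sketched with a similar toy model. This is an illustration under stated assumptions, not the actual {{RecordCollectorImpl}} code: {{DedicatedPathCollector}} is a hypothetical class, and the choice to surface a failed DLQ send as an exception is an assumption, since the ticket only says the path bypasses the production exception handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Toy model (not Kafka code) of a collector with a separate DLQ send path. */
class DedicatedPathCollector {
    final Set<String> failingTopics;    // topics whose sends always fail
    final String dlqTopic;
    final List<String> delivered = new ArrayList<>();
    int handlerCalls = 0;

    DedicatedPathCollector(Set<String> failingTopics, String dlqTopic) {
        this.failingTopics = failingTopics;
        this.dlqTopic = dlqTopic;
    }

    /** Regular path: a failure invokes the continue-style handler exactly once. */
    void send(String topic, String value) {
        if (!failingTopics.contains(topic)) {
            delivered.add(topic + ":" + value);
            return;
        }
        handlerCalls++;
        sendDlqRecord(value);
    }

    /** DLQ path: never consults the handler (assumption: failures are raised). */
    void sendDlqRecord(String value) {
        if (failingTopics.contains(dlqTopic)) {
            throw new IllegalStateException("DLQ send to " + dlqTopic + " failed");
        }
        delivered.add(dlqTopic + ":" + value);
    }
}

class DedicatedPathDemo {
    public static void main(String[] args) {
        DedicatedPathCollector ok =
            new DedicatedPathCollector(Set.of("output"), "my-app-dlq");
        ok.send("output", "poison-pill");
        System.out.println(ok.delivered + ", handler calls: " + ok.handlerCalls);

        DedicatedPathCollector broken =
            new DedicatedPathCollector(Set.of("output", "my-app-dlq"), "my-app-dlq");
        try {
            broken.send("output", "poison-pill");
        } catch (IllegalStateException e) {
            // The failure is visible to the caller instead of looping or vanishing.
            System.out.println(e.getMessage() + ", handler calls: " + broken.handlerCalls);
        }
    }
}
```

In this model the handler runs once per failed regular record regardless of DLQ health, and a broken DLQ topic is reported rather than retried through the handler or silently dropped.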
h3. Configuration
No new configuration keys. Users opt in via existing configs:
{code}
production.exception.handler=org.apache.kafka.streams.errors.LogAndContinueProductionExceptionHandler
errors.dead.letter.queue.topic.name=my-app-dlq
{code}
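For applications that build their configuration in code, the same opt-in might look like the sketch below. It uses plain string keys so it has no Kafka dependency; {{HandlerConfigSketch}} and {{streamsProps()}} are hypothetical names, and the required {{application.id}} and {{bootstrap.servers}} settings are omitted for brevity.

```java
import java.util.Properties;

/** Builds the two opt-in settings from this ticket as java.util.Properties. */
class HandlerConfigSketch {
    static Properties streamsProps() {
        Properties props = new Properties();
        // Existing key; the handler class is the one proposed by this KIP.
        props.setProperty("production.exception.handler",
            "org.apache.kafka.streams.errors.LogAndContinueProductionExceptionHandler");
        // Existing key; "my-app-dlq" is the example topic name used above.
        props.setProperty("errors.dead.letter.queue.topic.name", "my-app-dlq");
        return props;
    }

    public static void main(String[] args) {
        // Print each configured key with its value.
        streamsProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```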
h3. Compatibility
Fully backward compatible. Default behavior unchanged.
{{DefaultProductionExceptionHandler}} remains as a deprecated alias. The
internal default class reference in {{StreamsConfig}} is updated from
{{DefaultProductionExceptionHandler}} to
{{LogAndFailProductionExceptionHandler}} to avoid {{-Werror}} deprecation
compile errors; runtime behavior is identical.
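The deprecated-alias arrangement follows a common Java pattern, sketched here with toy stand-ins rather than the real Kafka classes. {{LogAndFailHandler}}, {{DefaultHandler}}, and {{AliasDemo}} are hypothetical names, and the string return value only marks which behavior ran.

```java
/** Toy stand-in for the new, consistently named handler. */
class LogAndFailHandler {
    String handle(String error) {
        return "FAIL"; // marker for "log and fail" behavior
    }
}

/** Old name kept as an empty subclass so existing references keep working. */
@Deprecated
class DefaultHandler extends LogAndFailHandler {
}

class AliasDemo {
    @SuppressWarnings("deprecation") // deliberate use of the old name for the demo
    public static void main(String[] args) {
        LogAndFailHandler viaOldName = new DefaultHandler();
        LogAndFailHandler viaNewName = new LogAndFailHandler();
        // Both names resolve to the same inherited behavior.
        System.out.println(viaOldName.handle("boom").equals(viaNewName.handle("boom")));
    }
}
```

Code that still names the old class compiles with a deprecation warning and behaves identically, which is why pointing the internal default at the new class only matters for builds that treat warnings as errors.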
h3. Files Changed
*New files:*
* {{streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProductionExceptionHandler.java}}
* {{streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProductionExceptionHandler.java}}
*Modified source:*
* {{streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java}}
— deprecated, now extends {{LogAndFailProductionExceptionHandler}}
* {{streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java}} —
default class reference updated
* {{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java}}
— added {{sendDlqRecord()}}
* {{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java}}
— implemented {{sendDlqRecord()}}, updated DLQ send in {{handleException()}}
and {{recordSendError()}}
* {{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java}}
— DLQ send updated to {{sendDlqRecord()}}
* {{streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java}}
— DLQ send updated to {{sendDlqRecord()}}
*Modified tests:*
* {{streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java}}
— renamed existing DLQ tests, added 2 new LogAndContinue DLQ tests
* {{streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java}} —
added default handler test
* {{streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java}} —
added {{sendDlqRecord()}} mock
*Modified docs:*
* {{docs/streams/upgrade-guide.md}} — added 4.4.0 section
* {{docs/streams/developer-guide/config-streams.md}} — updated handler
references