[
https://issues.apache.org/jira/browse/KAFKA-9566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax resolved KAFKA-9566.
------------------------------------
Fix Version/s: 3.9.0
Resolution: Fixed
Since 3.9 release, the method taking `ProcessorContext` is deprecated in favor
of a new overload with has a `ErrorHandlerContext` parameter instead.
> ProcessorContextImpl#forward throws NullPointerException if invoked from
> DeserializationExceptionHandler
> --------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-9566
> URL: https://issues.apache.org/jira/browse/KAFKA-9566
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Tomas Mi
> Priority: Minor
> Labels: needs-kip
> Fix For: 3.9.0
>
>
> Hi, I am trying to implement custom DeserializationExceptionHandler which
> would forward an exception to downstream processor(s), butÂ
> ProcessorContextImpl#forward throws a NullPointerException if invoked from
> this custom handler.
> Handler implementation:
> {code:title=MyDeserializationExceptionHandler.java}
> public class MyDeserializationExceptionHandler implements
> DeserializationExceptionHandler {
> @Override
> public void configure(Map<String, ?> configs) {
> }
> @Override
> public DeserializationHandlerResponse handle(ProcessorContext context,
> ConsumerRecord<byte[], byte[]> record, Exception exception) {
> context.forward(null, exception, To.child("error-processor"));
> return DeserializationHandlerResponse.CONTINUE;
> }
> }
> {code}
> Handler is wired as default deserialization exception handler:
> {code}
> private TopologyTestDriver initializeTestDriver(StreamsBuilder
> streamBuilder) {
> Topology topology = streamBuilder.build();
> Properties props = new Properties();
> props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,
> "my-test-application");
> props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "dummy:1234");
> props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
> StreamsConfig.EXACTLY_ONCE);
>
> props.setProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
> MyDeserializationExceptionHandler.class.getName());
> return new TopologyTestDriver(topology, props);
> }
> {code}
> Â
> Exception stacktrace:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: Fatal user code error in
> deserialization error callback
> at
> org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:76)
> at
> org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
> at
> org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
> at
> org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:742)
> at
> org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:392)
> ...
> Caused by: java.lang.NullPointerException
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:165)
> at
> MyDeserializationExceptionHandler.handle(NewExceptionHandlerTest.java:204)
> at
> org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:70)
> ... 33 more
> {noformat}
> Neither DeserializationExceptionHandler, nor ProcessorContext javadocs
> mention that ProcessorContext#forward(...) must not be invoked from
> DeserializationExceptionHandler, so I assume that this is a defect.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)