Guozhang Wang commented on KAFKA-6502:

Hello Soby, thanks for reporting this issue. Your reasoning is basically right: 
a record is first deserialized, then put into the buffer, and then processed. 
If deserialization fails, the record is never processed at all. So if there is 
a series of records that all fail deserialization, none of them will be 
processed, i.e. {{StreamTask.process()}} will never be invoked for them.
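
For context, the skip-or-fail decision happens in the {{DeserializationExceptionHandler}}, which Streams invokes before a failed record can ever reach the topology. Below is a minimal sketch of such a handler (the class name and log format are illustrative, not the built-in implementation):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

import java.util.Map;

// Illustrative handler: log and skip any record that fails deserialization.
// Records answered with CONTINUE never reach StreamTask.process().
public class LogAndSkipHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        System.err.printf("Skipping corrupt record at %s-%d@%d: %s%n",
                record.topic(), record.partition(), record.offset(), exception);
        return DeserializationHandlerResponse.CONTINUE; // skip and keep running
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed for this sketch
    }
}
```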

Before we jump into possible fixes, could I ask about your scenario: when you 
see the skipped-records metric increasing, why do you want to restart your 
application immediately in the middle of such a series of error messages? Do 
you expect that no valid records will ever come next? My reasoning is that:

1. If these poison pills are due to a bug in your application's own serde, 
then after restarting with the fixed serde these records should be processed 
correctly, so we are good here.
2. If these poison pills are bad in themselves and cannot be processed anyway, 
you would not bother restarting your application; instead you can just let it 
continue running and skip these records.
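
For the second case, skipping can be enabled purely through configuration. A sketch of the relevant override (the surrounding required configs such as {{application.id}} and {{bootstrap.servers}} are assumed):

```java
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

import java.util.Properties;

public class SkipConfig {
    static Properties streamsConfig() {
        final Properties props = new Properties();
        // ... application.id, bootstrap.servers, default serdes, etc. ...

        // Log and skip records that fail deserialization instead of crashing:
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);
        return props;
    }
}
```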

> Kafka streams deserialization handler not committing offsets on error records
> -----------------------------------------------------------------------------
>                 Key: KAFKA-6502
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6502
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Soby Chacko
>            Priority: Minor
> See this StackOverflow issue: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]
> and this comment: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]
>  I am trying to use the LogAndContinueExceptionHandler on deserialization. It 
> works fine when an error occurs, successfully logging and continuing. 
> However, on a continuous stream of errors, these messages are apparently 
> never committed, and on a restart of the application they reappear. This is 
> even more problematic if I try to send the error records to a DLQ, because 
> on a restart they are sent to the DLQ again. As soon as a good record comes 
> in, the offset moves forward and I no longer see the already-logged messages 
> after a restart. 
> I reproduced this behavior by running the sample provided here: 
> [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]
> I changed the incoming value serde to 
> {{Serdes.Integer().getClass().getName()}} to force a deserialization error on 
> input, and reduced the commit interval to just 1 second. I also set 
> {{LogAndContinueExceptionHandler.class}} as the deserialization exception 
> handler in the config.
>  It looks like when deserialization exceptions occur, this flag is never set 
> to true here: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
>  It only becomes true once processing succeeds, which might be why the commit 
> does not happen even after I manually call {{processorContext#commit()}}.
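
The reproduction described above amounts to roughly the following overrides against the WordCountLambdaExample's configuration (a sketch; only the relevant properties are shown, the rest of the example's setup is assumed):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

import java.util.Properties;

public class ReproConfig {
    static Properties overrides() {
        final Properties props = new Properties();
        // An Integer serde against String-encoded input forces a
        // deserialization error on every record:
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                  Serdes.Integer().getClass().getName());
        // Commit frequently so the offset behavior is observable quickly:
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Log and skip instead of failing on bad records:
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);
        return props;
    }
}
```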

This message was sent by Atlassian JIRA
