Soby Chacko commented on KAFKA-6502:

Hi Guozhang, your reasoning is correct. However, if I introduce a custom 
DLQ-style exception handler (one that sends a bad message to an error topic) 
and then restart the application, I don't want the poison pills to be re-sent 
to the DLQ, since they have already been sent. Anyway, this is a corner case 
I ran into, and it may not happen frequently. Thanks!

> Kafka streams deserialization handler not committing offsets on error records
> -----------------------------------------------------------------------------
>                 Key: KAFKA-6502
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6502
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Soby Chacko
>            Priority: Minor
> See this StackOverflow issue: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]
> and this comment: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]
>  I am trying to use the LogAndContinueExceptionHandler on deserialization. It 
> works fine when an error occurs: it logs the error and continues. However, 
> on a continuous stream of errors, the offsets of these messages do not seem 
> to be committed, and the messages reappear when the application restarts. 
> This is more problematic if I try to send the failed messages to a DLQ: on a 
> restart, they are sent to the DLQ again. As soon as a good record comes in, 
> the offset appears to advance, and I no longer see the already-logged 
> messages after a restart. 
> I reproduced this behavior by running the sample provided here: 
> [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]
> I changed the incoming value Serde to 
> {{Serdes.Integer().getClass().getName()}} to force a deserialization error on 
> input and reduced the commit interval to just 1 second. I also added the 
> following to the config.
>  LogAndContinueExceptionHandler.class);}}.
>  It looks like when deserialization exceptions occur, this flag is never set 
> to true here: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
>  It only becomes true once processing succeeds. That might be the reason why 
> the commit does not happen even after I manually call 
> processorContext#commit().

This message was sent by Atlassian JIRA
