Laxman created KAFKA-17299:
------------------------------

             Summary: Kafka streams consumer stops consumption
                 Key: KAFKA-17299
                 URL: https://issues.apache.org/jira/browse/KAFKA-17299
             Project: Kafka
          Issue Type: Bug
          Components: consumer, streams
    Affects Versions: 3.7.1, 3.8.0, 3.6.2
            Reporter: Laxman


{code:java}
// code placeholder
{code}
We are using kafka clients version (3.5.2). However, the bug looks to exist in 
current version as well from our code review. 

 

In one of our clusters, kafka streams consumption completely stops and doesn't 
recover even after restart of the consumer instance/pod. These are our 
findings/observations from our debugging.
 * We have some deserialisation errors while the streams consuming. 
 * We configured LogAndContinueExceptionHandler as exception handler to handle 
deserialisation failures.
 * Streams consumption doesn't stop on every deserialisation failure/error. 
 * We are noticing the consumption hangs, only when the first message in the 
current batch is faulty and fails to deserialise.

We did a thorough inspection of the kafka clients code and debugged by patching 
with additional logs, we found the following lines of code from StreamTask.java 
seems to be the issue.

*Original* - Original code 
[StreamTask.java|https://github.com/apache/kafka/blob/3.5.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L750]
{code:java}
            // after processing this record, if its partition queue's buffered 
size has been
            // decreased to the threshold, we can then resume the consumption 
on this partition
            if (recordInfo.queue().size() == maxBufferedSize) {
                mainConsumer.resume(singleton(partition));
            }
{code}
 
*Patched* - Issue resolved after this fix for us.
{code:java}
            // after processing this record, if its partition queue's buffered 
size has been
            // decreased to the threshold, we can then resume the consumption 
on this partition
            if (recordInfo.queue().size() <= maxBufferedSize) {
                mainConsumer.resume(singleton(partition));
            }
{code}
We are resuming consumption only when queue size is exactly matching max 
buffered size. I think some record accounting has gone wrong especially when 
there is an issue with deserialising the first record in the batch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to