Laxman created KAFKA-17299:
------------------------------

             Summary: Kafka streams consumer stops consumption
                 Key: KAFKA-17299
                 URL: https://issues.apache.org/jira/browse/KAFKA-17299
             Project: Kafka
          Issue Type: Bug
          Components: consumer, streams
    Affects Versions: 3.7.1, 3.8.0, 3.6.2
            Reporter: Laxman

We are using Kafka clients version 3.5.2. However, from our code review, the bug appears to exist in the current version as well.

In one of our clusters, Kafka Streams consumption stops completely and does not recover even after restarting the consumer instance/pod.

These are our findings/observations from debugging:
 * We see some deserialisation errors while the streams application is consuming.
 * We configured LogAndContinueExceptionHandler as the exception handler for deserialisation failures.
 * Streams consumption does not stop on every deserialisation failure/error.
 * Consumption hangs only when the first message in the current batch is faulty and fails to deserialise.

We inspected the Kafka clients code thoroughly and debugged it by patching in additional logs. The following lines in StreamTask.java appear to be the cause.

*Original* - Original code [StreamTask.java|https://github.com/apache/kafka/blob/3.5.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L750]
{code:java}
// after processing this record, if its partition queue's buffered size has been
// decreased to the threshold, we can then resume the consumption on this partition
if (recordInfo.queue().size() == maxBufferedSize) {
    mainConsumer.resume(singleton(partition));
}
{code}

*Patched* - This change resolved the issue for us.
{code:java}
// after processing this record, if its partition queue's buffered size has been
// decreased to the threshold, we can then resume the consumption on this partition
if (recordInfo.queue().size() <= maxBufferedSize) {
    mainConsumer.resume(singleton(partition));
}
{code}

The original code resumes consumption only when the queue size exactly equals the max buffered size. If the size ever skips past that exact value, the partition is never resumed. We suspect the record accounting goes wrong, especially when the first record in the batch fails to deserialise.
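
To illustrate why the exact-match check is fragile, here is a minimal standalone sketch. It is not Kafka code: the class ResumeThresholdSketch, the method resumes, and the removedPerStep array are hypothetical names made up for this example. It assumes, purely for illustration, that a single processing step can shrink the queue by more than one record (for example, if a corrupt record is dropped together with the processed one). We have not confirmed that this is the actual mechanism in StreamTask.

```java
// Hypothetical model of the resume check; none of these names exist in Kafka.
class ResumeThresholdSketch {

    /**
     * Simulates a paused partition whose queue shrinks step by step.
     *
     * @param initialSize     queue size when the partition was paused
     * @param maxBufferedSize threshold used by the resume check
     * @param removedPerStep  records leaving the queue in each processing step
     *                        (assumption: a step may remove more than one record)
     * @param exactMatch      true models "size == max", false models "size <= max"
     * @return true if the resume condition is satisfied at least once
     */
    static boolean resumes(int initialSize, int maxBufferedSize,
                           int[] removedPerStep, boolean exactMatch) {
        int size = initialSize;
        for (int removed : removedPerStep) {
            size -= removed;
            boolean conditionMet = exactMatch
                    ? size == maxBufferedSize
                    : size <= maxBufferedSize;
            if (conditionMet) {
                return true; // this is where resume() would be called
            }
        }
        return false; // the partition would stay paused
    }

    public static void main(String[] args) {
        int[] oneAtATime = {1, 1, 1};  // sizes: 5 -> 4 -> 3 -> 2
        int[] skipsThree = {1, 2, 1};  // sizes: 5 -> 4 -> 2 -> 1

        System.out.println(resumes(5, 3, oneAtATime, true));   // true
        System.out.println(resumes(5, 3, skipsThree, true));   // false
        System.out.println(resumes(5, 3, skipsThree, false));  // true
    }
}
```

With one record removed per step, the size passes through the threshold and both checks fire. Once a step skips the threshold value, only the <= variant ever resumes, which matches the behaviour we saw after patching.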