This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 9b68b6eb9b5d3700ebdb27cc4da4bc6853594dcd
Author: Laxman Ch <[email protected]>
AuthorDate: Sat Nov 23 08:02:00 2024 +0530

    KAFKA-17299: Fix Kafka Streams consumer hang issue (#17899)
    
    When Kafka Streams skips over corrupted messages, it might not resume 
previously paused partitions,
    if more than one record is skipped at once, and if the buffer drops below 
the max-buffer limit at the same time.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../java/org/apache/kafka/streams/processor/internals/StreamTask.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f4d0499c327..317c2a3150d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -784,7 +784,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
 
             // after processing this record, if its partition queue's buffered 
size has been
             // decreased to the threshold, we can then resume the consumption 
on this partition
-            if (recordInfo.queue().size() == maxBufferedSize) {
+            if (recordInfo.queue().size() <= maxBufferedSize) {
                 partitionsToResume.add(partition);
             }
 

Reply via email to