[ 
https://issues.apache.org/jira/browse/KAFKA-14131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17575390#comment-17575390
 ] 

Sagar Rao edited comment on KAFKA-14131 at 8/4/22 6:20 PM:
-----------------------------------------------------------

[~Justinwins] I created a PR here => 
[https://github.com/apache/kafka/pull/12485]. I guess handling 
InterruptedException and rethrowing it should make the hung consumer problem go 
away. WDYT?
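
A minimal sketch of the idea, not the code from the PR: the loop below stops as soon as the blocking read is interrupted, instead of logging and retrying. The class ReadToEndSketch, the method readToEnd, and the use of a BlockingQueue in place of the Kafka consumer are all hypothetical stand-ins chosen so the example is self-contained.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ReadToEndSketch {
    // Hypothetical stand-in for readToLogEnd(): each queue element is the
    // offset of one record, and the loop runs until position reaches endOffset.
    static long readToEnd(BlockingQueue<Long> records, long endOffset) {
        long position = 0;
        while (position < endOffset) {
            try {
                Long offset = records.poll(100, TimeUnit.MILLISECONDS);
                if (offset != null) {
                    position = offset + 1;
                }
            } catch (InterruptedException e) {
                // Restore the flag for callers, then propagate instead of
                // swallowing the interrupt and looping again.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while reading to end of log", e);
            }
        }
        return position;
    }

    public static void main(String[] args) {
        // Simulate a shutdown that interrupts the reading thread.
        Thread.currentThread().interrupt();
        try {
            readToEnd(new LinkedBlockingQueue<>(), 10L);
        } catch (IllegalStateException e) {
            System.out.println("Stopped: " + e.getMessage());
        }
    }
}
```

With the exception propagated, the caller decides how to shut down; the read loop itself no longer spins on a thread that has already been asked to stop.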


was (Author: sagarrao):
[~Justinwins] I created a PR here => 
[https://github.com/apache/kafka/pull/12485]. It's pretty small...

> KafkaBasedLog#readToLogEnd() may accidentally fall into an infinite loop
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-14131
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14131
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>            Reporter: Justinwins
>            Assignee: Sagar Rao
>            Priority: Major
>
> When a herder starts, its KafkaOffsetBackingStore calls readToLogEnd() on the
> DistributedHerder.herderExecutor thread, named "Distrubuted-connect-" (e.g.
> Distrubuted-connect-28-1), which may take a few minutes.
> If another thread tries to shut down this herder, it blocks for
> "task.shutdown.graceful.timeout.ms" before the
> DistributedHerder.herderExecutor is interrupted.
> If the thread in DistributedHerder.herderExecutor is interrupted,
> KafkaOffsetBackingStore.readToLogEnd() calls poll(Integer.MAX_VALUE) and logs
> "Error polling" because the read has been interrupted. "consumer.position"
> then does not advance, and readToLogEnd() falls into an infinite loop.
>  
> {code:java}
> private void readToLogEnd() {
>     Set<TopicPartition> assignment = consumer.assignment();
>     Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
>     log.trace("Reading to end of log offsets {}", endOffsets);
>     while (!endOffsets.isEmpty()) { // this loop never exits
>         Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
>         while (it.hasNext()) {
>             Map.Entry<TopicPartition, Long> entry = it.next();
>             TopicPartition topicPartition = entry.getKey();
>             long endOffset = entry.getValue();
>             // when the thread is in interrupted status, consumer.position does not advance
>             long lastConsumedOffset = consumer.position(topicPartition);
>             if (lastConsumedOffset >= endOffset) {
>                 log.trace("Read to end offset {} for {}", endOffset, topicPartition);
>                 it.remove();
>             } else {
>                 log.trace("Behind end offset {} for {}; last-read offset is {}",
>                         endOffset, topicPartition, lastConsumedOffset);
>                 // here, poll() catches InterruptedException and logs it without rethrowing it
>                 poll(Integer.MAX_VALUE);
>                 break;
>             }
>         }
>     }
> }
> {code}
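
The failure mode described above can be illustrated outside Kafka with a small sketch. It assumes the thread's interrupt status is still set each time the next blocking call is made, so every call fails immediately and no progress is made. SwallowedInterruptDemo and failedPolls are hypothetical names, a BlockingQueue stands in for the consumer, and the loop is capped at a fixed number of attempts so that the demo terminates.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class SwallowedInterruptDemo {
    // Counts how many of the given attempts fail immediately because the
    // interrupt is caught, recorded, and never propagated.
    static int failedPolls(int attempts) {
        BlockingQueue<Long> records = new LinkedBlockingQueue<>();
        int failures = 0;
        Thread.currentThread().interrupt(); // simulate the shutdown interrupt
        try {
            for (int i = 0; i < attempts; i++) {
                try {
                    // Would block for a long time, but throws at once while
                    // the interrupt status is set.
                    records.poll(1, TimeUnit.HOURS);
                } catch (InterruptedException e) {
                    // Swallowed: the flag is set again and the loop retries.
                    Thread.currentThread().interrupt();
                    failures++;
                }
            }
        } finally {
            Thread.interrupted(); // clear the flag so the caller is unaffected
        }
        return failures;
    }

    public static void main(String[] args) {
        System.out.println("Failed polls: " + failedPolls(5));
    }
}
```

Without the attempt cap, a loop of this shape would retry indefinitely, which matches the behavior reported for readToLogEnd().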



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
