Justinwins created KAFKA-14131:
----------------------------------

             Summary: KafkaBasedLog#readToLogEnd() may accidentally fall into an 
infinite loop
                 Key: KAFKA-14131
                 URL: https://issues.apache.org/jira/browse/KAFKA-14131
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
            Reporter: Justinwins


When a herder starts, its KafkaOffsetBackingStore calls readToLogEnd() on the
DistributedHerder.herderExecutor thread, named "Distrubuted-connect-", e.g.
Distrubuted-connect-28-1. This read may take a few minutes.

If another thread tries to shut down this herder, it blocks for 
"task.shutdown.graceful.timeout.ms" before the 
DistributedHerder.herderExecutor is interrupted.
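
The wait-then-interrupt sequence described above can be sketched with plain
java.util.concurrent. This is only an illustration, not the Connect code: the
class name GracefulShutdownSketch, the method stopWithTimeout() and the
60-second sleep standing in for a slow readToLogEnd() are all assumptions made
for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: none of these names come from Kafka Connect.
class GracefulShutdownSketch {

    /**
     * Starts a long-running task, waits up to timeoutMs for it to finish,
     * then interrupts it. Returns true if the worker observed the interrupt.
     */
    static boolean stopWithTimeout(long timeoutMs) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);

        executor.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // stand-in for a slow readToLogEnd()
            } catch (InterruptedException e) {
                interrupted.set(true); // the worker sees the interrupt here
            } finally {
                finished.countDown();
            }
        });
        started.await(); // make sure the task is running before shutting down

        executor.shutdown(); // accept no new tasks; the running task continues
        if (!executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
            executor.shutdownNow(); // graceful wait expired: interrupt the worker
        }
        finished.await(5, TimeUnit.SECONDS);
        return interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker interrupted: " + stopWithTimeout(200));
    }
}
```

The worker in this sketch reacts to the interrupt and exits. The problem
reported here is the case where the worker does not.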

If the thread in DistributedHerder.herderExecutor is interrupted, 
KafkaOffsetBackingStore.readToLogEnd() calls poll(Integer.MAX_VALUE) and logs 
"Error polling" because the read has been interrupted. "consumer.position" 
then does not advance, and readToLogEnd() falls into an infinite loop.

 

```

private void readToLogEnd() {
    Set<TopicPartition> assignment = consumer.assignment();
    Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
    log.trace("Reading to end of log offsets {}", endOffsets);

    while (!endOffsets.isEmpty()) { // this loop never exits
        Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TopicPartition, Long> entry = it.next();
            TopicPartition topicPartition = entry.getKey();
            long endOffset = entry.getValue();
            // when the thread is in interrupted status, consumer.position() does not advance
            long lastConsumedOffset = consumer.position(topicPartition);
            if (lastConsumedOffset >= endOffset) {
                log.trace("Read to end offset {} for {}", endOffset, topicPartition);
                it.remove();
            } else {
                log.trace("Behind end offset {} for {}; last-read offset is {}",
                        endOffset, topicPartition, lastConsumedOffset);
                // here, poll() catches the InterruptedException and logs it without rethrowing
                poll(Integer.MAX_VALUE);
                break;
            }
        }
    }
}

```
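
One possible way out of the loop, sketched without any Kafka dependency: check
the thread's interrupt flag on every pass and give up when it is set. This is
an assumption about a fix, not the change made in Kafka. FakeConsumer,
InterruptAwareReadSketch and the String partition keys are hypothetical
stand-ins, and the sketch assumes the interrupt flag is still set after the
failed poll.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a simplified model of the loop, not KafkaBasedLog itself.
class InterruptAwareReadSketch {

    /** Stand-in for the consumer: one position per partition name. */
    static final class FakeConsumer {
        private final Map<String, Long> positions = new HashMap<>();

        FakeConsumer(Set<String> partitions) {
            for (String partition : partitions) {
                positions.put(partition, 0L);
            }
        }

        long position(String partition) {
            return positions.getOrDefault(partition, 0L);
        }

        /** Mimics the report: an interrupted poll makes no progress and throws nothing. */
        void poll() {
            if (Thread.currentThread().isInterrupted()) {
                return; // "Error polling" would be logged here; the flag stays set
            }
            positions.replaceAll((partition, offset) -> offset + 1);
        }
    }

    /** Returns true if every partition reached its end offset, false if interrupted. */
    static boolean readToLogEnd(FakeConsumer consumer, Map<String, Long> endOffsets) {
        Map<String, Long> remaining = new HashMap<>(endOffsets);
        while (!remaining.isEmpty()) {
            if (Thread.currentThread().isInterrupted()) {
                return false; // bail out instead of spinning on a poll that cannot advance
            }
            Iterator<Map.Entry<String, Long>> it = remaining.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                if (consumer.position(entry.getKey()) >= entry.getValue()) {
                    it.remove();
                } else {
                    consumer.poll();
                    break;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Long> end = Collections.singletonMap("offsets-0", 3L);
        System.out.println(readToLogEnd(new FakeConsumer(end.keySet()), end));
        Thread.currentThread().interrupt();
        System.out.println(readToLogEnd(new FakeConsumer(end.keySet()), end));
    }
}
```

Without the isInterrupted() check at the top of the outer loop, the second
call in main() would never return, which matches the behaviour described in
this issue.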



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
