[
https://issues.apache.org/jira/browse/KAFKA-14131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17575390#comment-17575390
]
Sagar Rao edited comment on KAFKA-14131 at 8/4/22 6:20 PM:
-----------------------------------------------------------
[~Justinwins] I created a PR here =>
[https://github.com/apache/kafka/pull/12485]. I guess handling the
InterruptedException and rethrowing it should make the hung consumer problem go
away. WDYT?
was (Author: sagarrao):
[~Justinwins] I created a PR here =>
[https://github.com/apache/kafka/pull/12485]. It's pretty small...
> KafkaBasedLog#readToLogEnd() may accidentally fall into an infinite loop
> ------------------------------------------------------------------------
>
> Key: KAFKA-14131
> URL: https://issues.apache.org/jira/browse/KAFKA-14131
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Reporter: Justinwins
> Assignee: Sagar Rao
> Priority: Major
>
> When a herder starts, its KafkaOffsetBackingStore calls readToLogEnd() on the
> DistributedHerder.herderExecutor thread, named "Distrubuted-connect-", e.g.
> Distrubuted-connect-28-1. This read may take a few minutes.
> If another thread tries to shut down this herder, it blocks for
> "task.shutdown.graceful.timeout.ms" before the
> DistributedHerder.herderExecutor is interrupted.
> If the thread in DistributedHerder.herderExecutor is interrupted,
> KafkaOffsetBackingStore.readToLogEnd() will poll(Integer.MAX_VALUE) and log
> "Error polling" because the read has been interrupted. "consumer.position"
> then does not advance, and readToLogEnd() falls into an infinite loop.
>
> {code:java}
> private void readToLogEnd() {
>     Set<TopicPartition> assignment = consumer.assignment();
>     Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
>     log.trace("Reading to end of log offsets {}", endOffsets);
>     while (!endOffsets.isEmpty()) { // this loop never exits
>         Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
>         while (it.hasNext()) {
>             Map.Entry<TopicPartition, Long> entry = it.next();
>             TopicPartition topicPartition = entry.getKey();
>             long endOffset = entry.getValue();
>             // When the thread is in interrupted status, consumer.position() does not advance.
>             long lastConsumedOffset = consumer.position(topicPartition);
>             if (lastConsumedOffset >= endOffset) {
>                 log.trace("Read to end offset {} for {}", endOffset, topicPartition);
>                 it.remove();
>             } else {
>                 log.trace("Behind end offset {} for {}; last-read offset is {}",
>                         endOffset, topicPartition, lastConsumedOffset);
>                 // Here, poll() catches the InterruptedException and logs it without rethrowing it.
>                 poll(Integer.MAX_VALUE);
>                 break;
>             }
>         }
>     }
> }
> {code}
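
For illustration, here is a minimal self-contained sketch of the idea proposed in the comment above: let the interruption propagate out of the read loop instead of logging it and polling again. It is not the code from the PR. FakeConsumer and SimulatedInterruptException are hypothetical stand-ins that use no Kafka classes, and the sketch assumes that a poll on an interrupted thread throws an unchecked exception and leaves the interrupt flag set.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class ReadToLogEndSketch {

    /** Hypothetical stand-in for the unchecked exception a poll raises on interruption. */
    static class SimulatedInterruptException extends RuntimeException {
        SimulatedInterruptException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the consumer: one position per partition name. */
    static class FakeConsumer {
        final Map<String, Long> positions = new HashMap<>();

        long position(String partition) {
            return positions.getOrDefault(partition, 0L);
        }

        /** Advances every known partition by one record unless the thread is interrupted. */
        void poll() {
            // isInterrupted() does not clear the flag, so every later poll fails too.
            if (Thread.currentThread().isInterrupted()) {
                throw new SimulatedInterruptException("poll interrupted");
            }
            positions.replaceAll((partition, offset) -> offset + 1);
        }
    }

    /** Reads until every partition reaches its end offset; returns the number of polls. */
    static int readToLogEnd(FakeConsumer consumer, Map<String, Long> endOffsets) {
        Map<String, Long> remaining = new HashMap<>(endOffsets);
        int polls = 0;
        while (!remaining.isEmpty()) {
            Iterator<Map.Entry<String, Long>> it = remaining.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                if (consumer.position(entry.getKey()) >= entry.getValue()) {
                    it.remove();
                } else {
                    // No catch-and-log here: an interruption escapes the loop,
                    // so the caller can stop instead of spinning forever.
                    consumer.poll();
                    polls++;
                    break;
                }
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.positions.put("offsets-0", 0L);
        Map<String, Long> endOffsets = new HashMap<>();
        endOffsets.put("offsets-0", 3L);
        System.out.println("polls without interruption: " + readToLogEnd(consumer, endOffsets));

        endOffsets.put("offsets-0", 5L);
        Thread.currentThread().interrupt(); // simulate the herder shutdown interrupting the thread
        try {
            readToLogEnd(consumer, endOffsets);
        } catch (SimulatedInterruptException e) {
            System.out.println("read aborted: " + e.getMessage());
        } finally {
            Thread.interrupted(); // clear the flag before leaving main
        }
    }
}
```

If poll() in this sketch swallowed the exception instead, the position would stay below the end offset and the outer while loop would never exit, which is the behavior described in the issue.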