[ https://issues.apache.org/jira/browse/KAFKA-14131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Justinwins updated KAFKA-14131:
-------------------------------
Description:
When a herder starts, its KafkaOffsetBackingStore reads the offsets log to the end (KafkaBasedLog#readToLogEnd()) on the DistributedHerder.herderExecutor thread, whose name starts with "Distrubuted-connect-" (e.g. Distrubuted-connect-28-1). This can take a few minutes.
If another thread tries to shut down this herder, it blocks for task.shutdown.graceful.timeout.ms before DistributedHerder.herderExecutor is interrupted.
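
The "block for the graceful timeout, then interrupt" behaviour matches the standard ExecutorService shutdown() / awaitTermination() / shutdownNow() sequence. The sketch below is a minimal illustration assuming the herder stops its executor this way; GracefulShutdownDemo and stop() are hypothetical names, gracefulTimeoutMs stands in for task.shutdown.graceful.timeout.ms, and Thread.sleep() stands in for a slow readToLogEnd().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration, not Kafka Connect code.
class GracefulShutdownDemo {

    /**
     * Waits up to gracefulTimeoutMs for the running task, then interrupts it.
     * Returns true if the timeout expired and the worker thread had to be interrupted.
     */
    static boolean stop(ExecutorService executor, long gracefulTimeoutMs) throws InterruptedException {
        executor.shutdown(); // reject new tasks; the running task keeps going
        if (executor.awaitTermination(gracefulTimeoutMs, TimeUnit.MILLISECONDS)) {
            return false; // the task finished within the graceful timeout
        }
        executor.shutdownNow(); // interrupts the still-running worker thread
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService herderExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean workerInterrupted = new AtomicBoolean(false);

        herderExecutor.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // stand-in for a slow readToLogEnd()
            } catch (InterruptedException e) {
                workerInterrupted.set(true);
            }
        });

        started.await(); // make sure the task is running before stop() is called
        boolean forced = stop(herderExecutor, 200);
        herderExecutor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("forced=" + forced + " workerInterrupted=" + workerInterrupted.get());
    }
}
```

Running main() prints "forced=true workerInterrupted=true": the caller waits out the full timeout, and only then is the worker interrupted, which is the interrupt that readToLogEnd() subsequently mishandles.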
If the thread in DistributedHerder.herderExecutor is interrupted, KafkaBasedLog#readToLogEnd() (called on behalf of KafkaOffsetBackingStore) calls poll(Integer.MAX_VALUE), which logs "Error polling" because the read has been interrupted. Because the thread's interrupt status is still set, every later poll fails the same way, consumer.position() never advances, and readToLogEnd() falls into an infinite loop.
{code:java}
private void readToLogEnd() {
    Set<TopicPartition> assignment = consumer.assignment();
    Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
    log.trace("Reading to end of log offsets {}", endOffsets);
    while (!endOffsets.isEmpty()) { // this loop never exits once the thread is interrupted
        Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TopicPartition, Long> entry = it.next();
            TopicPartition topicPartition = entry.getKey();
            long endOffset = entry.getValue();
            // while the thread is in interrupted status, consumer.position() does not advance
            long lastConsumedOffset = consumer.position(topicPartition);
            if (lastConsumedOffset >= endOffset) {
                log.trace("Read to end offset {} for {}", endOffset, topicPartition);
                it.remove();
            } else {
                log.trace("Behind end offset {} for {}; last-read offset is {}",
                        endOffset, topicPartition, lastConsumedOffset);
                // poll() catches the interrupt (Kafka's InterruptException, which wraps
                // InterruptedException), logs "Error polling" and does not rethrow it
                poll(Integer.MAX_VALUE);
                break;
            }
        }
    }
}
{code}
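
The livelock can be reproduced without a broker. The sketch below is a hypothetical simulation: FakeConsumer and SimulatedInterruptException stand in for KafkaConsumer and Kafka's InterruptException, which, like the stand-in, re-sets the thread's interrupt flag when it is constructed. It assumes, as described above, that poll() catches the exception and only logs it; the maxIterations cap exists only so the demo terminates.

```java
// Hypothetical simulation of the readToLogEnd() livelock; none of these classes are Kafka APIs.
class ReadToLogEndDemo {

    /** Stand-in for Kafka's InterruptException: unchecked, and it re-sets the interrupt flag. */
    static class SimulatedInterruptException extends RuntimeException {
        SimulatedInterruptException(InterruptedException cause) {
            super(cause);
            Thread.currentThread().interrupt(); // the flag survives the exception
        }
    }

    /** Hypothetical consumer: every successful poll() consumes one record and advances the position. */
    static class FakeConsumer {
        long position = 0;

        void poll() {
            if (Thread.interrupted()) { // clears the flag...
                throw new SimulatedInterruptException(new InterruptedException()); // ...and the wrapper re-sets it
            }
            position++;
        }
    }

    /**
     * Same shape as readToLogEnd() for a single partition. With rethrowInterrupts == false the
     * failure is swallowed (the original code logs "Error polling"); maxIterations is a demo-only bound.
     */
    static int readToLogEnd(FakeConsumer consumer, long endOffset, boolean rethrowInterrupts, int maxIterations) {
        int iterations = 0;
        while (consumer.position < endOffset && iterations < maxIterations) {
            iterations++;
            try {
                consumer.poll();
            } catch (SimulatedInterruptException e) {
                if (rethrowInterrupts) {
                    throw e; // hypothetical mitigation: let the interrupt end the read
                }
                // swallowed: the flag is still set, so the next poll() fails the same way
            }
        }
        return iterations;
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        Thread.currentThread().interrupt(); // what shutdownNow() does to the herder thread
        int iterations = readToLogEnd(consumer, 10, false, 1_000);
        Thread.interrupted(); // clear the flag before the next run
        System.out.println("swallowed: iterations=" + iterations + ", position=" + consumer.position);

        Thread.currentThread().interrupt();
        try {
            readToLogEnd(new FakeConsumer(), 10, true, 1_000);
        } catch (SimulatedInterruptException e) {
            System.out.println("rethrown: read aborted, interrupted=" + Thread.interrupted());
        }
    }
}
```

Running main() prints "swallowed: iterations=1000, position=0" and then "rethrown: read aborted, interrupted=true". With the exception swallowed, only the demo's cap stops the loop; rethrowing is one possible mitigation (not necessarily how this issue is resolved in Kafka) that ends the read on the first interrupted poll and leaves the interrupt status visible to the caller.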
was:
When a herder starts, its KafkaOffsetBackingStore reads the offsets log to the end (KafkaBasedLog#readToLogEnd()) on the DistributedHerder.herderExecutor thread, whose name starts with "Distrubuted-connect-" (e.g. Distrubuted-connect-28-1). This can take a few minutes.
If another thread tries to shut down this herder, it blocks for task.shutdown.graceful.timeout.ms before DistributedHerder.herderExecutor is interrupted.
If the thread in DistributedHerder.herderExecutor is interrupted, KafkaBasedLog#readToLogEnd() (called on behalf of KafkaOffsetBackingStore) calls poll(Integer.MAX_VALUE), which logs "Error polling" because the read has been interrupted. Because the thread's interrupt status is still set, every later poll fails the same way, consumer.position() never advances, and readToLogEnd() falls into an infinite loop.
```
private void readToLogEnd() {
    Set<TopicPartition> assignment = consumer.assignment();
    Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
    log.trace("Reading to end of log offsets {}", endOffsets);
    while (!endOffsets.isEmpty()) { // this loop never exits once the thread is interrupted
        Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TopicPartition, Long> entry = it.next();
            TopicPartition topicPartition = entry.getKey();
            long endOffset = entry.getValue();
            // while the thread is in interrupted status, consumer.position() does not advance
            long lastConsumedOffset = consumer.position(topicPartition);
            if (lastConsumedOffset >= endOffset) {
                log.trace("Read to end offset {} for {}", endOffset, topicPartition);
                it.remove();
            } else {
                log.trace("Behind end offset {} for {}; last-read offset is {}",
                        endOffset, topicPartition, lastConsumedOffset);
                // poll() catches the interrupt (Kafka's InterruptException, which wraps
                // InterruptedException), logs "Error polling" and does not rethrow it
                poll(Integer.MAX_VALUE);
                break;
            }
        }
    }
}
```
> KafkaBasedLog#readToLogEnd() may accidentally fall into an infinite loop
> ------------------------------------------------------------------------
>
> Key: KAFKA-14131
> URL: https://issues.apache.org/jira/browse/KAFKA-14131
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Reporter: Justinwins
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)