This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new 01fe2d4  KAFKA-7837: Ensure offline partitions are picked up as soon as possible when shrinking ISR (#6202)
01fe2d4 is described below

commit 01fe2d48eae8b8a47a90ab742d4c0b7fd07e4ce4
Author: Dhruvil Shah <dhru...@confluent.io>
AuthorDate: Fri Jan 25 19:40:50 2019 -0800

    KAFKA-7837: Ensure offline partitions are picked up as soon as possible when shrinking ISR (#6202)

    Check if a partition is offline while iterating all partitions.

    Reviewers: Jun Rao <jun...@gmail.com>

    (cherry picked from commit 646ec948794c927e4ffa5f96d60b5b9f7fe8f228)
---
 core/src/main/scala/kafka/server/ReplicaManager.scala | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 84b2d48..efdde13 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -400,9 +400,13 @@ class ReplicaManager(val config: KafkaConfig,
   def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] =
     getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition)
 
+  // An iterator over all non offline partitions. This is a weakly consistent iterator; a partition made offline after
+  // the iterator has been constructed could still be returned by this iterator.
   private def nonOfflinePartitionsIterator: Iterator[Partition] =
     allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition)
 
+  // An iterator over all offline partitions. This is a weakly consistent iterator; a partition made offline after the
+  // iterator has been constructed may not be visible.
   private def offlinePartitionsIterator: Iterator[Partition] =
     allPartitions.values.iterator.filter(_ eq ReplicaManager.OfflinePartition)
 
@@ -1339,7 +1343,11 @@ class ReplicaManager(val config: KafkaConfig,
 
   private def maybeShrinkIsr(): Unit = {
     trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
-    nonOfflinePartitionsIterator.foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
+
+    // Shrink ISRs for non offline partitions
+    allPartitions.keys.foreach { topicPartition =>
+      nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
+    }
   }
 
   /**