This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 01fe2d4  KAFKA-7837: Ensure offline partitions are picked up as soon 
as possible when shrinking ISR (#6202)
01fe2d4 is described below

commit 01fe2d48eae8b8a47a90ab742d4c0b7fd07e4ce4
Author: Dhruvil Shah <dhru...@confluent.io>
AuthorDate: Fri Jan 25 19:40:50 2019 -0800

    KAFKA-7837: Ensure offline partitions are picked up as soon as possible 
when shrinking ISR (#6202)
    
    Check if a partition is offline while iterating all partitions.
    
    Reviewers: Jun Rao <jun...@gmail.com>
    (cherry picked from commit 646ec948794c927e4ffa5f96d60b5b9f7fe8f228)
---
 core/src/main/scala/kafka/server/ReplicaManager.scala | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 84b2d48..efdde13 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -400,9 +400,13 @@ class ReplicaManager(val config: KafkaConfig,
   def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] =
     getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition)
 
+  // An iterator over all non offline partitions. This is a weakly consistent 
iterator; a partition made offline after
+  // the iterator has been constructed could still be returned by this 
iterator.
   private def nonOfflinePartitionsIterator: Iterator[Partition] =
     allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition)
 
+  // An iterator over all offline partitions. This is a weakly consistent 
iterator; a partition made offline after the
+  // iterator has been constructed may not be visible.
   private def offlinePartitionsIterator: Iterator[Partition] =
     allPartitions.values.iterator.filter(_ eq ReplicaManager.OfflinePartition)
 
@@ -1339,7 +1343,11 @@ class ReplicaManager(val config: KafkaConfig,
 
   private def maybeShrinkIsr(): Unit = {
     trace("Evaluating ISR list of partitions to see which replicas can be 
removed from the ISR")
-    
nonOfflinePartitionsIterator.foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
+
+    // Shrink ISRs for non offline partitions
+    allPartitions.keys.foreach { topicPartition =>
+      
nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
+    }
   }
 
   /**

Reply via email to