dajac commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r896462892


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -847,21 +861,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   private def needsExpandIsr(followerReplica: Replica): Boolean = {
-    canAddReplicaToIsr(followerReplica.brokerId) && 
isFollowerAtHighwatermark(followerReplica)
+    canAddReplicaToIsr(followerReplica.brokerId) && 
isFollowerInSync(followerReplica)
   }
 
   private def canAddReplicaToIsr(followerReplicaId: Int): Boolean = {
     val current = partitionState
-    !current.isInflight && !current.isr.contains(followerReplicaId)
+    !current.isInflight &&
+      !current.isr.contains(followerReplicaId) &&
+      isReplicaIsrEligible(followerReplicaId)
   }
 
-  private def isFollowerAtHighwatermark(followerReplica: Replica): Boolean = {
+  private def isFollowerInSync(followerReplica: Replica): Boolean = {
     leaderLogIfLocal.exists { leaderLog =>
       val followerEndOffset = followerReplica.stateSnapshot.logEndOffset
       followerEndOffset >= leaderLog.highWatermark && 
leaderEpochStartOffsetOpt.exists(followerEndOffset >= _)
     }
   }
 
+  private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
+    metadataCache match {
+      // In KRaft mode, only replicas which are not fenced nor in controlled 
shutdown are
+      // allowed to join the ISR. This does not apply to ZK mode.
+      case kRaftMetadataCache: KRaftMetadataCache =>
+        !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
+          !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)
+
+      case _ => true
+    }

Review Comment:
   This is what I did originally, but we moved to this solution based on 
reviewers' feedback. The rationale for doing it here is that ZK does not have 
such information, so reviewers felt that having unimplemented methods in the 
ZK metadata cache could be misleading. Personally, I am fine 
either way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to