artemlivshits commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r896208216


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1569,24 +1628,47 @@ class Partition(val topicPartition: TopicPartition,
   ): Boolean = {
     alterPartitionListener.markFailed()
     error match {
-      case Errors.OPERATION_NOT_ATTEMPTED =>
-        // Since the operation was not attempted, it is safe to reset back to 
the committed state.
-        partitionState = CommittedPartitionState(proposedIsrState.isr, 
LeaderRecoveryState.RECOVERED)
-        debug(s"Failed to alter partition to $proposedIsrState since there is 
a pending AlterPartition still inflight. " +
-          s"partition state has been reset to the latest committed state 
$partitionState")
+      case Errors.OPERATION_NOT_ATTEMPTED | Errors.INELIGIBLE_REPLICA =>
+        // Care must be taken when resetting to the last committed state since 
we may not
+        // know in general whether the request was applied or not taking into 
account retries
+        // and controller changes which might have occurred before we received 
the response.
+        // However, when the controller returns INELIGIBLE_REPLICA (or 
OPERATION_NOT_COMMITTED),

Review Comment:
   The error code named in the comment (OPERATION_NOT_COMMITTED) doesn't seem to 
match the error code actually handled in the code (OPERATION_NOT_ATTEMPTED).



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1569,24 +1628,47 @@ class Partition(val topicPartition: TopicPartition,
   ): Boolean = {
     alterPartitionListener.markFailed()
     error match {
-      case Errors.OPERATION_NOT_ATTEMPTED =>
-        // Since the operation was not attempted, it is safe to reset back to 
the committed state.
-        partitionState = CommittedPartitionState(proposedIsrState.isr, 
LeaderRecoveryState.RECOVERED)
-        debug(s"Failed to alter partition to $proposedIsrState since there is 
a pending AlterPartition still inflight. " +
-          s"partition state has been reset to the latest committed state 
$partitionState")
+      case Errors.OPERATION_NOT_ATTEMPTED | Errors.INELIGIBLE_REPLICA =>
+        // Care must be taken when resetting to the last committed state since 
we may not
+        // know in general whether the request was applied or not taking into 
account retries
+        // and controller changes which might have occurred before we received 
the response.
+        // However, when the controller returns INELIGIBLE_REPLICA (or 
OPERATION_NOT_COMMITTED),
+        // the controller is explicitly telling us 1) that the current 
partition epoch is correct,
+        // and 2) that the request was not applied. Even if the controller 
that sent the response
+        // is stale, we are guaranteed from the monotonicity of the controller 
epoch that the
+        // request could not have been applied by any past or future 
controller.
+        partitionState = proposedIsrState.lastCommittedState

Review Comment:
   In KRaft mode, could the state be updated via metadata and applied 
concurrently, such that processing this response would override a concurrently 
updated last committed state?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1569,24 +1628,47 @@ class Partition(val topicPartition: TopicPartition,
   ): Boolean = {
     alterPartitionListener.markFailed()
     error match {
-      case Errors.OPERATION_NOT_ATTEMPTED =>
-        // Since the operation was not attempted, it is safe to reset back to 
the committed state.
-        partitionState = CommittedPartitionState(proposedIsrState.isr, 
LeaderRecoveryState.RECOVERED)
-        debug(s"Failed to alter partition to $proposedIsrState since there is 
a pending AlterPartition still inflight. " +
-          s"partition state has been reset to the latest committed state 
$partitionState")
+      case Errors.OPERATION_NOT_ATTEMPTED | Errors.INELIGIBLE_REPLICA =>
+        // Care must be taken when resetting to the last committed state since 
we may not
+        // know in general whether the request was applied or not taking into 
account retries
+        // and controller changes which might have occurred before we received 
the response.
+        // However, when the controller returns INELIGIBLE_REPLICA (or 
OPERATION_NOT_COMMITTED),
+        // the controller is explicitly telling us 1) that the current 
partition epoch is correct,
+        // and 2) that the request was not applied. Even if the controller 
that sent the response
+        // is stale, we are guaranteed from the monotonicity of the controller 
epoch that the
+        // request could not have been applied by any past or future 
controller.
+        partitionState = proposedIsrState.lastCommittedState
+        info(s"Failed to alter partition to $proposedIsrState since the 
controller rejected the request with $error. " +
+          s"Partition state has been reset to the latest committed state 
$partitionState.")
         false
       case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
         debug(s"Failed to alter partition to $proposedIsrState since the 
controller doesn't know about " +
-          "this topic or partition. Giving up.")
+          "this topic or partition. Partition state may be out of thing, 
awaiting new the latest metadata.")

Review Comment:
   out of sync?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -847,21 +861,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   private def needsExpandIsr(followerReplica: Replica): Boolean = {
-    canAddReplicaToIsr(followerReplica.brokerId) && 
isFollowerAtHighwatermark(followerReplica)
+    canAddReplicaToIsr(followerReplica.brokerId) && 
isFollowerInSync(followerReplica)
   }
 
   private def canAddReplicaToIsr(followerReplicaId: Int): Boolean = {
     val current = partitionState
-    !current.isInflight && !current.isr.contains(followerReplicaId)
+    !current.isInflight &&
+      !current.isr.contains(followerReplicaId) &&
+      isReplicaIsrEligible(followerReplicaId)
   }
 
-  private def isFollowerAtHighwatermark(followerReplica: Replica): Boolean = {
+  private def isFollowerInSync(followerReplica: Replica): Boolean = {
     leaderLogIfLocal.exists { leaderLog =>
       val followerEndOffset = followerReplica.stateSnapshot.logEndOffset
       followerEndOffset >= leaderLog.highWatermark && 
leaderEpochStartOffsetOpt.exists(followerEndOffset >= _)
     }
   }
 
+  private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
+    metadataCache match {
+      // In KRaft mode, only replicas which are not fenced nor in controlled 
shutdown are
+      // allowed to join the ISR. This does not apply to ZK mode.
+      case kRaftMetadataCache: KRaftMetadataCache =>
+        !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
+          !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)
+
+      case _ => true
+    }

Review Comment:
   Would it be better to encapsulate the KRaft / ZK behavior difference in the 
metadataCache?  Then this function would just call the metadataCache without 
explicitly checking the kind of the cache.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to