artemlivshits commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r896208216
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1569,24 +1628,47 @@ class Partition(val topicPartition: TopicPartition,
): Boolean = {
alterPartitionListener.markFailed()
error match {
- case Errors.OPERATION_NOT_ATTEMPTED =>
- // Since the operation was not attempted, it is safe to reset back to
the committed state.
- partitionState = CommittedPartitionState(proposedIsrState.isr,
LeaderRecoveryState.RECOVERED)
- debug(s"Failed to alter partition to $proposedIsrState since there is
a pending AlterPartition still inflight. " +
- s"partition state has been reset to the latest committed state
$partitionState")
+ case Errors.OPERATION_NOT_ATTEMPTED | Errors.INELIGIBLE_REPLICA =>
+ // Care must be taken when resetting to the last committed state since
we may not
+ // know in general whether the request was applied or not taking into
account retries
+ // and controller changes which might have occurred before we received
the response.
+ // However, when the controller returns INELIGIBLE_REPLICA (or
OPERATION_NOT_COMMITTED),
Review Comment:
The error code named in the comment (OPERATION_NOT_COMMITTED) doesn't seem to
match the one handled by the code (OPERATION_NOT_ATTEMPTED).
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1569,24 +1628,47 @@ class Partition(val topicPartition: TopicPartition,
): Boolean = {
alterPartitionListener.markFailed()
error match {
- case Errors.OPERATION_NOT_ATTEMPTED =>
- // Since the operation was not attempted, it is safe to reset back to
the committed state.
- partitionState = CommittedPartitionState(proposedIsrState.isr,
LeaderRecoveryState.RECOVERED)
- debug(s"Failed to alter partition to $proposedIsrState since there is
a pending AlterPartition still inflight. " +
- s"partition state has been reset to the latest committed state
$partitionState")
+ case Errors.OPERATION_NOT_ATTEMPTED | Errors.INELIGIBLE_REPLICA =>
+ // Care must be taken when resetting to the last committed state since
we may not
+ // know in general whether the request was applied or not taking into
account retries
+ // and controller changes which might have occurred before we received
the response.
+ // However, when the controller returns INELIGIBLE_REPLICA (or
OPERATION_NOT_COMMITTED),
+ // the controller is explicitly telling us 1) that the current
partition epoch is correct,
+ // and 2) that the request was not applied. Even if the controller
that sent the response
+ // is stale, we are guaranteed from the monotonicity of the controller
epoch that the
+ // request could not have been applied by any past or future
controller.
+ partitionState = proposedIsrState.lastCommittedState
Review Comment:
In KRaft mode, could the partition state be updated concurrently via a
metadata update, such that resetting to the last committed state here would
overwrite that newer state?
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1569,24 +1628,47 @@ class Partition(val topicPartition: TopicPartition,
): Boolean = {
alterPartitionListener.markFailed()
error match {
- case Errors.OPERATION_NOT_ATTEMPTED =>
- // Since the operation was not attempted, it is safe to reset back to
the committed state.
- partitionState = CommittedPartitionState(proposedIsrState.isr,
LeaderRecoveryState.RECOVERED)
- debug(s"Failed to alter partition to $proposedIsrState since there is
a pending AlterPartition still inflight. " +
- s"partition state has been reset to the latest committed state
$partitionState")
+ case Errors.OPERATION_NOT_ATTEMPTED | Errors.INELIGIBLE_REPLICA =>
+ // Care must be taken when resetting to the last committed state since
we may not
+ // know in general whether the request was applied or not taking into
account retries
+ // and controller changes which might have occurred before we received
the response.
+ // However, when the controller returns INELIGIBLE_REPLICA (or
OPERATION_NOT_COMMITTED),
+ // the controller is explicitly telling us 1) that the current
partition epoch is correct,
+ // and 2) that the request was not applied. Even if the controller
that sent the response
+ // is stale, we are guaranteed from the monotonicity of the controller
epoch that the
+ // request could not have been applied by any past or future
controller.
+ partitionState = proposedIsrState.lastCommittedState
+ info(s"Failed to alter partition to $proposedIsrState since the
controller rejected the request with $error. " +
+ s"Partition state has been reset to the latest committed state
$partitionState.")
false
case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
debug(s"Failed to alter partition to $proposedIsrState since the
controller doesn't know about " +
- "this topic or partition. Giving up.")
+ "this topic or partition. Partition state may be out of thing,
awaiting new the latest metadata.")
Review Comment:
Typo: should "out of thing" be "out of sync"?
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -847,21 +861,35 @@ class Partition(val topicPartition: TopicPartition,
}
private def needsExpandIsr(followerReplica: Replica): Boolean = {
- canAddReplicaToIsr(followerReplica.brokerId) &&
isFollowerAtHighwatermark(followerReplica)
+ canAddReplicaToIsr(followerReplica.brokerId) &&
isFollowerInSync(followerReplica)
}
private def canAddReplicaToIsr(followerReplicaId: Int): Boolean = {
val current = partitionState
- !current.isInflight && !current.isr.contains(followerReplicaId)
+ !current.isInflight &&
+ !current.isr.contains(followerReplicaId) &&
+ isReplicaIsrEligible(followerReplicaId)
}
- private def isFollowerAtHighwatermark(followerReplica: Replica): Boolean = {
+ private def isFollowerInSync(followerReplica: Replica): Boolean = {
leaderLogIfLocal.exists { leaderLog =>
val followerEndOffset = followerReplica.stateSnapshot.logEndOffset
followerEndOffset >= leaderLog.highWatermark &&
leaderEpochStartOffsetOpt.exists(followerEndOffset >= _)
}
}
+ private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
+ metadataCache match {
+ // In KRaft mode, only replicas which are not fenced nor in controlled
shutdown are
+ // allowed to join the ISR. This does not apply to ZK mode.
+ case kRaftMetadataCache: KRaftMetadataCache =>
+ !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
+ !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)
+
+ case _ => true
+ }
Review Comment:
Would it be better to encapsulate the KRaft / ZK behavior difference in the
metadataCache? Then this function would just call the metadataCache without
explicitly checking the kind of the cache.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]