This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 597130c016 MINOR: Improve readability of `tryProcessAlterPartition`
(#12515)
597130c016 is described below
commit 597130c01659f4f19286f2e9404f97c5ecde3ab2
Author: Ismael Juma <[email protected]>
AuthorDate: Mon Aug 15 15:54:13 2022 -0700
MINOR: Improve readability of `tryProcessAlterPartition` (#12515)
After 520f72995d, the subsequent checks are ensuring that
the leader and partition epochs are not less than. So,
make that explicit.
Reviewers: Jason Gustafson <[email protected]>
---
core/src/main/scala/kafka/controller/KafkaController.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 999bcb818e..b4a205fb66 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -2343,7 +2343,7 @@ class KafkaController(val config: KafkaConfig,
// this case to give the leader an opportunity to find the new
controller.
partitionResponses(tp) = Left(Errors.NOT_CONTROLLER)
None
- } else if (newLeaderAndIsr.leaderEpoch !=
currentLeaderAndIsr.leaderEpoch) {
+ } else if (newLeaderAndIsr.leaderEpoch <
currentLeaderAndIsr.leaderEpoch) {
partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
None
} else if
(newLeaderAndIsr.equalsAllowStalePartitionEpoch(currentLeaderAndIsr)) {
@@ -2351,7 +2351,7 @@ class KafkaController(val config: KafkaConfig,
// this check must be done before fencing based on partition epoch
to maintain idempotency
partitionResponses(tp) = Right(currentLeaderAndIsr)
None
- } else if (newLeaderAndIsr.partitionEpoch !=
currentLeaderAndIsr.partitionEpoch) {
+ } else if (newLeaderAndIsr.partitionEpoch <
currentLeaderAndIsr.partitionEpoch) {
partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
None
} else if (newLeaderAndIsr.leaderRecoveryState ==
LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {