This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 36ff5d5c98f KAFKA-14154; Return NOT_CONTROLLER from AlterPartition if
leader is ahead of controller (#12506)
36ff5d5c98f is described below
commit 36ff5d5c98f2649ec2cc5ab302375cd41298a727
Author: Jason Gustafson <[email protected]>
AuthorDate: Thu Aug 11 16:43:12 2022 -0700
KAFKA-14154; Return NOT_CONTROLLER from AlterPartition if leader is ahead
of controller (#12506)
It is possible for the leader to send an `AlterPartition` request to a
zombie controller, where the request includes a partition epoch or leader epoch
that is larger than what is found in the controller context. Prior to
https://github.com/apache/kafka/pull/12032, the controller handled this in the
following way:
1. If the `LeaderAndIsr` state exactly matches the current state on the
controller excluding the partition epoch, then the `AlterPartition` request is
considered successful and no error is returned. The risk with this handling is
that the leader may incorrectly assume that the state has been
successfully updated. Since the controller's state is stale, there is no way to
know what the latest ISR state is.
2. Otherwise, the controller will attempt to update the state in ZooKeeper
with the leader/partition epochs from the `AlterPartition` request. This
operation would fail if the controller's epoch was no longer current in
ZooKeeper, and the result would be a `NOT_CONTROLLER` error.
Following https://github.com/apache/kafka/pull/12032, the controller's
validation is stricter. If the partition epoch is larger than expected, then
the controller will return `INVALID_UPDATE_VERSION` without attempting the
operation. Similarly, if the leader epoch is larger than expected, the
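controller will return `FENCED_LEADER_EPOCH`. The problem with this new
handling is that the leader treats the errors from the controller as
authoritative. For example, if it sees the `FENCED_LEADER_EPOCH` error, then it
will not retry the request and will simply wait until the next leader epoch
arrives. The ISR state then gets stuck in a pending state, which can lead to
persistent under-replicated partitions (URPs) until the leader epoch is bumped.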
In this patch, we want to fix the issues with this handling, but we don't
want to restore the buggy idempotent check. The approach is straightforward. If
the controller sees a partition/leader epoch that is larger than what it has
in the controller context, then it assumes that it has become a zombie and returns
`NOT_CONTROLLER` to the leader. This will cause the leader to attempt to reset
the controller from its local metadata cache and retry the `AlterPartition`
request.
Reviewers: David Jacot <[email protected]>, José Armando García Sancio
<[email protected]>
---
.../scala/kafka/controller/KafkaController.scala | 9 +++++++-
.../controller/ControllerIntegrationTest.scala | 27 ++++++++++++++++++----
2 files changed, 30 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0154d9cbe54..999bcb818e9 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -2336,7 +2336,14 @@ class KafkaController(val config: KafkaConfig,
controllerContext.partitionLeadershipInfo(tp) match {
case Some(leaderIsrAndControllerEpoch) =>
val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
- if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
+ if (newLeaderAndIsr.partitionEpoch >
currentLeaderAndIsr.partitionEpoch
+ || newLeaderAndIsr.leaderEpoch >
currentLeaderAndIsr.leaderEpoch) {
+ // If the partition leader has a higher partition/leader epoch,
then it is likely
+ // that this node is no longer the active controller. We return
NOT_CONTROLLER in
+ // this case to give the leader an opportunity to find the new
controller.
+ partitionResponses(tp) = Left(Errors.NOT_CONTROLLER)
+ None
+ } else if (newLeaderAndIsr.leaderEpoch !=
currentLeaderAndIsr.leaderEpoch) {
partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
None
} else if
(newLeaderAndIsr.equalsAllowStalePartitionEpoch(currentLeaderAndIsr)) {
diff --git
a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 0c8d000656a..532ff1a946e 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -1184,7 +1184,7 @@ class ControllerIntegrationTest extends QuorumTestHarness
{
)
assertAlterPartition(
- partitionError = Errors.INVALID_UPDATE_VERSION,
+ partitionError = Errors.NOT_CONTROLLER,
partitionEpoch = partitionEpoch + 1
)
@@ -1194,7 +1194,7 @@ class ControllerIntegrationTest extends QuorumTestHarness
{
)
assertAlterPartition(
- partitionError = Errors.FENCED_LEADER_EPOCH,
+ partitionError = Errors.NOT_CONTROLLER,
leaderEpoch = leaderEpoch + 1
)
@@ -1218,6 +1218,12 @@ class ControllerIntegrationTest extends
QuorumTestHarness {
partitionEpoch = partitionEpoch - 1
)
+ assertAlterPartition(
+ partitionError = Errors.NOT_CONTROLLER,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING.value,
+ partitionEpoch = partitionEpoch + 1
+ )
+
assertAlterPartition(
partitionError = Errors.FENCED_LEADER_EPOCH,
leaderRecoveryState = LeaderRecoveryState.RECOVERING.value,
@@ -1225,7 +1231,7 @@ class ControllerIntegrationTest extends QuorumTestHarness
{
)
assertAlterPartition(
- partitionError = Errors.FENCED_LEADER_EPOCH,
+ partitionError = Errors.NOT_CONTROLLER,
leaderRecoveryState = LeaderRecoveryState.RECOVERING.value,
leaderEpoch = leaderEpoch + 1
)
@@ -1324,13 +1330,18 @@ class ControllerIntegrationTest extends
QuorumTestHarness {
partitionEpoch = partitionEpoch - 1
)
+ assertAlterPartition(
+ partitionError = Errors.NOT_CONTROLLER,
+ partitionEpoch = partitionEpoch + 1
+ )
+
assertAlterPartition(
partitionError = Errors.FENCED_LEADER_EPOCH,
leaderEpoch = leaderEpoch - 1
)
assertAlterPartition(
- partitionError = Errors.FENCED_LEADER_EPOCH,
+ partitionError = Errors.NOT_CONTROLLER,
leaderEpoch = leaderEpoch + 1
)
@@ -1348,6 +1359,12 @@ class ControllerIntegrationTest extends
QuorumTestHarness {
leaderRecoveryState = LeaderRecoveryState.RECOVERING.value
)
+ assertAlterPartition(
+ partitionError = Errors.NOT_CONTROLLER,
+ partitionEpoch = partitionEpoch + 1,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING.value
+ )
+
assertAlterPartition(
partitionError = Errors.FENCED_LEADER_EPOCH,
leaderEpoch = leaderEpoch - 1,
@@ -1355,7 +1372,7 @@ class ControllerIntegrationTest extends QuorumTestHarness
{
)
assertAlterPartition(
- partitionError = Errors.FENCED_LEADER_EPOCH,
+ partitionError = Errors.NOT_CONTROLLER,
leaderEpoch = leaderEpoch + 1,
leaderRecoveryState = LeaderRecoveryState.RECOVERING.value
)