This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 520f72995dd KAFKA-14154; Return NOT_CONTROLLER from AlterPartition if 
leader is ahead of controller (#12506)
520f72995dd is described below

commit 520f72995ddc5c687e364b83c7ace45f14fdd701
Author: Jason Gustafson <[email protected]>
AuthorDate: Thu Aug 11 16:43:12 2022 -0700

    KAFKA-14154; Return NOT_CONTROLLER from AlterPartition if leader is ahead 
of controller (#12506)
    
    It is possible for the leader to send an `AlterPartition` request to a 
zombie controller which includes either a partition or leader epoch which is 
larger than what is found in the controller context. Prior to 
https://github.com/apache/kafka/pull/12032, the controller handled this in the 
following way:
    
    1. If the `LeaderAndIsr` state exactly matches the current state on the 
controller excluding the partition epoch, then the `AlterPartition` request is 
considered successful and no error is returned. The risk with this handling is 
that this may cause the leader to incorrectly assume that the state had been 
successfully updated. Since the controller's state is stale, there is no way to 
know what the latest ISR state is.
    2. Otherwise, the controller will attempt to update the state in ZooKeeper 
with the leader/partition epochs from the `AlterPartition` request. This 
operation would fail if the controller's epoch was no longer current in 
ZooKeeper and the result would be a `NOT_CONTROLLER` error.
    
    Following https://github.com/apache/kafka/pull/12032, the controller's 
validation is stricter. If the partition epoch is larger than expected, then 
the controller will return `INVALID_UPDATE_VERSION` without attempting the 
operation. Similarly, if the leader epoch is larger than expected, the 
controller will return `FENCED_LEADER_EPOCH`. The problem with this new 
handling is that the leader treats the errors from the controller as 
authoritative. For example, if it sees the `FENCED_LEA [...]
    
    In this patch, we want to fix the issues with this handling, but we don't 
want to restore the buggy idempotent check. The approach is straightforward. If 
the controller sees a partition/leader epoch which is larger than what it has 
in the controller context, then it assumes that it has become a zombie and 
returns `NOT_CONTROLLER` to the leader. This will cause the leader to attempt to reset 
the controller from its local metadata cache and retry the `AlterPartition` 
request.
    
    Reviewers: David Jacot <[email protected]>, José Armando García Sancio 
<[email protected]>
---
 .../scala/kafka/controller/KafkaController.scala   |  9 +++++++-
 .../controller/ControllerIntegrationTest.scala     | 27 ++++++++++++++++++----
 2 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0154d9cbe54..999bcb818e9 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -2336,7 +2336,14 @@ class KafkaController(val config: KafkaConfig,
       controllerContext.partitionLeadershipInfo(tp) match {
         case Some(leaderIsrAndControllerEpoch) =>
           val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-          if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
+          if (newLeaderAndIsr.partitionEpoch > 
currentLeaderAndIsr.partitionEpoch
+              || newLeaderAndIsr.leaderEpoch > 
currentLeaderAndIsr.leaderEpoch) {
+            // If the partition leader has a higher partition/leader epoch, 
then it is likely
+            // that this node is no longer the active controller. We return 
NOT_CONTROLLER in
+            // this case to give the leader an opportunity to find the new 
controller.
+            partitionResponses(tp) = Left(Errors.NOT_CONTROLLER)
+            None
+          } else if (newLeaderAndIsr.leaderEpoch != 
currentLeaderAndIsr.leaderEpoch) {
             partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
             None
           } else if 
(newLeaderAndIsr.equalsAllowStalePartitionEpoch(currentLeaderAndIsr)) {
diff --git 
a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 0c8d000656a..532ff1a946e 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -1184,7 +1184,7 @@ class ControllerIntegrationTest extends QuorumTestHarness 
{
     )
 
     assertAlterPartition(
-      partitionError = Errors.INVALID_UPDATE_VERSION,
+      partitionError = Errors.NOT_CONTROLLER,
       partitionEpoch = partitionEpoch + 1
     )
 
@@ -1194,7 +1194,7 @@ class ControllerIntegrationTest extends QuorumTestHarness 
{
     )
 
     assertAlterPartition(
-      partitionError = Errors.FENCED_LEADER_EPOCH,
+      partitionError = Errors.NOT_CONTROLLER,
       leaderEpoch = leaderEpoch + 1
     )
 
@@ -1218,6 +1218,12 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
       partitionEpoch = partitionEpoch - 1
     )
 
+    assertAlterPartition(
+      partitionError = Errors.NOT_CONTROLLER,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING.value,
+      partitionEpoch = partitionEpoch + 1
+    )
+
     assertAlterPartition(
       partitionError = Errors.FENCED_LEADER_EPOCH,
       leaderRecoveryState = LeaderRecoveryState.RECOVERING.value,
@@ -1225,7 +1231,7 @@ class ControllerIntegrationTest extends QuorumTestHarness 
{
     )
 
     assertAlterPartition(
-      partitionError = Errors.FENCED_LEADER_EPOCH,
+      partitionError = Errors.NOT_CONTROLLER,
       leaderRecoveryState = LeaderRecoveryState.RECOVERING.value,
       leaderEpoch = leaderEpoch + 1
     )
@@ -1324,13 +1330,18 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
       partitionEpoch = partitionEpoch - 1
     )
 
+    assertAlterPartition(
+      partitionError = Errors.NOT_CONTROLLER,
+      partitionEpoch = partitionEpoch + 1
+    )
+
     assertAlterPartition(
       partitionError = Errors.FENCED_LEADER_EPOCH,
       leaderEpoch = leaderEpoch - 1
     )
 
     assertAlterPartition(
-      partitionError = Errors.FENCED_LEADER_EPOCH,
+      partitionError = Errors.NOT_CONTROLLER,
       leaderEpoch = leaderEpoch + 1
     )
 
@@ -1348,6 +1359,12 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
       leaderRecoveryState = LeaderRecoveryState.RECOVERING.value
     )
 
+    assertAlterPartition(
+      partitionError = Errors.NOT_CONTROLLER,
+      partitionEpoch = partitionEpoch + 1,
+      leaderRecoveryState = LeaderRecoveryState.RECOVERING.value
+    )
+
     assertAlterPartition(
       partitionError = Errors.FENCED_LEADER_EPOCH,
       leaderEpoch = leaderEpoch - 1,
@@ -1355,7 +1372,7 @@ class ControllerIntegrationTest extends QuorumTestHarness 
{
     )
 
     assertAlterPartition(
-      partitionError = Errors.FENCED_LEADER_EPOCH,
+      partitionError = Errors.NOT_CONTROLLER,
       leaderEpoch = leaderEpoch + 1,
       leaderRecoveryState = LeaderRecoveryState.RECOVERING.value
     )

Reply via email to