This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5990471b8ca KAFKA-14154; Kraft controller should return NOT_CONTROLLER if request epoch is ahead (#12514)
5990471b8ca is described below
commit 5990471b8ca0dc275c9a8ff0cde2a6562ee8199e
Author: Jason Gustafson <[email protected]>
AuthorDate: Mon Aug 15 11:34:29 2022 -0700
KAFKA-14154; Kraft controller should return NOT_CONTROLLER if request epoch is ahead (#12514)
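Similar to https://github.com/apache/kafka/pull/12506. For the KRaft
controller, we should return NOT_CONTROLLER if the leader epoch or partition
epoch in the request is ahead of the epoch the controller has recorded for the
partition. A request epoch that is ahead most likely means this node is no longer
the active controller, and NOT_CONTROLLER gives the partition leader a chance to
find the new controller.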
Reviewers: José Armando García Sancio <[email protected]>
---
.../kafka/controller/ReplicationControlManager.java | 21 +++++++++++++++++++--
.../controller/ReplicationControlManagerTest.java | 12 +++++++++++-
2 files changed, 30 insertions(+), 3 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 4ffb339967c..df7097df83d 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -125,6 +125,7 @@ import static
org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION;
import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED;
import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.NOT_CONTROLLER;
import static
org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED;
import static
org.apache.kafka.common.protocol.Errors.TOPIC_AUTHORIZATION_FAILED;
@@ -1071,7 +1072,23 @@ public class ReplicationControlManager {
return UNKNOWN_TOPIC_OR_PARTITION;
}
- if (partitionData.leaderEpoch() != partition.leaderEpoch) {
+
+ // If the partition leader has a higher leader/partition epoch, then
it is likely
+ // that this node is no longer the active controller. We return
NOT_CONTROLLER in
+ // this case to give the leader an opportunity to find the new
controller.
+ if (partitionData.leaderEpoch() > partition.leaderEpoch) {
+ log.debug("Rejecting AlterPartition request from node {} for {}-{}
because " +
+ "the current leader epoch is {}, which is greater than the
local value {}.",
+ brokerId, topic.name, partitionId, partition.leaderEpoch,
partitionData.leaderEpoch());
+ return NOT_CONTROLLER;
+ }
+ if (partitionData.partitionEpoch() > partition.partitionEpoch) {
+ log.debug("Rejecting AlterPartition request from node {} for {}-{}
because " +
+ "the current partition epoch is {}, which is greater than
the local value {}.",
+ brokerId, topic.name, partitionId, partition.partitionEpoch,
partitionData.partitionEpoch());
+ return NOT_CONTROLLER;
+ }
+ if (partitionData.leaderEpoch() < partition.leaderEpoch) {
log.debug("Rejecting AlterPartition request from node {} for {}-{}
because " +
"the current leader epoch is {}, not {}.", brokerId,
topic.name,
partitionId, partition.leaderEpoch,
partitionData.leaderEpoch());
@@ -1085,7 +1102,7 @@ public class ReplicationControlManager {
return INVALID_REQUEST;
}
- if (partitionData.partitionEpoch() != partition.partitionEpoch) {
+ if (partitionData.partitionEpoch() < partition.partitionEpoch) {
log.info("Rejecting AlterPartition request from node {} for {}-{}
because " +
"the current partition epoch is {}, not {}.", brokerId,
topic.name, partitionId, partition.partitionEpoch,
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index d33776ca10e..82e378a9823 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -122,6 +122,7 @@ import static
org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT
import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED;
import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.NOT_CONTROLLER;
import static
org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED;
import static org.apache.kafka.common.protocol.Errors.POLICY_VIOLATION;
@@ -960,7 +961,16 @@ public class ReplicationControlManagerTest {
ControllerResult<AlterPartitionResponseData> invalidLeaderEpochResult
= sendAlterPartition(
replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
topicIdPartition.topicId(), invalidLeaderEpochRequest);
- assertAlterPartitionResponse(invalidLeaderEpochResult,
topicIdPartition, FENCED_LEADER_EPOCH);
+ assertAlterPartitionResponse(invalidLeaderEpochResult,
topicIdPartition, NOT_CONTROLLER);
+
+ // Invalid partition epoch
+ PartitionData invalidPartitionEpochRequest = newAlterPartition(
+ replicationControl, topicIdPartition, asList(0, 1),
LeaderRecoveryState.RECOVERED);
+ invalidPartitionEpochRequest.setPartitionEpoch(500);
+ ControllerResult<AlterPartitionResponseData>
invalidPartitionEpochResult = sendAlterPartition(
+ replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
+ topicIdPartition.topicId(), invalidPartitionEpochRequest);
+ assertAlterPartitionResponse(invalidPartitionEpochResult,
topicIdPartition, NOT_CONTROLLER);
// Invalid ISR (3 is not a valid replica)
PartitionData invalidIsrRequest1 = newAlterPartition(