This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8c88cdb7186 KAFKA-14617: Update AlterPartitionRequest and enable Kraft 
controller to reject stale request. (#13408)
8c88cdb7186 is described below

commit 8c88cdb7186b1d594f991eb324356dcfcabdf18a
Author: Calvin Liu <[email protected]>
AuthorDate: Fri Mar 31 02:27:42 2023 -0700

    KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to 
reject stale request. (#13408)
    
    Second part of the 
[KIP-903](https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR),
 it updates the AlterPartitionRequest:
    - Deprecate the NewIsr field
    - Create a new field BrokerState with BrokerId and BrokerEpoch
    - Bump the AlterPartition version to 3
    
    With this change, the Quorum Controller is able to reject stale 
AlterPartition requests.
    
    Reviewers: Jun Rao <[email protected]>, David Jacot <[email protected]>
---
 .../common/requests/AlterPartitionRequest.java     |  21 +++
 .../common/message/AlterPartitionRequest.json      |  14 +-
 .../common/message/AlterPartitionResponse.json     |   4 +-
 .../common/requests/AlterPartitionRequestTest.java |  76 +++++++++++
 .../kafka/common/requests/RequestResponseTest.java |   2 +-
 .../scala/kafka/controller/KafkaController.scala   |   7 +-
 .../scala/kafka/server/AlterPartitionManager.scala |   5 +-
 .../controller/ControllerIntegrationTest.scala     |  19 +--
 .../kafka/server/AlterPartitionManagerTest.scala   |   6 +-
 .../kafka/controller/PartitionChangeBuilder.java   |   9 ++
 .../controller/ReplicationControlManager.java      |  44 ++++--
 .../controller/PartitionChangeBuilderTest.java     |  38 ++++--
 .../kafka/controller/QuorumControllerTest.java     |  13 +-
 .../controller/ReplicationControlManagerTest.java  | 149 ++++++++++++++++-----
 14 files changed, 326 insertions(+), 81 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
index 2d246f21041..2b150508292 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
@@ -18,12 +18,17 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.message.AlterPartitionRequestData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
 import org.apache.kafka.common.message.AlterPartitionResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
 
 public class AlterPartitionRequest extends AbstractRequest {
 
@@ -77,6 +82,18 @@ public class AlterPartitionRequest extends AbstractRequest {
 
         @Override
         public AlterPartitionRequest build(short version) {
+            if (version < 3) {
+                data.topics().forEach(topicData -> {
+                    topicData.partitions().forEach(partitionData -> {
+                        List<Integer> newIsr = new 
ArrayList<>(partitionData.newIsrWithEpochs().size());
+                        partitionData.newIsrWithEpochs().forEach(brokerState 
-> {
+                            newIsr.add(brokerState.brokerId());
+                        });
+                        partitionData.setNewIsr(newIsr);
+                        
partitionData.setNewIsrWithEpochs(Collections.emptyList());
+                    });
+                });
+            }
             return new AlterPartitionRequest(data, version);
         }
 
@@ -85,4 +102,8 @@ public class AlterPartitionRequest extends AbstractRequest {
             return data.toString();
         }
     }
+
+    public static List<BrokerState> 
newIsrToSimpleNewIsrWithBrokerEpochs(List<Integer> newIsr) {
+        return newIsr.stream().map(brokerId -> new 
BrokerState().setBrokerId(brokerId)).collect(Collectors.toList());
+    }
 }
diff --git 
a/clients/src/main/resources/common/message/AlterPartitionRequest.json 
b/clients/src/main/resources/common/message/AlterPartitionRequest.json
index d91f317f97d..2e880cd64fa 100644
--- a/clients/src/main/resources/common/message/AlterPartitionRequest.json
+++ b/clients/src/main/resources/common/message/AlterPartitionRequest.json
@@ -21,7 +21,9 @@
   // Version 1 adds LeaderRecoveryState field (KIP-704).
   //
   // Version 2 adds TopicId field to replace TopicName field (KIP-841).
-  "validVersions": "0-2",
+  //
+  // Version 3 adds the NewIsrWithEpochs field and deprecates the NewIsr field 
(KIP-903).
+  "validVersions": "0-3",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
@@ -38,8 +40,14 @@
           "about": "The partition index" },
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
           "about": "The leader epoch of this partition" },
-        { "name": "NewIsr", "type": "[]int32", "versions": "0+", "entityType": 
"brokerId",
-          "about": "The ISR for this partition" },
+        { "name": "NewIsr", "type": "[]int32", "versions": "0-2", 
"entityType": "brokerId",
+          "about": "The ISR for this partition. Deprecated since version 3." },
+        { "name": "NewIsrWithEpochs", "type": "[]BrokerState", "versions": 
"3+", "fields": [
+          { "name": "BrokerId", "type": "int32", "versions": "3+", 
"entityType": "brokerId",
+            "about": "The ID of the broker." },
+          { "name": "BrokerEpoch", "type": "int64", "versions": "3+", 
"default": "-1",
+            "about": "The epoch of the broker. It will be -1 if the epoch 
check is not supported." }
+        ]},
         { "name": "LeaderRecoveryState", "type": "int8", "versions": "1+", 
"default": "0",
           "about": "1 if the partition is recovering from an unclean leader 
election; 0 otherwise." },
         { "name": "PartitionEpoch", "type": "int32", "versions": "0+",
diff --git 
a/clients/src/main/resources/common/message/AlterPartitionResponse.json 
b/clients/src/main/resources/common/message/AlterPartitionResponse.json
index e8be99fd5e3..2c1eb3d46fb 100644
--- a/clients/src/main/resources/common/message/AlterPartitionResponse.json
+++ b/clients/src/main/resources/common/message/AlterPartitionResponse.json
@@ -21,7 +21,9 @@
   //
   // Version 2 adds TopicId field to replace TopicName field, can return the 
following new errors:
   // INELIGIBLE_REPLICA, NEW_LEADER_ELECTED and UNKNOWN_TOPIC_ID (KIP-841).
-  "validVersions": "0-2",
+  //
+  // Version 3 is the same as version 2 (KIP-903).
+  "validVersions": "0-3",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
new file mode 100644
index 00000000000..5b0231ca882
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.AlterPartitionRequestData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
+import org.apache.kafka.common.message.AlterPartitionRequestData.PartitionData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.TopicData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class AlterPartitionRequestTest {
+    String topic = "test-topic";
+    Uuid topicId = Uuid.randomUuid();
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testBuildAlterPartitionRequest(short version) {
+        AlterPartitionRequestData request = new AlterPartitionRequestData()
+            .setBrokerId(1)
+            .setBrokerEpoch(1);
+
+        TopicData topicData = new TopicData()
+            .setTopicId(topicId)
+            .setTopicName(topic);
+
+        List<BrokerState> newIsrWithBrokerEpoch = new LinkedList<>();
+        newIsrWithBrokerEpoch.add(new 
BrokerState().setBrokerId(1).setBrokerEpoch(1001));
+        newIsrWithBrokerEpoch.add(new 
BrokerState().setBrokerId(2).setBrokerEpoch(1002));
+        newIsrWithBrokerEpoch.add(new 
BrokerState().setBrokerId(3).setBrokerEpoch(1003));
+
+        topicData.partitions().add(new PartitionData()
+            .setPartitionIndex(0)
+            .setLeaderEpoch(1)
+            .setPartitionEpoch(10)
+            .setNewIsrWithEpochs(newIsrWithBrokerEpoch));
+
+        request.topics().add(topicData);
+
+        AlterPartitionRequest.Builder builder = new 
AlterPartitionRequest.Builder(request, version > 1);
+        AlterPartitionRequest alterPartitionRequest = builder.build(version);
+        assertEquals(1, alterPartitionRequest.data().topics().size());
+        assertEquals(1, 
alterPartitionRequest.data().topics().get(0).partitions().size());
+        PartitionData partitionData = 
alterPartitionRequest.data().topics().get(0).partitions().get(0);
+        if (version < 3) {
+            assertEquals(Arrays.asList(1, 2, 3), partitionData.newIsr());
+            assertTrue(partitionData.newIsrWithEpochs().isEmpty());
+        } else {
+            assertEquals(newIsrWithBrokerEpoch, 
partitionData.newIsrWithEpochs());
+            assertTrue(partitionData.newIsr().isEmpty());
+        }
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 7b0ca0d233e..cb84ff94841 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1366,7 +1366,7 @@ public class RequestResponseTest {
             .setPartitionIndex(1)
             .setPartitionEpoch(2)
             .setLeaderEpoch(3)
-            .setNewIsr(asList(1, 2));
+            
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(asList(1,
 2)));
 
         if (version >= 1) {
             // Use the none default value; 1 - RECOVERING
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 64ae5ad39c1..1588066e4d7 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -2324,12 +2324,17 @@ class KafkaController(val config: KafkaConfig,
 
         case Some(topicName) =>
           topicReq.partitions.forEach { partitionReq =>
+            val isr = if (alterPartitionRequestVersion >= 3) {
+              partitionReq.newIsrWithEpochs.asScala.toList.map(brokerState => 
brokerState.brokerId())
+            } else {
+              partitionReq.newIsr.asScala.toList.map(_.toInt)
+            }
             partitionsToAlter.put(
               new TopicPartition(topicName, partitionReq.partitionIndex),
               LeaderAndIsr(
                 alterPartitionRequest.brokerId,
                 partitionReq.leaderEpoch,
-                partitionReq.newIsr.asScala.toList.map(_.toInt),
+                isr,
                 LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
                 partitionReq.partitionEpoch
               )
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala 
b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index 8d874df5183..8e228152fe8 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.errors.OperationNotAttemptedException
 import org.apache.kafka.common.message.AlterPartitionRequestData
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.RequestHeader
@@ -280,10 +281,12 @@ class DefaultAlterPartitionManager(
       message.topics.add(topicData)
 
       items.foreach { item =>
+        val isrWithEpoch = new 
util.ArrayList[BrokerState](item.leaderAndIsr.isr.size)
+        item.leaderAndIsr.isr.foreach(brokerId => isrWithEpoch.add(new 
BrokerState().setBrokerId(brokerId)))
         val partitionData = new AlterPartitionRequestData.PartitionData()
           .setPartitionIndex(item.topicIdPartition.partition)
           .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
-          .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+          .setNewIsrWithEpochs(isrWithEpoch)
           .setPartitionEpoch(item.leaderAndIsr.partitionEpoch)
 
         if (metadataVersion.isLeaderRecoverySupported) {
diff --git 
a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index a195be37dbc..d5acc2f190a 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -30,6 +30,7 @@ import 
org.apache.kafka.common.message.{AlterPartitionRequestData, AlterPartitio
 import org.apache.kafka.common.metrics.KafkaMetric
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.AlterPartitionRequest
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.{ElectionType, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
@@ -969,7 +970,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     // on IBP >= 2.8 and 2) the AlterPartition version 2 and above is used.
     val canCallerUseTopicIds = metadataVersion.isTopicIdsSupported && 
alterPartitionVersion > 1
 
-    val alterPartitionRequest = new AlterPartitionRequestData()
+    val alterPartitionRequest = new AlterPartitionRequest.Builder(new 
AlterPartitionRequestData()
       .setBrokerId(brokerId)
       .setBrokerEpoch(brokerEpoch)
       .setTopics(Seq(new AlterPartitionRequestData.TopicData()
@@ -979,10 +980,10 @@ class ControllerIntegrationTest extends QuorumTestHarness 
{
           .setPartitionIndex(tp.partition)
           .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
           .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
-          .setNewIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
+          
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(newLeaderAndIsr.isr.map(Int.box).asJava))
           .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
         ).asJava)
-      ).asJava)
+      ).asJava), alterPartitionVersion > 1).build(alterPartitionVersion).data()
 
     val future = alterPartitionFuture(alterPartitionRequest, 
alterPartitionVersion)
 
@@ -1038,7 +1039,7 @@ class ControllerIntegrationTest extends QuorumTestHarness 
{
           .setPartitionIndex(tp.partition)
           .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
           .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
-          .setNewIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
+          
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(newLeaderAndIsr.isr.map(Int.box).asJava))
           .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
         ).asJava)
       ).asJava)
@@ -1089,7 +1090,7 @@ class ControllerIntegrationTest extends QuorumTestHarness 
{
             .setPartitionIndex(tp.partition)
             .setLeaderEpoch(oldLeaderAndIsr.leaderEpoch)
             .setPartitionEpoch(requestPartitionEpoch)
-            .setNewIsr(newIsr.map(Int.box).asJava)
+            
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(newIsr.map(Int.box).asJava))
             .setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value)
           ).asJava)
         ).asJava)
@@ -1151,15 +1152,15 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
         .setPartitionIndex(tp.partition)
         .setLeaderEpoch(leaderAndIsr.leaderEpoch)
         .setPartitionEpoch(leaderAndIsr.partitionEpoch)
-        .setNewIsr(fullIsr.map(Int.box).asJava)
+        
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(fullIsr.map(Int.box).asJava))
         
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)).asJava)
     if (alterPartitionVersion > 1) requestTopic.setTopicId(topicId) else 
requestTopic.setTopicName(tp.topic)
 
     // Try to update ISR to contain the offline broker.
-    val alterPartitionRequest = new AlterPartitionRequestData()
+    val alterPartitionRequest = new AlterPartitionRequest.Builder(new 
AlterPartitionRequestData()
       .setBrokerId(controllerId)
       .setBrokerEpoch(controllerEpoch)
-      .setTopics(Seq(requestTopic).asJava)
+      .setTopics(Seq(requestTopic).asJava), alterPartitionVersion > 
1).build(alterPartitionVersion).data()
 
     val future = alterPartitionFuture(alterPartitionRequest, 
alterPartitionVersion)
 
@@ -1484,7 +1485,7 @@ class ControllerIntegrationTest extends QuorumTestHarness 
{
           .setPartitionIndex(topicPartition.partition)
           .setLeaderEpoch(leaderEpoch)
           .setPartitionEpoch(partitionEpoch)
-          .setNewIsr(isr.toList.map(Int.box).asJava)
+          
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(isr.toList.map(Int.box).asJava))
           .setLeaderRecoveryState(leaderRecoveryState)).asJava)).asJava)
 
     val future = alterPartitionFuture(alterPartitionRequest, if 
(topicIdOpt.isDefined) AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION else 
1)
diff --git 
a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
index 1ceabb4ad21..0560b2c776f 100644
--- a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
@@ -139,7 +139,11 @@ class AlterPartitionManagerTest {
     // Make sure we sent the right request ISR={1}
     val request = capture.getValue.build()
     assertEquals(request.data().topics().size(), 1)
-    
assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(),
 1)
+    if (request.version() < 3) {
+      assertEquals(request.data.topics.get(0).partitions.get(0).newIsr.size, 1)
+    } else {
+      
assertEquals(request.data.topics.get(0).partitions.get(0).newIsrWithEpochs.size,
 1)
+    }
   }
 
   @ParameterizedTest
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index 2af7f7d277d..9c630dfc697 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -21,7 +21,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.IntPredicate;
+import java.util.stream.Collectors;
+
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.PartitionRegistration;
@@ -100,6 +103,12 @@ public class PartitionChangeBuilder {
         return this;
     }
 
+    public PartitionChangeBuilder 
setTargetIsrWithBrokerStates(List<BrokerState> targetIsrWithEpoch) {
+        this.targetIsr = targetIsrWithEpoch.stream()
+            .map(brokerState -> 
brokerState.brokerId()).collect(Collectors.toList());
+        return this;
+    }
+
     public PartitionChangeBuilder setTargetReplicas(List<Integer> 
targetReplicas) {
         this.targetReplicas = targetReplicas;
         return this;
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 8b1b2fe0bba..d627c7b5cd6 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.errors.UnknownTopicIdException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.AlterPartitionRequestData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
 import org.apache.kafka.common.message.AlterPartitionResponseData;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
 import 
org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
@@ -74,6 +75,7 @@ import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AlterPartitionRequest;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
@@ -956,6 +958,12 @@ public class ReplicationControlManager {
 
             TopicControlInfo topic = topics.get(topicId);
             for (AlterPartitionRequestData.PartitionData partitionData : 
topicData.partitions()) {
+                if (requestVersion < 3) {
+                    partitionData.setNewIsrWithEpochs(
+                        
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(partitionData.newIsr())
+                    );
+                }
+
                 int partitionId = partitionData.partitionIndex();
                 PartitionRegistration partition = topic.parts.get(partitionId);
 
@@ -986,7 +994,7 @@ public class ReplicationControlManager {
                 if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
                     
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
                 }
-                builder.setTargetIsr(partitionData.newIsr());
+                
builder.setTargetIsrWithBrokerStates(partitionData.newIsrWithEpochs());
                 builder.setTargetLeaderRecoveryState(
                     
LeaderRecoveryState.of(partitionData.leaderRecoveryState()));
                 Optional<ApiMessageAndVersion> record = builder.build();
@@ -1110,11 +1118,14 @@ public class ReplicationControlManager {
 
             return INVALID_UPDATE_VERSION;
         }
-        int[] newIsr = Replicas.toArray(partitionData.newIsr());
+
+        int[] newIsr = partitionData.newIsrWithEpochs().stream()
+            .mapToInt(brokerState -> brokerState.brokerId()).toArray();
+
         if (!Replicas.validateIsr(partition.replicas, newIsr)) {
             log.error("Rejecting AlterPartition request from node {} for {}-{} 
because " +
                     "it specified an invalid ISR {}.", brokerId,
-                    topic.name, partitionId, partitionData.newIsr());
+                    topic.name, partitionId, partitionData.newIsrWithEpochs());
 
             return INVALID_REQUEST;
         }
@@ -1122,7 +1133,7 @@ public class ReplicationControlManager {
             // The ISR must always include the current leader.
             log.error("Rejecting AlterPartition request from node {} for {}-{} 
because " +
                     "it specified an invalid ISR {} that doesn't include 
itself.",
-                    brokerId, topic.name, partitionId, partitionData.newIsr());
+                    brokerId, topic.name, partitionId, 
partitionData.newIsrWithEpochs());
 
             return INVALID_REQUEST;
         }
@@ -1131,7 +1142,7 @@ public class ReplicationControlManager {
             log.info("Rejecting AlterPartition request from node {} for {}-{} 
because " +
                     "the ISR {} had more than one replica while the leader was 
still " +
                     "recovering from an unclean leader election {}.",
-                    brokerId, topic.name, partitionId, partitionData.newIsr(),
+                    brokerId, topic.name, partitionId, 
partitionData.newIsrWithEpochs(),
                     leaderRecoveryState);
 
             return INVALID_REQUEST;
@@ -1145,11 +1156,11 @@ public class ReplicationControlManager {
             return INVALID_REQUEST;
         }
 
-        List<IneligibleReplica> ineligibleReplicas = 
ineligibleReplicasForIsr(newIsr);
+        List<IneligibleReplica> ineligibleReplicas = 
ineligibleReplicasForIsr(partitionData.newIsrWithEpochs());
         if (!ineligibleReplicas.isEmpty()) {
             log.info("Rejecting AlterPartition request from node {} for {}-{} 
because " +
                     "it specified ineligible replicas {} in the new ISR {}.",
-                    brokerId, topic.name, partitionId, ineligibleReplicas, 
partitionData.newIsr());
+                    brokerId, topic.name, partitionId, ineligibleReplicas, 
partitionData.newIsrWithEpochs());
 
             if (requestApiVersion > 1) {
                 return INELIGIBLE_REPLICA;
@@ -1161,16 +1172,23 @@ public class ReplicationControlManager {
         return Errors.NONE;
     }
 
-    private List<IneligibleReplica> ineligibleReplicasForIsr(int[] replicas) {
+    private List<IneligibleReplica> ineligibleReplicasForIsr(List<BrokerState> 
brokerStates) {
         List<IneligibleReplica> ineligibleReplicas = new ArrayList<>(0);
-        for (Integer replicaId : replicas) {
-            BrokerRegistration registration = 
clusterControl.registration(replicaId);
+        for (BrokerState brokerState : brokerStates) {
+            int brokerId = brokerState.brokerId();
+            BrokerRegistration registration = 
clusterControl.registration(brokerId);
             if (registration == null) {
-                ineligibleReplicas.add(new IneligibleReplica(replicaId, "not 
registered"));
+                ineligibleReplicas.add(new IneligibleReplica(brokerId, "not 
registered"));
             } else if (registration.inControlledShutdown()) {
-                ineligibleReplicas.add(new IneligibleReplica(replicaId, 
"shutting down"));
+                ineligibleReplicas.add(new IneligibleReplica(brokerId, 
"shutting down"));
             } else if (registration.fenced()) {
-                ineligibleReplicas.add(new IneligibleReplica(replicaId, 
"fenced"));
+                ineligibleReplicas.add(new IneligibleReplica(brokerId, 
"fenced"));
+            } else if (brokerState.brokerEpoch() != -1 && registration.epoch() 
!= brokerState.brokerEpoch()) {
+                // The given broker epoch must match the broker epoch 
in the broker registration, unless the
+                // given broker epoch is -1, which means the broker 
epoch verification is skipped.
+                ineligibleReplicas.add(new IneligibleReplica(brokerId,
+                    "broker epoch mismatch: requested=" + 
brokerState.brokerEpoch()
+                        + " VS expected=" + registration.epoch()));
             }
         }
         return ineligibleReplicas;
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index e3063a74fc7..bc505979274 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.controller;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.protocol.types.TaggedFields;
+import org.apache.kafka.common.requests.AlterPartitionRequest;
 import org.apache.kafka.controller.PartitionChangeBuilder.ElectionResult;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.PartitionRegistration;
@@ -128,12 +129,17 @@ public class PartitionChangeBuilderTest {
         
assertElectLeaderEquals(createFooBuilder().setElection(Election.PREFERRED), 2, 
false);
         assertElectLeaderEquals(createFooBuilder(), 1, false);
         
assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN), 1, 
false);
-        
assertElectLeaderEquals(createFooBuilder().setTargetIsr(Arrays.asList(1, 3)), 
1, false);
-        
assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN).setTargetIsr(Arrays.asList(1,
 3)), 1, false);
-        
assertElectLeaderEquals(createFooBuilder().setTargetIsr(Arrays.asList(3)), 
NO_LEADER, false);
-        
assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN).setTargetIsr(Arrays.asList(3)),
 2, true);
+        assertElectLeaderEquals(createFooBuilder()
+            
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1,
 3))), 1, false);
+        
assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN)
+            
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1,
 3))), 1, false);
+        assertElectLeaderEquals(createFooBuilder()
+            
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))),
 NO_LEADER, false);
+        
assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN).
+            
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))),
 2, true);
         assertElectLeaderEquals(
-            
createFooBuilder().setElection(Election.UNCLEAN).setTargetIsr(Arrays.asList(4)).setTargetReplicas(Arrays.asList(2,
 1, 3, 4)),
+            createFooBuilder().setElection(Election.UNCLEAN)
+                
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(4))).setTargetReplicas(Arrays.asList(2,
 1, 3, 4)),
             4,
             false
         );
@@ -155,9 +161,11 @@ public class PartitionChangeBuilderTest {
         testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(),
             new PartitionChangeRecord(), NO_LEADER_CHANGE);
         testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
-            setTargetIsr(Arrays.asList(2, 1)), new PartitionChangeRecord(), 1);
+            
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2,
 1))),
+            new PartitionChangeRecord(), 1);
         testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
-            setTargetIsr(Arrays.asList(2, 1, 3, 4)), new 
PartitionChangeRecord(),
+            
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2,
 1, 3, 4))),
+            new PartitionChangeRecord(),
             NO_LEADER_CHANGE);
         testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
             setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new 
PartitionChangeRecord(),
@@ -183,7 +191,7 @@ public class PartitionChangeBuilderTest {
             setPartitionId(0).
             setIsr(Arrays.asList(2, 1)).
             setLeader(1), PARTITION_CHANGE_RECORD.highestSupportedVersion())),
-            createFooBuilder().setTargetIsr(Arrays.asList(2, 1)).build());
+            
createFooBuilder().setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2,
 1))).build());
     }
 
     @Test
@@ -193,7 +201,7 @@ public class PartitionChangeBuilderTest {
                 setPartitionId(0).
                 setIsr(Arrays.asList(2, 3)).
                 setLeader(2), 
PARTITION_CHANGE_RECORD.highestSupportedVersion())),
-            createFooBuilder().setTargetIsr(Arrays.asList(2, 3)).build());
+            
createFooBuilder().setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2,
 3))).build());
     }
 
     @Test
@@ -217,7 +225,7 @@ public class PartitionChangeBuilderTest {
                 setRemovingReplicas(Collections.emptyList()).
                 setAddingReplicas(Collections.emptyList()),
                 PARTITION_CHANGE_RECORD.highestSupportedVersion())),
-            createBarBuilder().setTargetIsr(Arrays.asList(1, 2, 3, 
4)).build());
+            
createBarBuilder().setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1,
 2, 3, 4))).build());
     }
 
     @Test
@@ -235,7 +243,7 @@ public class PartitionChangeBuilderTest {
                 PARTITION_CHANGE_RECORD.highestSupportedVersion())),
             createBarBuilder().
                 setTargetReplicas(revert.replicas()).
-                setTargetIsr(revert.isr()).
+                
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(revert.isr())).
                 setTargetRemoving(Collections.emptyList()).
                 setTargetAdding(Collections.emptyList()).
                 build());
@@ -293,7 +301,8 @@ public class PartitionChangeBuilderTest {
         );
         assertEquals(
             Optional.of(expectedRecord),
-            
createFooBuilder().setElection(Election.UNCLEAN).setTargetIsr(Arrays.asList(3)).build()
+            createFooBuilder().setElection(Election.UNCLEAN)
+                
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))).build()
         );
 
         expectedRecord = new ApiMessageAndVersion(
@@ -311,7 +320,8 @@ public class PartitionChangeBuilderTest {
         );
         assertEquals(
             Optional.of(expectedRecord),
-            
createOfflineBuilder().setElection(Election.UNCLEAN).setTargetIsr(Arrays.asList(2)).build()
+            createOfflineBuilder().setElection(Election.UNCLEAN)
+                
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2))).build()
         );
     }
 
@@ -341,7 +351,7 @@ public class PartitionChangeBuilderTest {
             isLeaderRecoverySupported
         );
         // Set the target ISR to empty to indicate that the last leader is 
offline
-        offlineBuilder.setTargetIsr(Collections.emptyList());
+        offlineBuilder.setTargetIsrWithBrokerStates(Collections.emptyList());
 
         // The partition should stay as recovering
         PartitionChangeRecord changeRecord = (PartitionChangeRecord) 
offlineBuilder
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 4646c0d217d..6fffcddf0a0 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -49,6 +49,7 @@ import org.apache.kafka.common.message.RequestHeaderData;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.requests.AlterPartitionRequest;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -343,7 +344,6 @@ public class QuorumControllerTest {
         }
     }
 
-    @Test
     public void testBalancePartitionLeaders() throws Throwable {
         List<Integer> allBrokers = Arrays.asList(1, 2, 3);
         List<Integer> brokersToKeepUnfenced = Arrays.asList(1, 2);
@@ -455,7 +455,7 @@ public class QuorumControllerTest {
                 .setPartitionIndex(imbalancedPartitionId)
                 .setLeaderEpoch(partitionRegistration.leaderEpoch)
                 .setPartitionEpoch(partitionRegistration.partitionEpoch)
-                .setNewIsr(Arrays.asList(1, 2, 3));
+                
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1,
 2, 3)));
 
             AlterPartitionRequestData.TopicData topicData = new 
AlterPartitionRequestData.TopicData()
                 .setTopicName("foo");
@@ -466,7 +466,8 @@ public class QuorumControllerTest {
                 
.setBrokerEpoch(brokerEpochs.get(partitionRegistration.leader));
             alterPartitionRequest.topics().add(topicData);
 
-            active.alterPartition(ANONYMOUS_CONTEXT, 
alterPartitionRequest).get();
+            active.alterPartition(ANONYMOUS_CONTEXT, new AlterPartitionRequest
+                .Builder(alterPartitionRequest, false).build((short) 
0).data()).get();
 
             // Check that partitions are balanced
             AtomicLong lastHeartbeat = new 
AtomicLong(active.time().milliseconds());
@@ -861,7 +862,6 @@ public class QuorumControllerTest {
     }
 
     @Disabled // TODO: need to fix leader election in LocalLog.
-    @Test
     public void testMissingInMemorySnapshot() throws Exception {
         int numBrokers = 3;
         int numPartitions = 3;
@@ -911,7 +911,7 @@ public class QuorumControllerTest {
                         .setPartitionIndex(partitionIndex)
                         .setLeaderEpoch(partitionRegistration.leaderEpoch)
                         
.setPartitionEpoch(partitionRegistration.partitionEpoch)
-                        .setNewIsr(Arrays.asList(0, 1));
+                        
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(0,
 1)));
                 })
                 .collect(Collectors.toList());
 
@@ -929,7 +929,8 @@ public class QuorumControllerTest {
 
             int oldClaimEpoch = controller.curClaimEpoch();
             assertThrows(ExecutionException.class,
-                () -> controller.alterPartition(ANONYMOUS_CONTEXT, 
alterPartitionRequest).get());
+                () -> controller.alterPartition(ANONYMOUS_CONTEXT, new 
AlterPartitionRequest
+                    .Builder(alterPartitionRequest, false).build((short) 
0).data()).get());
 
             // Wait for the controller to become active again
             assertSame(controller, controlEnv.activeController());
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 6aa4fe508db..b8b42609959 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -26,6 +26,7 @@ import 
org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
 import org.apache.kafka.common.errors.PolicyViolationException;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
 import org.apache.kafka.common.message.AlterPartitionRequestData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
 import org.apache.kafka.common.message.AlterPartitionRequestData.PartitionData;
 import org.apache.kafka.common.message.AlterPartitionRequestData.TopicData;
 import org.apache.kafka.common.message.AlterPartitionResponseData;
@@ -64,6 +65,7 @@ import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AlterPartitionRequest;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
@@ -306,7 +308,7 @@ public class ReplicationControlManagerTest {
         void registerBrokers(Integer... brokerIds) throws Exception {
             for (int brokerId : brokerIds) {
                 RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
-                    setBrokerEpoch(brokerId + 
100).setBrokerId(brokerId).setRack(null);
+                    
setBrokerEpoch(defaultBrokerEpoch(brokerId)).setBrokerId(brokerId).setRack(null);
                 brokerRecord.endPoints().add(new 
RegisterBrokerRecord.BrokerEndpoint().
                     setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
                     setPort((short) 9092 + brokerId).
@@ -319,7 +321,7 @@ public class ReplicationControlManagerTest {
         void alterPartition(
             TopicIdPartition topicIdPartition,
             int leaderId,
-            List<Integer> isr,
+            List<BrokerState> isrWithEpoch,
             LeaderRecoveryState leaderRecoveryState
         ) throws Exception {
             BrokerRegistration registration = 
clusterControl.brokerRegistrations().get(leaderId);
@@ -337,7 +339,7 @@ public class ReplicationControlManagerTest {
                 .setPartitionEpoch(partition.partitionEpoch)
                 .setLeaderEpoch(partition.leaderEpoch)
                 .setLeaderRecoveryState(leaderRecoveryState.value())
-                .setNewIsr(isr);
+                .setNewIsrWithEpochs(isrWithEpoch);
 
             String topicName = 
replicationControl.getTopic(topicIdPartition.topicId()).name();
             TopicData topicData = new TopicData()
@@ -364,7 +366,7 @@ public class ReplicationControlManagerTest {
             for (int brokerId : brokerIds) {
                 ControllerResult<BrokerHeartbeatReply> result = 
replicationControl.
                     processBrokerHeartbeat(new BrokerHeartbeatRequestData().
-                        setBrokerId(brokerId).setBrokerEpoch(brokerId + 100).
+                        
setBrokerId(brokerId).setBrokerEpoch(defaultBrokerEpoch(brokerId)).
                         setCurrentMetadataOffset(1).
                         setWantFence(false).setWantShutDown(false), 0);
                 assertEquals(new BrokerHeartbeatReply(true, false, false, 
false),
@@ -381,7 +383,7 @@ public class ReplicationControlManagerTest {
             for (int brokerId : brokerIds) {
                 BrokerRegistrationChangeRecord record = new 
BrokerRegistrationChangeRecord()
                     .setBrokerId(brokerId)
-                    .setBrokerEpoch(brokerId + 100)
+                    .setBrokerEpoch(defaultBrokerEpoch(brokerId))
                     
.setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
                 replay(singletonList(new ApiMessageAndVersion(record, (short) 
1)));
             }
@@ -842,7 +844,7 @@ public class ReplicationControlManagerTest {
         assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
         long brokerEpoch = ctx.currentBrokerEpoch(0);
         PartitionData shrinkIsrRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1), 
LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1), 
LeaderRecoveryState.RECOVERED);
         ControllerResult<AlterPartitionResponseData> shrinkIsrResult = 
sendAlterPartition(
             replicationControl, 0, brokerEpoch, topicIdPartition.topicId(), 
shrinkIsrRequest);
         AlterPartitionResponseData.PartitionData shrinkIsrResponse = 
assertAlterPartitionResponse(
@@ -850,7 +852,7 @@ public class ReplicationControlManagerTest {
         assertConsistentAlterPartitionResponse(replicationControl, 
topicIdPartition, shrinkIsrResponse);
 
         PartitionData expandIsrRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1, 2), 
LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1, 
2), LeaderRecoveryState.RECOVERED);
         ControllerResult<AlterPartitionResponseData> expandIsrResult = 
sendAlterPartition(
             replicationControl, 0, brokerEpoch, topicIdPartition.topicId(), 
expandIsrRequest);
         AlterPartitionResponseData.PartitionData expandIsrResponse = 
assertAlterPartitionResponse(
@@ -913,7 +915,7 @@ public class ReplicationControlManagerTest {
 
         // Invalid leader
         PartitionData invalidLeaderRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1), 
LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1), 
LeaderRecoveryState.RECOVERED);
         ControllerResult<AlterPartitionResponseData> invalidLeaderResult = 
sendAlterPartition(
             replicationControl, notLeaderId, 
ctx.currentBrokerEpoch(notLeaderId),
             topicIdPartition.topicId(), invalidLeaderRequest);
@@ -921,13 +923,13 @@ public class ReplicationControlManagerTest {
 
         // Stale broker epoch
         PartitionData invalidBrokerEpochRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1), 
LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1), 
LeaderRecoveryState.RECOVERED);
         assertThrows(StaleBrokerEpochException.class, () -> sendAlterPartition(
             replicationControl, leaderId, brokerEpoch - 1, 
topicIdPartition.topicId(), invalidBrokerEpochRequest));
 
         // Invalid leader epoch
         PartitionData invalidLeaderEpochRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1), 
LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1), 
LeaderRecoveryState.RECOVERED);
         invalidLeaderEpochRequest.setLeaderEpoch(500);
         ControllerResult<AlterPartitionResponseData> invalidLeaderEpochResult 
= sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
@@ -936,7 +938,7 @@ public class ReplicationControlManagerTest {
 
         // Invalid partition epoch
         PartitionData invalidPartitionEpochRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1), 
LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1), 
LeaderRecoveryState.RECOVERED);
         invalidPartitionEpochRequest.setPartitionEpoch(500);
         ControllerResult<AlterPartitionResponseData> 
invalidPartitionEpochResult = sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
@@ -945,7 +947,7 @@ public class ReplicationControlManagerTest {
 
         // Invalid ISR (3 is not a valid replica)
         PartitionData invalidIsrRequest1 = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1, 3), 
LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1, 
3), LeaderRecoveryState.RECOVERED);
         ControllerResult<AlterPartitionResponseData> invalidIsrResult1 = 
sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidIsrRequest1);
@@ -953,7 +955,7 @@ public class ReplicationControlManagerTest {
 
         // Invalid ISR (does not include leader 0)
         PartitionData invalidIsrRequest2 = newAlterPartition(
-            replicationControl, topicIdPartition, asList(1, 2), 
LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, isrWithDefaultEpoch(1, 2), 
LeaderRecoveryState.RECOVERED);
         ControllerResult<AlterPartitionResponseData> invalidIsrResult2 = 
sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidIsrRequest2);
@@ -961,7 +963,7 @@ public class ReplicationControlManagerTest {
 
         // Invalid ISR length and recovery state
         PartitionData invalidIsrRecoveryRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1), 
LeaderRecoveryState.RECOVERING);
+            replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1), 
LeaderRecoveryState.RECOVERING);
         ControllerResult<AlterPartitionResponseData> invalidIsrRecoveryResult 
= sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidIsrRecoveryRequest);
@@ -969,7 +971,7 @@ public class ReplicationControlManagerTest {
 
         // Invalid recovery state transition from RECOVERED to RECOVERING
         PartitionData invalidRecoveryRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0), 
LeaderRecoveryState.RECOVERING);
+            replicationControl, topicIdPartition, isrWithDefaultEpoch(0), 
LeaderRecoveryState.RECOVERING);
         ControllerResult<AlterPartitionResponseData> invalidRecoveryResult = 
sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidRecoveryRequest);
@@ -979,7 +981,7 @@ public class ReplicationControlManagerTest {
     private PartitionData newAlterPartition(
         ReplicationControlManager replicationControl,
         TopicIdPartition topicIdPartition,
-        List<Integer> newIsr,
+        List<BrokerState> newIsrWithEpoch,
         LeaderRecoveryState leaderRecoveryState
     ) {
         PartitionRegistration partitionControl =
@@ -988,7 +990,7 @@ public class ReplicationControlManagerTest {
             .setPartitionIndex(0)
             .setLeaderEpoch(partitionControl.leaderEpoch)
             .setPartitionEpoch(partitionControl.partitionEpoch)
-            .setNewIsr(newIsr)
+            .setNewIsrWithEpochs(newIsrWithEpoch)
             .setLeaderRecoveryState(leaderRecoveryState.value());
     }
 
@@ -1501,9 +1503,9 @@ public class ReplicationControlManagerTest {
         log.info("running final alterPartition...");
         ControllerRequestContext requestContext =
             anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
-        ControllerResult<AlterPartitionResponseData> alterPartitionResult = 
replication.alterPartition(
-            requestContext,
-            new AlterPartitionRequestData().setBrokerId(3).setBrokerEpoch(103).
+        AlterPartitionRequestData alterPartitionRequestData = new 
AlterPartitionRequestData().
+                setBrokerId(3).
+                setBrokerEpoch(103).
                 setTopics(asList(new TopicData().
                     setTopicName(version <= 1 ? "foo" : "").
                     setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).
@@ -1511,7 +1513,10 @@ public class ReplicationControlManagerTest {
                         setPartitionIndex(1).
                         setPartitionEpoch(1).
                         setLeaderEpoch(0).
-                        setNewIsr(asList(3, 0, 2, 1)))))));
+                        setNewIsrWithEpochs(isrWithDefaultEpoch(3, 0, 2, 
1))))));
+        ControllerResult<AlterPartitionResponseData> alterPartitionResult = 
replication.alterPartition(
+            requestContext,
+            new AlterPartitionRequest.Builder(alterPartitionRequestData, 
version > 1).build(version).data());
         Errors expectedError = version > 1 ? NEW_LEADER_ELECTED : 
FENCED_LEADER_EPOCH;
         assertEquals(new AlterPartitionResponseData().setTopics(asList(
             new AlterPartitionResponseData.TopicData().
@@ -1564,13 +1569,13 @@ public class ReplicationControlManagerTest {
                     .setPartitionIndex(0)
                     .setPartitionEpoch(1)
                     .setLeaderEpoch(1)
-                    .setNewIsr(asList(1, 2, 3, 4))))));
+                    .setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3, 4))))));
 
         ControllerRequestContext requestContext =
             anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
 
         ControllerResult<AlterPartitionResponseData> alterPartitionResult =
-            replication.alterPartition(requestContext, alterIsrRequest);
+            replication.alterPartition(requestContext, new 
AlterPartitionRequest.Builder(alterIsrRequest, version > 
1).build(version).data());
 
         Errors expectedError = version <= 1 ? OPERATION_NOT_ATTEMPTED : 
INELIGIBLE_REPLICA;
         assertEquals(
@@ -1604,6 +1609,75 @@ public class ReplicationControlManagerTest {
             alterPartitionResult.response());
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testAlterPartitionShouldRejectBrokersWithStaleEpoch(short 
version) throws Exception { // Checks that an ISR entry carrying an outdated broker epoch is rejected (asserted for version >= 3 only).
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+        Uuid fooId = ctx.createTestTopic(
+            "foo",
+            new int[][] {new int[] {1, 2, 3, 4}}
+        ).topicId();
+        ctx.alterPartition(new TopicIdPartition(fooId, 0), 1, 
isrWithDefaultEpoch(1, 2, 3), LeaderRecoveryState.RECOVERED); // broker 1 acts as leader and sets the ISR to {1, 2, 3}
+
+        // The leader (broker 1) builds an AlterPartition request that adds broker 4 to the ISR, using the default epochs (broker id + 100).
+        AlterPartitionRequestData alterIsrRequest = new 
AlterPartitionRequestData().
+            setBrokerId(1).
+            setBrokerEpoch(101). // same value as defaultBrokerEpoch(1)
+            setTopics(asList(new TopicData().
+                setTopicName(version <= 1 ? "foo" : "").
+                setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).
+                setPartitions(asList(new PartitionData().
+                    setPartitionIndex(0).
+                    setPartitionEpoch(1).
+                    setLeaderEpoch(1).
+                    setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3, 4))))));
+
+        // Broker 4 has failed silently and now registers again with a new epoch (its default epoch + 1000), before the request above is sent.
+        long newEpoch = defaultBrokerEpoch(4) + 1000;
+        RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
+            setBrokerEpoch(newEpoch).setBrokerId(4).setRack(null);
+        brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
+            setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
+            setPort((short) 9092 + 4).
+            setName("PLAINTEXT").
+            setHost("localhost"));
+        ctx.replay(Collections.singletonList(new 
ApiMessageAndVersion(brokerRecord, (short) 0)));
+
+        // Unfence broker 4 by processing a heartbeat that carries its new epoch, then replay the resulting records.
+        ControllerResult<BrokerHeartbeatReply> result = ctx.replicationControl.
+            processBrokerHeartbeat(new BrokerHeartbeatRequestData().
+                setBrokerId(4).setBrokerEpoch(newEpoch).
+                setCurrentMetadataOffset(1).
+                setWantFence(false).setWantShutDown(false), 0);
+        assertEquals(new BrokerHeartbeatReply(true, false, false, false),
+            result.response());
+        ctx.replay(result.records());
+
+        ControllerRequestContext requestContext =
+            anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
+
+        ControllerResult<AlterPartitionResponseData> alterPartitionResult =
+            replication.alterPartition(requestContext, new 
AlterPartitionRequest.Builder(alterIsrRequest, version > 
1).build(version).data());
+
+        // The late-arriving AlterPartition request should be rejected when 
version >= 3.
+        if (version >= 3) {
+            assertEquals(
+                new AlterPartitionResponseData().
+                    setTopics(asList(new 
AlterPartitionResponseData.TopicData().
+                        setTopicName(version <= 1 ? "foo" : "").
+                        setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).
+                        setPartitions(asList(new 
AlterPartitionResponseData.PartitionData().
+                            setPartitionIndex(0).
+                            setErrorCode(INELIGIBLE_REPLICA.code()))))),
+                alterPartitionResult.response());
+        } else { // presumably older versions carry no per-broker epochs, so the stale entry cannot be detected — confirm against AlterPartitionRequest.Builder
+            assertEquals(NONE.code(), 
alterPartitionResult.response().errorCode()); // only the top-level error code is asserted here
+        }
+    }
+
     @ParameterizedTest
     @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
     public void testAlterPartitionShouldRejectShuttingDownBrokers(short 
version) throws Exception {
@@ -1640,13 +1714,13 @@ public class ReplicationControlManagerTest {
                     .setPartitionIndex(0)
                     .setPartitionEpoch(0)
                     .setLeaderEpoch(0)
-                    .setNewIsr(asList(1, 2, 3, 4))))));
+                    .setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3, 4))))));
 
         ControllerRequestContext requestContext =
             anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
 
         ControllerResult<AlterPartitionResponseData> alterPartitionResult =
-            replication.alterPartition(requestContext, alterIsrRequest);
+            replication.alterPartition(requestContext, new 
AlterPartitionRequest.Builder(alterIsrRequest, version > 
1).build(version).data());
 
         Errors expectedError = version <= 1 ? OPERATION_NOT_ATTEMPTED : 
INELIGIBLE_REPLICA;
         assertEquals(
@@ -1738,7 +1812,7 @@ public class ReplicationControlManagerTest {
             new AlterPartitionRequestData().setBrokerId(4).setBrokerEpoch(104).
                 setTopics(asList(new 
TopicData().setTopicId(barId).setPartitions(asList(
                     new 
PartitionData().setPartitionIndex(0).setPartitionEpoch(2).
-                        setLeaderEpoch(1).setNewIsr(asList(4, 1, 2, 0)))))));
+                        
setLeaderEpoch(1).setNewIsrWithEpochs(isrWithDefaultEpoch(4, 1, 2, 0)))))));
         assertEquals(new AlterPartitionResponseData().setTopics(asList(
             new 
AlterPartitionResponseData.TopicData().setTopicId(barId).setPartitions(asList(
                 new AlterPartitionResponseData.PartitionData().
@@ -1867,7 +1941,7 @@ public class ReplicationControlManagerTest {
 
         // Bring 2 back into the ISR for partition 1. This allows us to verify 
that
         // preferred election does not occur as a result of the unclean 
election request.
-        ctx.alterPartition(partition1, 4, asList(2, 4), 
LeaderRecoveryState.RECOVERED);
+        ctx.alterPartition(partition1, 4, isrWithDefaultEpoch(2, 4), 
LeaderRecoveryState.RECOVERED);
 
         ControllerResult<ElectLeadersResponseData> result = 
replication.electLeaders(request);
         assertEquals(1, result.records().size());
@@ -2039,10 +2113,10 @@ public class ReplicationControlManagerTest {
                     setPartitions(asList(
                         new AlterPartitionRequestData.PartitionData().
                             setPartitionIndex(0).setPartitionEpoch(0).
-                            setLeaderEpoch(0).setNewIsr(asList(1, 2, 3)),
+                            
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3)),
                         new AlterPartitionRequestData.PartitionData().
                             setPartitionIndex(2).setPartitionEpoch(0).
-                            setLeaderEpoch(0).setNewIsr(asList(0, 2, 1)))))));
+                            
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(0, 2, 1)))))));
         assertEquals(new AlterPartitionResponseData().setTopics(asList(
             new 
AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(asList(
                 new AlterPartitionResponseData.PartitionData().
@@ -2124,7 +2198,7 @@ public class ReplicationControlManagerTest {
                 setTopics(asList(new 
AlterPartitionRequestData.TopicData().setTopicId(fooId).
                     setPartitions(asList(new 
AlterPartitionRequestData.PartitionData().
                         setPartitionIndex(0).setPartitionEpoch(0).
-                        setLeaderEpoch(0).setNewIsr(asList(1, 2, 3)))))));
+                        
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3)))))));
         assertEquals(new AlterPartitionResponseData().setTopics(asList(
             new 
AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(asList(
                 new AlterPartitionResponseData.PartitionData().
@@ -2156,7 +2230,7 @@ public class ReplicationControlManagerTest {
                 setTopics(asList(new 
AlterPartitionRequestData.TopicData().setTopicId(fooId).
                     setPartitions(asList(new 
AlterPartitionRequestData.PartitionData().
                         setPartitionIndex(2).setPartitionEpoch(0).
-                        setLeaderEpoch(0).setNewIsr(asList(0, 2, 1)))))));
+                        
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(0, 2, 1)))))));
         assertEquals(new AlterPartitionResponseData().setTopics(asList(
             new 
AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(asList(
                 new AlterPartitionResponseData.PartitionData().
@@ -2292,4 +2366,17 @@ public class ReplicationControlManagerTest {
 
         assertEquals(expectedRecords, result.records());
     }
+
+    private static BrokerState brokerState(int brokerId, Long brokerEpoch) { // Builds a BrokerState holding the given broker id and broker epoch.
+        return new 
BrokerState().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch);
+    }
+
+    private static Long defaultBrokerEpoch(int brokerId) { // Default epoch the test helpers use for a broker: broker id + 100.
+        return brokerId + 100L; // NOTE(review): the result is boxed to Long; a primitive long return looks sufficient — confirm
+    }
+
+    private static List<BrokerState> isrWithDefaultEpoch(Integer... isr) { // Converts broker ids into BrokerState entries, in the order given.
+        return Arrays.stream(isr).map(brokerId -> brokerState(brokerId, 
defaultBrokerEpoch(brokerId))) // each id is paired with its default epoch
+            .collect(Collectors.toList());
+    }
 }

Reply via email to