splett2 commented on code in PR #12489:
URL: https://github.com/apache/kafka/pull/12489#discussion_r940807029


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1002,47 +1002,59 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
 
     val controller = getController().kafkaController
     val leaderIsrAndControllerEpochMap = 
zkClient.getTopicPartitionStates(Seq(tp))
-    val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val oldLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val newIsr = List(oldLeaderAndIsr.leader)
+    val newPartitionEpoch = oldLeaderAndIsr.partitionEpoch + 1
     val topicId = controller.controllerContext.topicIds(tp.topic)
     val brokerId = otherBroker.config.brokerId
     val brokerEpoch = 
controller.controllerContext.liveBrokerIdAndEpochs(otherBroker.config.brokerId)
 
-    // When re-sending the current ISR, we should not get and error or any ISR 
changes
-    val alterPartitionRequest = new AlterPartitionRequestData()
-      .setBrokerId(brokerId)
-      .setBrokerEpoch(brokerEpoch)
-      .setTopics(Seq(new AlterPartitionRequestData.TopicData()
-        .setTopicId(topicId)
-        .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
-          .setPartitionIndex(tp.partition)
-          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
-          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
-          .setNewIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
-          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+    def sendAndVerifyAlterPartitionResponse(requestPartitionEpoch: Int): Unit 
= {
+      val alterPartitionRequest = new AlterPartitionRequestData()
+        .setBrokerId(brokerId)
+        .setBrokerEpoch(brokerEpoch)
+        .setTopics(Seq(new AlterPartitionRequestData.TopicData()
+          .setTopicId(topicId)
+          .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+            .setPartitionIndex(tp.partition)
+            .setLeaderEpoch(oldLeaderAndIsr.leaderEpoch)
+            .setPartitionEpoch(requestPartitionEpoch)
+            .setNewIsr(newIsr.map(Int.box).asJava)
+            .setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value)
+          ).asJava)
         ).asJava)
-      ).asJava)
 
-    val future = new CompletableFuture[AlterPartitionResponseData]()
-    controller.eventManager.put(AlterPartitionReceived(
-      alterPartitionRequest,
-      AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
-      future.complete
-    ))
+      val future = new CompletableFuture[AlterPartitionResponseData]()
+      controller.eventManager.put(AlterPartitionReceived(
+        alterPartitionRequest,
+        AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
+        future.complete
+      ))
 
-    val expectedAlterPartitionResponse = new AlterPartitionResponseData()
-      .setTopics(Seq(new AlterPartitionResponseData.TopicData()
-        .setTopicId(topicId)
-        .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
-          .setPartitionIndex(tp.partition)
-          .setLeaderId(brokerId)
-          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
-          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
-          .setIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
-          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+      // When re-sending an ISR update, we should not get and error or any ISR 
changes
+      val expectedAlterPartitionResponse = new AlterPartitionResponseData()
+        .setTopics(Seq(new AlterPartitionResponseData.TopicData()
+          .setTopicId(topicId)
+          .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
+            .setPartitionIndex(tp.partition)
+            .setLeaderId(brokerId)
+            .setLeaderEpoch(oldLeaderAndIsr.leaderEpoch)
+            .setPartitionEpoch(newPartitionEpoch)
+            .setIsr(newIsr.map(Int.box).asJava)
+            .setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value)
+          ).asJava)
         ).asJava)
-      ).asJava)
+      assertEquals(expectedAlterPartitionResponse, future.get(10, 
TimeUnit.SECONDS))
+    }
 
-    assertEquals(expectedAlterPartitionResponse, future.get(10, 
TimeUnit.SECONDS))
+    // send a request, expect the partition epoch to be incremented
+    sendAndVerifyAlterPartitionResponse(oldLeaderAndIsr.partitionEpoch)
+
+    // re-send the same request with various partition epochs 
(less/equal/greater than the current
+    // epoch), expect it to succeed while the partition epoch remains the same
+    sendAndVerifyAlterPartitionResponse(oldLeaderAndIsr.partitionEpoch)
+    sendAndVerifyAlterPartitionResponse(newPartitionEpoch)
+    sendAndVerifyAlterPartitionResponse(newPartitionEpoch + 1)

Review Comment:
   Seems fairly reasonable to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to