hachikuji commented on code in PR #12489:
URL: https://github.com/apache/kafka/pull/12489#discussion_r940721941
##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1002,47 +1002,59 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     val controller = getController().kafkaController
     val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
-    val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val oldLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val newIsr = List(oldLeaderAndIsr.leader)
+    val newPartitionEpoch = oldLeaderAndIsr.partitionEpoch + 1
     val topicId = controller.controllerContext.topicIds(tp.topic)
     val brokerId = otherBroker.config.brokerId
     val brokerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(otherBroker.config.brokerId)
-    // When re-sending the current ISR, we should not get and error or any ISR changes
-    val alterPartitionRequest = new AlterPartitionRequestData()
-      .setBrokerId(brokerId)
-      .setBrokerEpoch(brokerEpoch)
-      .setTopics(Seq(new AlterPartitionRequestData.TopicData()
-        .setTopicId(topicId)
-        .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
-          .setPartitionIndex(tp.partition)
-          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
-          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
-          .setNewIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
-          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+    def sendAndVerifyAlterPartitionResponse(requestPartitionEpoch: Int): Unit = {
+      val alterPartitionRequest = new AlterPartitionRequestData()
+        .setBrokerId(brokerId)
+        .setBrokerEpoch(brokerEpoch)
+        .setTopics(Seq(new AlterPartitionRequestData.TopicData()
+          .setTopicId(topicId)
+          .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+            .setPartitionIndex(tp.partition)
+            .setLeaderEpoch(oldLeaderAndIsr.leaderEpoch)
+            .setPartitionEpoch(requestPartitionEpoch)
+            .setNewIsr(newIsr.map(Int.box).asJava)
+            .setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value)
+          ).asJava)
         ).asJava)
-      ).asJava)
-    val future = new CompletableFuture[AlterPartitionResponseData]()
-    controller.eventManager.put(AlterPartitionReceived(
-      alterPartitionRequest,
-      AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
-      future.complete
-    ))
+      val future = new CompletableFuture[AlterPartitionResponseData]()
+      controller.eventManager.put(AlterPartitionReceived(
+        alterPartitionRequest,
+        AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
+        future.complete
+      ))
-    val expectedAlterPartitionResponse = new AlterPartitionResponseData()
-      .setTopics(Seq(new AlterPartitionResponseData.TopicData()
-        .setTopicId(topicId)
-        .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
-          .setPartitionIndex(tp.partition)
-          .setLeaderId(brokerId)
-          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
-          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
-          .setIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
-          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+      // When re-sending an ISR update, we should not get and error or any ISR changes
+      val expectedAlterPartitionResponse = new AlterPartitionResponseData()
+        .setTopics(Seq(new AlterPartitionResponseData.TopicData()
+          .setTopicId(topicId)
+          .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
+            .setPartitionIndex(tp.partition)
+            .setLeaderId(brokerId)
+            .setLeaderEpoch(oldLeaderAndIsr.leaderEpoch)
+            .setPartitionEpoch(newPartitionEpoch)
+            .setIsr(newIsr.map(Int.box).asJava)
+            .setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value)
+          ).asJava)
        ).asJava)
-      ).asJava)
+      assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
+    }
-    assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
+    // send a request, expect the partition epoch to be incremented
+    sendAndVerifyAlterPartitionResponse(oldLeaderAndIsr.partitionEpoch)
+
+    // re-send the same request with various partition epochs (less/equal/greater than the current
+    // epoch), expect it to succeed while the partition epoch remains the same
+    sendAndVerifyAlterPartitionResponse(oldLeaderAndIsr.partitionEpoch)
+    sendAndVerifyAlterPartitionResponse(newPartitionEpoch)
+    sendAndVerifyAlterPartitionResponse(newPartitionEpoch + 1)
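For context on what the refactored test above asserts: the controller treats an AlterPartition request whose requested ISR already matches the committed ISR as an idempotent success, replying with its cached state and leaving the partition epoch unchanged. A minimal Scala sketch of that idea follows; the names (`IdempotentAlterPartitionSketch`, `PartitionState`, `handleAlterPartition`) are made up for illustration and this is not the actual KafkaController code.

```scala
// Minimal sketch (not the actual KafkaController code) of the idempotent
// AlterPartition handling the test above exercises.
object IdempotentAlterPartitionSketch {
  final case class PartitionState(leader: Int, leaderEpoch: Int, isr: List[Int], partitionEpoch: Int)

  def handleAlterPartition(
    current: PartitionState,
    requestedIsr: List[Int],
    requestPartitionEpoch: Int
  ): PartitionState = {
    if (requestedIsr == current.isr) {
      // The desired ISR is already committed: reply with the cached state and do
      // not bump the partition epoch. Note that requestPartitionEpoch is never
      // consulted on this path, which is the behavior the review comment below
      // questions.
      current
    } else {
      // Simplified happy path: apply the new ISR and bump the partition epoch
      // (real validation of leader epoch / partition epoch is omitted here).
      current.copy(isr = requestedIsr, partitionEpoch = current.partitionEpoch + 1)
    }
  }
}
```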
Review Comment:
I do find it a little odd that the partition epoch is ignored completely
when the ISR matches the desired state. We do have the leader epoch check, so
at least we can ensure that an old leader won't be misled into thinking that
its change was successfully applied. How about a case when the request is sent
to an old controller? Suppose a scenario like this:
1. Controller A has isr=[1,2], partition epoch=10
2. Controller B is elected
3. Leader sends AlterPartition(epoch=10) to B to remove 2 from ISR => partition epoch = 11
4. Leader sends AlterPartition(epoch=11) to A to add 2 back to the ISR => A accepts, but there is no bump
I think this case is ruled out because the leader has to find the new
controller and then go back to the old one. The controller epoch would probably catch that
case. What if we add a restart between steps 3 and 4? Would it be possible to
find the old controller after restarting? Probably not, but I think I'd sleep
better if we could at least reject requests where the partition epoch is
greater than what the controller has in its cache. Does that make sense?
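To make the proposed check concrete, here is a rough, hypothetical Scala sketch of the rejection rule being suggested; the object and helper names and the literal epoch values are invented for illustration and are not part of the actual KafkaController validation path. The rule is simply that a request carrying a partition epoch newer than the one in the controller's cache implies the controller's view is stale, so it should be rejected rather than answered with success.

```scala
// Hypothetical illustration of the suggested guard; not actual controller code.
object StalePartitionEpochCheckSketch extends App {
  // Reject when the request claims a partition epoch ahead of the controller's cache.
  def shouldReject(requestPartitionEpoch: Int, cachedPartitionEpoch: Int): Boolean =
    requestPartitionEpoch > cachedPartitionEpoch

  // Step 4 of the scenario above: controller A still caches partition epoch 10,
  // while the leader, having talked to controller B, sends epoch 11.
  val cachedEpochOnControllerA = 10
  val epochSentByLeader = 11

  // With the guard, A rejects instead of reporting success without a bump.
  assert(shouldReject(epochSentByLeader, cachedEpochOnControllerA))

  // Resends at or below the cached epoch keep the existing idempotent behavior.
  assert(!shouldReject(10, cachedEpochOnControllerA))
  assert(!shouldReject(9, cachedEpochOnControllerA))
}
```

Under such a rule, the last call in the test above (`sendAndVerifyAlterPartitionResponse(newPartitionEpoch + 1)`) would presumably have to expect an error rather than a successful no-op.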