songnon commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r961648621
##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,58 @@ class ControllerIntegrationTest extends QuorumTestHarness {
"failed to get expected partition state upon broker startup")
}
+ @Test
+ def testAutoPreferredReplicaLeaderElectionWithOtherReassigningPartitions():
Unit = {
+ servers = makeServers(3, autoLeaderRebalanceEnable = true)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+ val leaderBrokerId = servers.map(_.config.brokerId).filter(_ !=
controllerId).head
+ val otherBrokerId = servers.map(_.config.brokerId).filter(e => e !=
controllerId && e != leaderBrokerId).head
+ val tp = new TopicPartition("t", 0)
+ val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+ TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
+ val reassigningTp = new TopicPartition("reassigning", 0)
+ val reassigningTpAssignment = Map(reassigningTp.partition ->
Seq(controllerId))
+
+ TestUtils.createTopic(zkClient, reassigningTp.topic,
partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+ servers(leaderBrokerId).shutdown()
+ servers(leaderBrokerId).awaitShutdown()
+
+ servers(otherBrokerId).shutdown()
+ servers(otherBrokerId).awaitShutdown()
+ waitForPartitionState(tp, firstControllerEpoch, controllerId,
LeaderAndIsr.InitialLeaderEpoch + 1,
+ "failed to get expected partition state upon broker shutdown")
+
+ val reassignment = Map(reassigningTp -> Seq(otherBrokerId))
+ zkClient.createPartitionReassignment(reassignment)
+ waitForPartitionState(reassigningTp, firstControllerEpoch, controllerId,
LeaderAndIsr.InitialLeaderEpoch + 1,
+ "failed to get expected partition state during partition reassignment
with offline replica")
+
+ servers(leaderBrokerId).startup()
+ waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId,
LeaderAndIsr.InitialLeaderEpoch + 2,
+ "failed to get expected partition state upon broker startup")
+
+ servers(otherBrokerId).startup()
+ waitForPartitionState(reassigningTp, firstControllerEpoch, otherBrokerId,
LeaderAndIsr.InitialLeaderEpoch + 4,
+ "failed to get expected partition state after partition reassignment")
+ }
+
+ @Test
+ def
testAutoPreferredReplicaLeaderElectionWithSamePartitionBeingReassigned(): Unit
= {
+ servers = makeServers(3, autoLeaderRebalanceEnable = true)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+ val leaderBrokerId = servers.map(_.config.brokerId).filter(_ !=
controllerId).head
+ val otherBrokerId = servers.map(_.config.brokerId).filter(e => e !=
controllerId && e != leaderBrokerId).head
+ val tp = new TopicPartition("t", 0)
+ val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+ TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
+ val reassignment = Map(tp -> Seq(otherBrokerId, leaderBrokerId))
+ zkClient.createPartitionReassignment(reassignment)
+ waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId,
LeaderAndIsr.InitialLeaderEpoch + 3,
+ "failed to get expected partition state after partition reassignment")
+ waitForPartitionState(tp, firstControllerEpoch, otherBrokerId,
LeaderAndIsr.InitialLeaderEpoch + 4,
+ "failed to get expected partition state after auto preferred replica
leader election")
Review Comment:
Hmm, that's a good point. I updated the test case to make sure the election
takes place while the reassignment is active.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]