This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b38573fcaa4 KAFKA-18486 Remove becomeLeaderOrFollower from
testPartition*, testPreferredReplicaAs* (#20009)
b38573fcaa4 is described below
commit b38573fcaa4f5f263636d5d727ef4aa3df17ec9f
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Mon Jun 23 16:42:30 2025 +0800
KAFKA-18486 Remove becomeLeaderOrFollower from testPartition*,
testPreferredReplicaAs* (#20009)
Replace `leaderAndIsrRequest` and `becomeLeaderOrFollower` with
`TopicsDelta`, `MetadataImage` and `ReplicaManager#applyDelta` for the
following tests:
* testPartitionListener
* testPartitionMarkedOfflineIfLogCantBeCreated
* testPartitionMetadataFileNotCreated (renamed to testPartitionMetadataFileCreated)
* testPartitionsWithLateTransactionsCount
* testPreferredReplicaAsFollower
* testPreferredReplicaAsLeader
* testPreferredReplicaAsLeaderWhenSameRackFollowerIsOutOfIsr
* testProducerIdCountMetrics
Reviewers: Jhen-Yung Hsu <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../unit/kafka/server/ReplicaManagerTest.scala | 215 ++++++++-------------
1 file changed, 80 insertions(+), 135 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index e3b37a8df2e..8e7c6181dd3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -624,35 +624,28 @@ class ReplicaManagerTest {
try {
val brokerList = Seq[Integer](0, 1).asJava
+ val tp0 = new TopicPartition(topic, 0)
+ val tp1 = new TopicPartition(topic, 1)
// Create a couple partition for the topic.
- val partition0 = replicaManager.createPartition(new
TopicPartition(topic, 0))
+ val partition0 = replicaManager.createPartition(tp0)
partition0.createLogIfNotExists(isNew = false, isFutureReplica = false,
new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
- val partition1 = replicaManager.createPartition(new
TopicPartition(topic, 1))
+ val partition1 = replicaManager.createPartition(tp1)
partition1.createLogIfNotExists(isNew = false, isFutureReplica = false,
new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
// Make this replica the leader for the partitions.
- Seq(0, 1).foreach { partition =>
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(partition)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(0)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(true)).asJava,
- Collections.singletonMap(topic, topicId),
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava,
- LeaderAndIsrRequest.Type.UNKNOWN
- ).build()
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _)
=> ())
- replicaManager.getPartitionOrException(new TopicPartition(topic,
partition))
- .localLogOrException
+ Seq(tp0, tp1).foreach { tp =>
+ val delta = createLeaderDelta(
+ topicId = topicId,
+ partition = tp,
+ leaderId = 0,
+ replicas = brokerList,
+ isr = brokerList
+ )
+ replicaManager.applyDelta(delta, imageFromTopics(delta.apply()))
+ replicaManager.getPartitionOrException(tp)
}
def appendRecord(pid: Long, sequence: Int, partition: Int): Unit = {
@@ -721,20 +714,14 @@ class ReplicaManagerTest {
// Make this replica the leader.
val brokerList = Seq[Integer](0, 1, 2).asJava
- val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(0)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(true)).asJava,
- topicIds.asJava,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) =>
())
+ val leaderDelta = createLeaderDelta(
+ topicId = topicId,
+ partition = topicPartition,
+ leaderId = 0,
+ replicas = brokerList,
+ isr = brokerList,
+ )
+ replicaManager.applyDelta(leaderDelta,
imageFromTopics(leaderDelta.apply()))
// Start a transaction
val producerId = 234L
@@ -1355,26 +1342,19 @@ class ReplicaManagerTest {
leaderBrokerId, countDownLatch, expectTruncation = true, topicId =
Optional.of(topicId))
try {
- val brokerList = Seq[Integer](0, 1).asJava
val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0)
// Make this replica the follower
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(1)
- .setLeaderEpoch(1)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(false)).asJava,
- Collections.singletonMap(topic, topicId),
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) =>
())
+ val followerDelta = createFollowerDelta(
+ topicId = topicId,
+ partition = tp0,
+ followerId = 0,
+ leaderId = 1,
+ leaderEpoch = 1,
+ )
+ replicaManager.applyDelta(followerDelta,
imageFromTopics(followerDelta.apply()))
val metadata: ClientMetadata = new DefaultClientMetadata("rack-a",
"client-id",
InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS,
"default")
@@ -1413,21 +1393,19 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0)
+ val partition = replicaManager.createPartition(tp0)
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+ new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
// Make this replica the leader
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(1)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(false)).asJava,
- Collections.singletonMap(topic, topicId),
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) =>
())
+ val leaderDelta = createLeaderDelta(
+ topicId = topicId,
+ partition = tp0,
+ leaderId = 0,
+ replicas = brokerList,
+ isr = brokerList,
+ leaderEpoch = 1
+ )
+ replicaManager.applyDelta(leaderDelta,
imageFromTopics(leaderDelta.apply()))
val metadata = new DefaultClientMetadata("rack-a", "client-id",
InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS,
"default")
@@ -1469,24 +1447,15 @@ class ReplicaManagerTest {
))
// Make this replica the leader and remove follower from ISR.
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
- 0,
- 0,
- brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(leaderBrokerId)
- .setLeaderEpoch(1)
- .setIsr(Seq[Integer](leaderBrokerId).asJava)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(false)).asJava,
- Collections.singletonMap(topic, topicId),
- Set(leaderNode, followerNode).asJava).build()
-
- replicaManager.becomeLeaderOrFollower(2, leaderAndIsrRequest, (_, _) =>
())
+ val leaderDelta = createLeaderDelta(
+ topicId = topicId,
+ partition = tp0,
+ leaderId = leaderBrokerId,
+ replicas = brokerList,
+ isr = util.Arrays.asList(leaderBrokerId),
+ leaderEpoch = 1
+ )
+ replicaManager.applyDelta(leaderDelta,
imageFromTopics(leaderDelta.apply()))
appendRecords(replicaManager, tp0,
TestUtils.singletonRecords(s"message".getBytes)).onFire { response =>
assertEquals(Errors.NONE, response.error)
@@ -3953,43 +3922,25 @@ class ReplicaManagerTest {
}
@Test
- def testPartitionMetadataFileNotCreated(): Unit = {
+ def testPartitionMetadataFileCreated(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time))
try {
val brokerList = Seq[Integer](0, 1).asJava
val topicPartition = new TopicPartition(topic, 0)
- val topicPartitionFake = new TopicPartition("fakeTopic", 0)
- val topicIds = Map(topic -> Uuid.ZERO_UUID, "foo" ->
Uuid.randomUuid()).asJava
- val topicNames = topicIds.asScala.map(_.swap).asJava
- def leaderAndIsrRequest(epoch: Int, name: String): LeaderAndIsrRequest =
- new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(name)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(epoch)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(true)).asJava,
- topicIds,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-
- // There is no file if the topic does not have an associated topic ID.
- val response = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest(0, "fakeTopic"), (_, _) => ())
- assertTrue(replicaManager.localLog(topicPartitionFake).isDefined)
- val log = replicaManager.localLog(topicPartitionFake).get
- assertFalse(log.partitionMetadataFile.get.exists())
- assertEquals(Errors.NONE,
response.partitionErrors(topicNames).get(topicPartition))
+ val leaderDelta = createLeaderDelta(
+ topicId = Uuid.ZERO_UUID,
+ partition = topicPartition,
+ leaderId = 0,
+ replicas = brokerList,
+ isr = brokerList,
+ )
- // There is no file if the topic has the default UUID.
- val response2 = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest(0, topic), (_, _) => ())
+ // The file exists if the topic has the default UUID.
+ replicaManager.applyDelta(leaderDelta,
imageFromTopics(leaderDelta.apply()))
assertTrue(replicaManager.localLog(topicPartition).isDefined)
- val log2 = replicaManager.localLog(topicPartition).get
- assertFalse(log2.partitionMetadataFile.get.exists())
- assertEquals(Errors.NONE,
response2.partitionErrors(topicNames).get(topicPartition))
+ val log = replicaManager.localLog(topicPartition).get
+ assertTrue(log.partitionMetadataFile.get.exists())
} finally {
replicaManager.shutdown(checkpointHW = false)
@@ -4010,21 +3961,22 @@ class ReplicaManagerTest {
// Delete the data directory to trigger a storage exception
Utils.delete(dataDir)
- val request = makeLeaderAndIsrRequest(
- topicId = Uuid.randomUuid(),
- topicPartition = topicPartition,
- replicas = Seq(0, 1),
- leaderAndIsr = new LeaderAndIsr(if (becomeLeader) 0 else 1, List(0,
1).map(Int.box).asJava)
+ val leaderDelta = createLeaderDelta(
+ topicId = topicId,
+ partition = topicPartition,
+ leaderId = if (becomeLeader) 0 else 1,
+ replicas = util.Arrays.asList(0 , 1),
+ isr = util.Arrays.asList(0, 1),
)
+ replicaManager.applyDelta(leaderDelta,
imageFromTopics(leaderDelta.apply()))
- replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ())
val hostedPartition = replicaManager.getPartition(topicPartition)
assertEquals(
classOf[HostedPartition.Offline],
hostedPartition.getClass
)
assertEquals(
- request.topicIds().get(topicPartition.topic()),
+ topicId,
hostedPartition.asInstanceOf[HostedPartition.Offline].partition.flatMap(p =>
p.topicId).get
)
} finally {
@@ -5276,22 +5228,15 @@ class ReplicaManagerTest {
assertFalse(replicaManager.maybeAddListener(tp, listener))
// Broker 0 becomes leader of the partition
- val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(leaderEpoch)
- .setIsr(replicas)
- .setPartitionEpoch(0)
- .setReplicas(replicas)
- .setIsNew(true)
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(leaderAndIsrPartitionState).asJava,
- Collections.singletonMap(topic, topicId),
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest, (_, _) => ())
- assertEquals(Errors.NONE, leaderAndIsrResponse.error)
+ val leaderDelta = createLeaderDelta(
+ topicId = topicId,
+ partition = tp,
+ leaderId = 0,
+ replicas = replicas,
+ isr = replicas,
+ leaderEpoch = leaderEpoch
+ )
+ replicaManager.applyDelta(leaderDelta,
imageFromTopics(leaderDelta.apply()))
// Registering it should succeed now.
assertTrue(replicaManager.maybeAddListener(tp, listener))