This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b38573fcaa4 KAFKA-18486 Remove becomeLeaderOrFollower from 
testPartition*, testPreferredReplicaAs* (#20009)
b38573fcaa4 is described below

commit b38573fcaa4f5f263636d5d727ef4aa3df17ec9f
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Mon Jun 23 16:42:30 2025 +0800

    KAFKA-18486 Remove becomeLeaderOrFollower from testPartition*, 
testPreferredReplicaAs* (#20009)
    
    Replace `LeaderAndIsrRequest` and `ReplicaManager#becomeLeaderOrFollower`
    with `TopicsDelta`, `MetadataImage` and `ReplicaManager#applyDelta` in the
    following tests:
    * testPartitionListener
    * testPartitionMarkedOfflineIfLogCantBeCreated
    * testPartitionMetadataFileNotCreated (renamed to testPartitionMetadataFileCreated)
    * testPartitionsWithLateTransactionsCount
    * testPreferredReplicaAsFollower
    * testPreferredReplicaAsLeader
    * testPreferredReplicaAsLeaderWhenSameRackFollowerIsOutOfIsr
    * testProducerIdCountMetrics
    
    Reviewers: Jhen-Yung Hsu <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 215 ++++++++-------------
 1 file changed, 80 insertions(+), 135 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index e3b37a8df2e..8e7c6181dd3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -624,35 +624,28 @@ class ReplicaManagerTest {
 
     try {
       val brokerList = Seq[Integer](0, 1).asJava
+      val tp0 = new TopicPartition(topic, 0)
+      val tp1 = new TopicPartition(topic, 1)
 
       // Create a couple partition for the topic.
-      val partition0 = replicaManager.createPartition(new 
TopicPartition(topic, 0))
+      val partition0 = replicaManager.createPartition(tp0)
       partition0.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
-      val partition1 = replicaManager.createPartition(new 
TopicPartition(topic, 1))
+      val partition1 = replicaManager.createPartition(tp1)
       partition1.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
 
       // Make this replica the leader for the partitions.
-      Seq(0, 1).foreach { partition =>
-        val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-          Seq(new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(topic)
-            .setPartitionIndex(partition)
-            .setControllerEpoch(0)
-            .setLeader(0)
-            .setLeaderEpoch(0)
-            .setIsr(brokerList)
-            .setPartitionEpoch(0)
-            .setReplicas(brokerList)
-            .setIsNew(true)).asJava,
-          Collections.singletonMap(topic, topicId),
-          Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava,
-          LeaderAndIsrRequest.Type.UNKNOWN
-        ).build()
-        replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) 
=> ())
-        replicaManager.getPartitionOrException(new TopicPartition(topic, 
partition))
-          .localLogOrException
+      Seq(tp0, tp1).foreach { tp =>
+        val delta = createLeaderDelta(
+          topicId = topicId,
+          partition = tp,
+          leaderId = 0,
+          replicas = brokerList,
+          isr = brokerList
+        )
+        replicaManager.applyDelta(delta, imageFromTopics(delta.apply()))
+        replicaManager.getPartitionOrException(tp)
       }
 
       def appendRecord(pid: Long, sequence: Int, partition: Int): Unit = {
@@ -721,20 +714,14 @@ class ReplicaManagerTest {
 
       // Make this replica the leader.
       val brokerList = Seq[Integer](0, 1, 2).asJava
-      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(0)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(true)).asJava,
-        topicIds.asJava,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => 
())
+      val leaderDelta = createLeaderDelta(
+        topicId = topicId,
+        partition = topicPartition,
+        leaderId = 0,
+        replicas = brokerList,
+        isr = brokerList,
+      )
+      replicaManager.applyDelta(leaderDelta, 
imageFromTopics(leaderDelta.apply()))
 
       // Start a transaction
       val producerId = 234L
@@ -1355,26 +1342,19 @@ class ReplicaManagerTest {
       leaderBrokerId, countDownLatch, expectTruncation = true, topicId = 
Optional.of(topicId))
 
     try {
-      val brokerList = Seq[Integer](0, 1).asJava
 
       val tp0 = new TopicPartition(topic, 0)
       val tidp0 = new TopicIdPartition(topicId, tp0)
 
       // Make this replica the follower
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(1)
-          .setLeaderEpoch(1)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(false)).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => 
())
+      val followerDelta = createFollowerDelta(
+        topicId = topicId,
+        partition = tp0,
+        followerId = 0,
+        leaderId = 1,
+        leaderEpoch = 1,
+      )
+      replicaManager.applyDelta(followerDelta, 
imageFromTopics(followerDelta.apply()))
 
       val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", 
"client-id",
         InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, 
"default")
@@ -1413,21 +1393,19 @@ class ReplicaManagerTest {
       val tp0 = new TopicPartition(topic, 0)
       val tidp0 = new TopicIdPartition(topicId, tp0)
 
+      val partition = replicaManager.createPartition(tp0)
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
       // Make this replica the leader
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(1)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(false)).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => 
())
+      val leaderDelta = createLeaderDelta(
+        topicId = topicId,
+        partition = tp0,
+        leaderId = 0,
+        replicas = brokerList,
+        isr = brokerList,
+        leaderEpoch = 1
+      )
+      replicaManager.applyDelta(leaderDelta, 
imageFromTopics(leaderDelta.apply()))
 
       val metadata = new DefaultClientMetadata("rack-a", "client-id",
         InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, 
"default")
@@ -1469,24 +1447,15 @@ class ReplicaManagerTest {
       ))
 
       // Make this replica the leader and remove follower from ISR.
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
-        0,
-        0,
-        brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(leaderBrokerId)
-          .setLeaderEpoch(1)
-          .setIsr(Seq[Integer](leaderBrokerId).asJava)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(false)).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(leaderNode, followerNode).asJava).build()
-
-      replicaManager.becomeLeaderOrFollower(2, leaderAndIsrRequest, (_, _) => 
())
+      val leaderDelta = createLeaderDelta(
+        topicId = topicId,
+        partition = tp0,
+        leaderId = leaderBrokerId,
+        replicas = brokerList,
+        isr = util.Arrays.asList(leaderBrokerId),
+        leaderEpoch = 1
+      )
+      replicaManager.applyDelta(leaderDelta, 
imageFromTopics(leaderDelta.apply()))
 
       appendRecords(replicaManager, tp0, 
TestUtils.singletonRecords(s"message".getBytes)).onFire { response =>
         assertEquals(Errors.NONE, response.error)
@@ -3953,43 +3922,25 @@ class ReplicaManagerTest {
   }
 
   @Test
-  def testPartitionMetadataFileNotCreated(): Unit = {
+  def testPartitionMetadataFileCreated(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time))
     try {
       val brokerList = Seq[Integer](0, 1).asJava
       val topicPartition = new TopicPartition(topic, 0)
-      val topicPartitionFake = new TopicPartition("fakeTopic", 0)
-      val topicIds = Map(topic -> Uuid.ZERO_UUID, "foo" -> 
Uuid.randomUuid()).asJava
-      val topicNames = topicIds.asScala.map(_.swap).asJava
 
-      def leaderAndIsrRequest(epoch: Int, name: String): LeaderAndIsrRequest =
-        new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-          Seq(new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(name)
-            .setPartitionIndex(0)
-            .setControllerEpoch(0)
-            .setLeader(0)
-            .setLeaderEpoch(epoch)
-            .setIsr(brokerList)
-            .setPartitionEpoch(0)
-            .setReplicas(brokerList)
-            .setIsNew(true)).asJava,
-          topicIds,
-          Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-
-      // There is no file if the topic does not have an associated topic ID.
-      val response = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(0, "fakeTopic"), (_, _) => ())
-      assertTrue(replicaManager.localLog(topicPartitionFake).isDefined)
-      val log = replicaManager.localLog(topicPartitionFake).get
-      assertFalse(log.partitionMetadataFile.get.exists())
-      assertEquals(Errors.NONE, 
response.partitionErrors(topicNames).get(topicPartition))
+      val leaderDelta = createLeaderDelta(
+        topicId = Uuid.ZERO_UUID,
+        partition = topicPartition,
+        leaderId = 0,
+        replicas = brokerList,
+        isr = brokerList,
+      )
 
-      // There is no file if the topic has the default UUID.
-      val response2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(0, topic), (_, _) => ())
+      // The file exists if the topic has the default UUID.
+      replicaManager.applyDelta(leaderDelta, 
imageFromTopics(leaderDelta.apply()))
       assertTrue(replicaManager.localLog(topicPartition).isDefined)
-      val log2 = replicaManager.localLog(topicPartition).get
-      assertFalse(log2.partitionMetadataFile.get.exists())
-      assertEquals(Errors.NONE, 
response2.partitionErrors(topicNames).get(topicPartition))
+      val log = replicaManager.localLog(topicPartition).get
+      assertTrue(log.partitionMetadataFile.get.exists())
 
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -4010,21 +3961,22 @@ class ReplicaManagerTest {
       // Delete the data directory to trigger a storage exception
       Utils.delete(dataDir)
 
-      val request = makeLeaderAndIsrRequest(
-        topicId = Uuid.randomUuid(),
-        topicPartition = topicPartition,
-        replicas = Seq(0, 1),
-        leaderAndIsr = new LeaderAndIsr(if (becomeLeader) 0 else 1, List(0, 
1).map(Int.box).asJava)
+      val leaderDelta = createLeaderDelta(
+        topicId = topicId,
+        partition = topicPartition,
+        leaderId = if (becomeLeader) 0 else 1,
+        replicas = util.Arrays.asList(0 , 1),
+        isr = util.Arrays.asList(0, 1),
       )
+      replicaManager.applyDelta(leaderDelta, 
imageFromTopics(leaderDelta.apply()))
 
-      replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ())
       val hostedPartition = replicaManager.getPartition(topicPartition)
       assertEquals(
         classOf[HostedPartition.Offline],
         hostedPartition.getClass
       )
       assertEquals(
-        request.topicIds().get(topicPartition.topic()),
+        topicId,
         
hostedPartition.asInstanceOf[HostedPartition.Offline].partition.flatMap(p => 
p.topicId).get
       )
     } finally {
@@ -5276,22 +5228,15 @@ class ReplicaManagerTest {
       assertFalse(replicaManager.maybeAddListener(tp, listener))
 
       // Broker 0 becomes leader of the partition
-      val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState()
-        .setTopicName(topic)
-        .setPartitionIndex(0)
-        .setControllerEpoch(0)
-        .setLeader(0)
-        .setLeaderEpoch(leaderEpoch)
-        .setIsr(replicas)
-        .setPartitionEpoch(0)
-        .setReplicas(replicas)
-        .setIsNew(true)
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(leaderAndIsrPartitionState).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest, (_, _) => ())
-      assertEquals(Errors.NONE, leaderAndIsrResponse.error)
+      val leaderDelta = createLeaderDelta(
+        topicId = topicId,
+        partition = tp,
+        leaderId = 0,
+        replicas = replicas,
+        isr = replicas,
+        leaderEpoch = leaderEpoch
+      )
+      replicaManager.applyDelta(leaderDelta, 
imageFromTopics(leaderDelta.apply()))
 
       // Registering it should succeed now.
       assertTrue(replicaManager.maybeAddListener(tp, listener))

Reply via email to