This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 081deaa1a96 KAFKA-18486: Migrate ReplicaManagerTest to use applyDelta 
(#19954)
081deaa1a96 is described below

commit 081deaa1a96710f2d58e094cf92ba92478c6dcba
Author: Lan Ding <[email protected]>
AuthorDate: Sat Jun 14 11:40:51 2025 +0800

    KAFKA-18486: Migrate ReplicaManagerTest to use applyDelta (#19954)
    
    Change becomeLeaderOrFollower to applyDelta in following test cases
    
    - testReadCommittedFetchLimitedAtLSO
    - testReceiveOutOfOrderSequenceExceptionWithLogStartOffset
    - testRemoteFetchExpiresPerSecMetric
    - testRemoteLogReaderMetrics
    
    Reviewers: PoAn Yang <[email protected]>, Bolin Lin
    <[email protected]>, Yung <[email protected]>, Jimmy Wang
    <[email protected]>, Ken Huang
    <[email protected]>, TengYao Chi <[email protected]>
---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 45 +++++++---------------
 1 file changed, 13 insertions(+), 32 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 956659c565a..677acf87376 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -452,7 +452,7 @@ class ReplicaManagerTest {
       partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
       // Make this replica the leader.
-      val delta = createLeaderDelta(topicIds(topic), topicPartition, 
brokerList.get(0), brokerList, brokerList)
+      val delta = createLeaderDelta(topicId, topicPartition, 
brokerList.get(0), brokerList, brokerList)
       val leaderMetadataImage = imageFromTopics(delta.apply())
       rm.applyDelta(delta, leaderMetadataImage)
       rm.getPartitionOrException(new TopicPartition(topic, 0))
@@ -464,7 +464,7 @@ class ReplicaManagerTest {
       }
 
       // Make this replica the follower
-      val delta1 = createLeaderDelta(topicIds(topic), topicPartition, 
brokerList.get(1), brokerList, brokerList, 1)
+      val delta1 = createLeaderDelta(topicId, topicPartition, 
brokerList.get(1), brokerList, brokerList, 1)
       val followerMetadataImage = imageFromTopics(delta1.apply())
       rm.applyDelta(delta1, followerMetadataImage)
 
@@ -585,25 +585,16 @@ class ReplicaManagerTest {
     try {
       val brokerList = Seq[Integer](0, 1).asJava
 
-      val partition = replicaManager.createPartition(new TopicPartition(topic, 
0))
+      val tp = new TopicPartition(topic, 0)
+      val partition = replicaManager.createPartition(tp)
       partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
 
       // Make this replica the leader.
-      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(0)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(true)).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => 
())
+      val delta = createLeaderDelta(topicId, tp, 0, brokerList, brokerList)
+      val leaderMetadataImage = imageFromTopics(delta.apply())
+
+      replicaManager.applyDelta(delta, leaderMetadataImage)
       replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
         .localLogOrException
 
@@ -802,20 +793,10 @@ class ReplicaManagerTest {
         new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
 
       // Make this replica the leader.
-      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(0)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(true)).asJava,
-        topicIds.asJava,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => 
())
+      val delta = createLeaderDelta(topicId, new TopicPartition(topic, 0), 0, 
brokerList, brokerList)
+      val leaderMetadataImage = imageFromTopics(delta.apply())
+
+      replicaManager.applyDelta(delta, leaderMetadataImage)
       replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
         .localLogOrException
 
@@ -3974,7 +3955,7 @@ class ReplicaManagerTest {
       assertEquals(0, 
brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
 
       val brokerList = Seq[Integer](0, 1).asJava
-      val delta = createLeaderDelta(topicIds(topic), new TopicPartition(topic, 
0), brokerList.get(1), brokerList, brokerList)
+      val delta = createLeaderDelta(topicId, new TopicPartition(topic, 0), 
brokerList.get(1), brokerList, brokerList)
       val leaderMetadataImage = imageFromTopics(delta.apply())
       replicaManager.applyDelta(delta, leaderMetadataImage)
 

Reply via email to