This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8a7e4a14234 KAFKA-18486 Update activeProducerState with KRaft mechanism 
in ReplicaManagerTest (#19890)
8a7e4a14234 is described below

commit 8a7e4a14234ba63b94399ff296c981a8690cd710
Author: Bolin Lin <[email protected]>
AuthorDate: Sun Jun 8 00:26:52 2025 +0800

    KAFKA-18486 Update activeProducerState with KRaft mechanism in 
ReplicaManagerTest (#19890)
    
    Description:
    * Replace the LeaderAndIsr RPC path (becomeLeaderOrFollower) with the
    KRaft metadata-delta path (applyDelta) when testing activeProducerState
    in ReplicaManagerTest
    
    Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 77 +++++++++++++++++-----
 1 file changed, 60 insertions(+), 17 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index cdae51bb935..9077f664eff 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -4535,35 +4535,78 @@ class ReplicaManagerTest {
       val oofProducerState = replicaManager.activeProducerState(oofPartition)
       assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, 
Errors.forCode(oofProducerState.errorCode))
 
-      // This API is supported by both leaders and followers
-
       val barPartition = new TopicPartition("bar", 0)
-      val barLeaderAndIsrRequest = makeLeaderAndIsrRequest(
-        topicId = Uuid.randomUuid(),
-        topicPartition = barPartition,
-        replicas = Seq(brokerId),
-        leaderAndIsr = new LeaderAndIsr(brokerId, 
List(brokerId).map(Int.box).asJava)
-      )
-      replicaManager.becomeLeaderOrFollower(0, barLeaderAndIsrRequest, (_, _) 
=> ())
+      val barTopicId = Uuid.randomUuid()
+
+      val leaderDelta = createLeaderDelta(barTopicId, barPartition, brokerId)
+      val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
+
       val barProducerState = replicaManager.activeProducerState(barPartition)
       assertEquals(Errors.NONE, Errors.forCode(barProducerState.errorCode))
 
-      val otherBrokerId = 1
       val bazPartition = new TopicPartition("baz", 0)
-      val bazLeaderAndIsrRequest = makeLeaderAndIsrRequest(
-        topicId = Uuid.randomUuid(),
-        topicPartition = bazPartition,
-        replicas = Seq(brokerId, otherBrokerId),
-        leaderAndIsr = new LeaderAndIsr(otherBrokerId, List(brokerId, 
otherBrokerId).map(Int.box).asJava)
-      )
-      replicaManager.becomeLeaderOrFollower(0, bazLeaderAndIsrRequest, (_, _) 
=> ())
+      val bazTopicId = Uuid.randomUuid()
+      val otherBrokerId = 1
+
+      val followerDelta = createFollowerDelta(bazTopicId, bazPartition, 
brokerId, otherBrokerId)
+      val followerMetadataImage = imageFromTopics(followerDelta.apply())
+      replicaManager.applyDelta(followerDelta, followerMetadataImage)
+
       val bazProducerState = replicaManager.activeProducerState(bazPartition)
       assertEquals(Errors.NONE, Errors.forCode(bazProducerState.errorCode))
+
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
   }
 
+  private def createLeaderDelta(topicId: Uuid, partition: TopicPartition, 
leaderId: Int): TopicsDelta = {
+    val delta = new TopicsDelta(TopicsImage.EMPTY)
+    
+    delta.replay(new TopicRecord()
+      .setName(partition.topic)
+      .setTopicId(topicId)
+    )
+
+    delta.replay(new PartitionRecord()
+      .setPartitionId(partition.partition)
+      .setTopicId(topicId)
+      .setReplicas(util.Arrays.asList(leaderId))
+      .setIsr(util.Arrays.asList(leaderId))
+      .setRemovingReplicas(Collections.emptyList())
+      .setAddingReplicas(Collections.emptyList())
+      .setLeader(leaderId)
+      .setLeaderEpoch(0)
+      .setPartitionEpoch(0)
+    )
+
+    delta
+  }
+
+  private def createFollowerDelta(topicId: Uuid, partition: TopicPartition, 
followerId: Int, leaderId: Int): TopicsDelta = {
+    val delta = new TopicsDelta(TopicsImage.EMPTY)
+
+    delta.replay(new TopicRecord()
+      .setName(partition.topic)
+      .setTopicId(topicId)
+    )
+
+    delta.replay(new PartitionRecord()
+      .setPartitionId(partition.partition)
+      .setTopicId(topicId)
+      .setReplicas(util.Arrays.asList(followerId, leaderId))
+      .setIsr(util.Arrays.asList(followerId, leaderId))
+      .setRemovingReplicas(Collections.emptyList())
+      .setAddingReplicas(Collections.emptyList())
+      .setLeader(leaderId)
+      .setLeaderEpoch(0)
+      .setPartitionEpoch(0)
+    )
+
+    delta
+  }
+
   val FOO_UUID = Uuid.fromString("fFJBx0OmQG-UqeaT6YaSwA")
 
   val BAR_UUID = Uuid.fromString("vApAP6y7Qx23VOfKBzbOBQ")

Reply via email to