This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8a7e4a14234 KAFKA-18486 Update activeProducerState wih KRaft mechanism
in ReplicaManagerTest (#19890)
8a7e4a14234 is described below
commit 8a7e4a14234ba63b94399ff296c981a8690cd710
Author: Bolin Lin <[email protected]>
AuthorDate: Sun Jun 8 00:26:52 2025 +0800
KAFKA-18486 Update activeProducerState wih KRaft mechanism in
ReplicaManagerTest (#19890)
Description:
* replace RPC with KRaft mechanism to test activeProducerState in
ReplicaManagerTest
Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../unit/kafka/server/ReplicaManagerTest.scala | 77 +++++++++++++++++-----
1 file changed, 60 insertions(+), 17 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index cdae51bb935..9077f664eff 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -4535,35 +4535,78 @@ class ReplicaManagerTest {
val oofProducerState = replicaManager.activeProducerState(oofPartition)
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER,
Errors.forCode(oofProducerState.errorCode))
- // This API is supported by both leaders and followers
-
val barPartition = new TopicPartition("bar", 0)
- val barLeaderAndIsrRequest = makeLeaderAndIsrRequest(
- topicId = Uuid.randomUuid(),
- topicPartition = barPartition,
- replicas = Seq(brokerId),
- leaderAndIsr = new LeaderAndIsr(brokerId,
List(brokerId).map(Int.box).asJava)
- )
- replicaManager.becomeLeaderOrFollower(0, barLeaderAndIsrRequest, (_, _)
=> ())
+ val barTopicId = Uuid.randomUuid()
+
+ val leaderDelta = createLeaderDelta(barTopicId, barPartition, brokerId)
+ val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
+
val barProducerState = replicaManager.activeProducerState(barPartition)
assertEquals(Errors.NONE, Errors.forCode(barProducerState.errorCode))
- val otherBrokerId = 1
val bazPartition = new TopicPartition("baz", 0)
- val bazLeaderAndIsrRequest = makeLeaderAndIsrRequest(
- topicId = Uuid.randomUuid(),
- topicPartition = bazPartition,
- replicas = Seq(brokerId, otherBrokerId),
- leaderAndIsr = new LeaderAndIsr(otherBrokerId, List(brokerId,
otherBrokerId).map(Int.box).asJava)
- )
- replicaManager.becomeLeaderOrFollower(0, bazLeaderAndIsrRequest, (_, _)
=> ())
+ val bazTopicId = Uuid.randomUuid()
+ val otherBrokerId = 1
+
+ val followerDelta = createFollowerDelta(bazTopicId, bazPartition,
brokerId, otherBrokerId)
+ val followerMetadataImage = imageFromTopics(followerDelta.apply())
+ replicaManager.applyDelta(followerDelta, followerMetadataImage)
+
val bazProducerState = replicaManager.activeProducerState(bazPartition)
assertEquals(Errors.NONE, Errors.forCode(bazProducerState.errorCode))
+
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
+ private def createLeaderDelta(topicId: Uuid, partition: TopicPartition,
leaderId: Int): TopicsDelta = {
+ val delta = new TopicsDelta(TopicsImage.EMPTY)
+
+ delta.replay(new TopicRecord()
+ .setName(partition.topic)
+ .setTopicId(topicId)
+ )
+
+ delta.replay(new PartitionRecord()
+ .setPartitionId(partition.partition)
+ .setTopicId(topicId)
+ .setReplicas(util.Arrays.asList(leaderId))
+ .setIsr(util.Arrays.asList(leaderId))
+ .setRemovingReplicas(Collections.emptyList())
+ .setAddingReplicas(Collections.emptyList())
+ .setLeader(leaderId)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(0)
+ )
+
+ delta
+ }
+
+ private def createFollowerDelta(topicId: Uuid, partition: TopicPartition,
followerId: Int, leaderId: Int): TopicsDelta = {
+ val delta = new TopicsDelta(TopicsImage.EMPTY)
+
+ delta.replay(new TopicRecord()
+ .setName(partition.topic)
+ .setTopicId(topicId)
+ )
+
+ delta.replay(new PartitionRecord()
+ .setPartitionId(partition.partition)
+ .setTopicId(topicId)
+ .setReplicas(util.Arrays.asList(followerId, leaderId))
+ .setIsr(util.Arrays.asList(followerId, leaderId))
+ .setRemovingReplicas(Collections.emptyList())
+ .setAddingReplicas(Collections.emptyList())
+ .setLeader(leaderId)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(0)
+ )
+
+ delta
+ }
+
val FOO_UUID = Uuid.fromString("fFJBx0OmQG-UqeaT6YaSwA")
val BAR_UUID = Uuid.fromString("vApAP6y7Qx23VOfKBzbOBQ")