This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 081deaa1a96 KAFKA-18486: Migrate ReplicaManagerTest to use applyDelta
(#19954)
081deaa1a96 is described below
commit 081deaa1a96710f2d58e094cf92ba92478c6dcba
Author: Lan Ding <[email protected]>
AuthorDate: Sat Jun 14 11:40:51 2025 +0800
KAFKA-18486: Migrate ReplicaManagerTest to use applyDelta (#19954)
Change becomeLeaderOrFollower to applyDelta in following test cases
- testReadCommittedFetchLimitedAtLSO
- testReceiveOutOfOrderSequenceExceptionWithLogStartOffset
- testRemoteFetchExpiresPerSecMetric
- testRemoteLogReaderMetrics
Reviewers: PoAn Yang <[email protected]>, Bolin Lin
<[email protected]>, Yung <[email protected]>, Jimmy Wang
<[email protected]>, Ken Huang
<[email protected]>, TengYao Chi <[email protected]>
---
.../unit/kafka/server/ReplicaManagerTest.scala | 45 +++++++---------------
1 file changed, 13 insertions(+), 32 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 956659c565a..677acf87376 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -452,7 +452,7 @@ class ReplicaManagerTest {
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
// Make this replica the leader.
- val delta = createLeaderDelta(topicIds(topic), topicPartition,
brokerList.get(0), brokerList, brokerList)
+ val delta = createLeaderDelta(topicId, topicPartition,
brokerList.get(0), brokerList, brokerList)
val leaderMetadataImage = imageFromTopics(delta.apply())
rm.applyDelta(delta, leaderMetadataImage)
rm.getPartitionOrException(new TopicPartition(topic, 0))
@@ -464,7 +464,7 @@ class ReplicaManagerTest {
}
// Make this replica the follower
- val delta1 = createLeaderDelta(topicIds(topic), topicPartition,
brokerList.get(1), brokerList, brokerList, 1)
+ val delta1 = createLeaderDelta(topicId, topicPartition,
brokerList.get(1), brokerList, brokerList, 1)
val followerMetadataImage = imageFromTopics(delta1.apply())
rm.applyDelta(delta1, followerMetadataImage)
@@ -585,25 +585,16 @@ class ReplicaManagerTest {
try {
val brokerList = Seq[Integer](0, 1).asJava
- val partition = replicaManager.createPartition(new TopicPartition(topic,
0))
+ val tp = new TopicPartition(topic, 0)
+ val partition = replicaManager.createPartition(tp)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
// Make this replica the leader.
- val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(0)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(true)).asJava,
- Collections.singletonMap(topic, topicId),
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) =>
())
+ val delta = createLeaderDelta(topicId, tp, 0, brokerList, brokerList)
+ val leaderMetadataImage = imageFromTopics(delta.apply())
+
+ replicaManager.applyDelta(delta, leaderMetadataImage)
replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException
@@ -802,20 +793,10 @@ class ReplicaManagerTest {
new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
// Make this replica the leader.
- val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(0)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(true)).asJava,
- topicIds.asJava,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) =>
())
+ val delta = createLeaderDelta(topicId, new TopicPartition(topic, 0), 0,
brokerList, brokerList)
+ val leaderMetadataImage = imageFromTopics(delta.apply())
+
+ replicaManager.applyDelta(delta, leaderMetadataImage)
replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException
@@ -3974,7 +3955,7 @@ class ReplicaManagerTest {
assertEquals(0,
brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
val brokerList = Seq[Integer](0, 1).asJava
- val delta = createLeaderDelta(topicIds(topic), new TopicPartition(topic,
0), brokerList.get(1), brokerList, brokerList)
+ val delta = createLeaderDelta(topicId, new TopicPartition(topic, 0),
brokerList.get(1), brokerList, brokerList)
val leaderMetadataImage = imageFromTopics(delta.apply())
replicaManager.applyDelta(delta, leaderMetadataImage)