This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7c715c02c06 KAFKA-18486 Update testClearPurgatoryOnBecomingFollower
etc with KRaft mechanism in ReplicaManagerTest (#19924)
7c715c02c06 is described below
commit 7c715c02c06f16475faff8aa72048cddb7382c8a
Author: Ken Huang <[email protected]>
AuthorDate: Wed Jun 11 18:52:08 2025 +0800
KAFKA-18486 Update testClearPurgatoryOnBecomingFollower etc with KRaft
mechanism in ReplicaManagerTest (#19924)
Update the following tests to avoid using `becomeLeaderOrFollower`:
- testClearPurgatoryOnBecomingFollower
- testDelayedFetchIncludesAbortedTransactions
- testDisabledTransactionVerification
- testFailedBuildRemoteLogAuxStateMetrics
Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../unit/kafka/server/ReplicaManagerTest.scala | 81 +++++-----------------
1 file changed, 17 insertions(+), 64 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 04a1677b474..ee2115c635f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -445,26 +445,15 @@ class ReplicaManagerTest {
try {
val brokerList = Seq[Integer](0, 1).asJava
- val topicIds = Collections.singletonMap(topic, topicId)
- val partition = rm.createPartition(new TopicPartition(topic, 0))
+ val topicPartition = new TopicPartition(topic, 0)
+ val partition = rm.createPartition(topicPartition)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
// Make this replica the leader.
- val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(0)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(false)).asJava,
- topicIds,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
+ val delta = createLeaderDelta(topicIds(topic), topicPartition,
brokerList.get(0), brokerList, brokerList)
+ val leaderMetadataImage = imageFromTopics(delta.apply())
+ rm.applyDelta(delta, leaderMetadataImage)
rm.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException
@@ -474,20 +463,9 @@ class ReplicaManagerTest {
}
// Make this replica the follower
- val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(1)
- .setLeaderEpoch(1)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(false)).asJava,
- topicIds,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+ val delta1 = createLeaderDelta(topicIds(topic), topicPartition,
brokerList.get(1), brokerList, brokerList, 1)
+ val followerMetadataImage = imageFromTopics(delta1.apply())
+ rm.applyDelta(delta1, followerMetadataImage)
assertTrue(appendResult.hasFired)
} finally {
@@ -945,20 +923,9 @@ class ReplicaManagerTest {
new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
// Make this replica the leader.
- val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(0)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(true)).asJava,
- topicIds.asJava,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) =>
())
+ val delta = topicsCreateDelta(brokerList.get(0), isStartIdLeader = true,
partitions = List(0), List.empty, topic, topicIds(topic))
+ val leaderMetadataImage = imageFromTopics(delta.apply())
+ replicaManager.applyDelta(delta, leaderMetadataImage)
replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException
@@ -2548,8 +2515,9 @@ class ReplicaManagerTest {
val replicaManager =
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
List(tp), config = config)
try {
- val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic),
tp, Seq(0, 1), new LeaderAndIsr(0, List(0, 1).map(Int.box).asJava))
- replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) =>
())
+ val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions =
List(0), List.empty, topic, topicIds(topic))
+ val leaderMetadataImage = imageFromTopics(delta.apply())
+ replicaManager.applyDelta(delta, leaderMetadataImage)
val transactionalRecords =
MemoryRecords.withTransactionalRecords(Compression.NONE, producerId,
producerEpoch, sequence,
new SimpleRecord(s"message $sequence".getBytes))
@@ -4110,23 +4078,6 @@ class ReplicaManagerTest {
try {
val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints, None)
- val partition0Replicas = Seq[Integer](0, 1).asJava
- val topicIds = Map(tp0.topic -> topicId).asJava
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(0)
- .setLeader(1)
- .setLeaderEpoch(0)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true)
- ).asJava,
- topicIds,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
// Verify the metrics for build remote log state and for failures is
zero before replicas start to fetch
assertEquals(0,
brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
@@ -4135,7 +4086,9 @@ class ReplicaManagerTest {
assertEquals(0,
brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
assertEquals(0,
brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) =>
())
+ val delta = createLeaderDelta(topicIds(topic), new TopicPartition(topic,
0), 1, util.List.of(0, 1), util.List.of(0, 1))
+ val leaderMetadataImage = imageFromTopics(delta.apply())
+ replicaManager.applyDelta(delta, leaderMetadataImage)
// Replicas fetch from the leader periodically, therefore we check that
the metric value is increasing
// We expect failedBuildRemoteLogAuxStateRate to increase because there
is no remoteLogSegmentMetadata