This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2a7457f2dd9 KAFKA-18486 Migrate testPartitionMetadataFile to use
applyDelta in place of deprecated becomeLeaderOrFollower (#19947)
2a7457f2dd9 is described below
commit 2a7457f2dd95f0732562ae0708b5162e8c4a3a6d
Author: Jing-Jia Hung <[email protected]>
AuthorDate: Thu Jun 12 09:20:47 2025 +0800
KAFKA-18486 Migrate testPartitionMetadataFile to use applyDelta in place of
deprecated becomeLeaderOrFollower (#19947)
Refactor testPartitionMetadataFile to use applyDelta and reuse the
class-level topicPartition and topicIds fields
- Replace deprecated becomeLeaderOrFollower with topicsCreateDelta +
applyDelta
- The test still asserts that the partition exists and the local log exists,
and verifies the partitionMetadataFile version (0) and topicId
Reviewers: TaiJuWu <[email protected]>, Ken Huang <[email protected]>,
Chia-Ping Tsai <[email protected]>
---
.../unit/kafka/server/ReplicaManagerTest.scala | 29 ++++++----------------
1 file changed, 7 insertions(+), 22 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ee2115c635f..0499dd2f6f2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -114,6 +114,7 @@ class ReplicaManagerTest {
private val topicId = Uuid.fromString("YK2ed2GaTH2JpgzUaJ8tgg")
private val topicIds = scala.Predef.Map("test-topic" -> topicId)
private val topicNames = topicIds.map(_.swap)
+ private val topicPartition = new TopicPartition(topic, 0)
private val transactionalId = "txn"
private val time = new MockTime
private val metrics = new Metrics
@@ -4214,30 +4215,14 @@ class ReplicaManagerTest {
def testPartitionMetadataFile(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time))
try {
- val brokerList = Seq[Integer](0, 1).asJava
- val topicPartition = new TopicPartition(topic, 0)
- val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
- val topicNames = topicIds.asScala.map(_.swap).asJava
-
- def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String,
Uuid]): LeaderAndIsrRequest =
- new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(epoch)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(true)).asJava,
- topicIds,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+ val leaderDelta = topicsCreateDelta(0, isStartIdLeader = true,
partitions = List(0),
+ topicName = topic, topicId = topicIds(topic))
+ val leaderImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderImage)
- val response = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest(0, topicIds), (_, _) => ())
- assertEquals(Errors.NONE,
response.partitionErrors(topicNames).get(topicPartition))
+
assertTrue(replicaManager.getPartition(topicPartition).isInstanceOf[HostedPartition.Online])
assertFalse(replicaManager.localLog(topicPartition).isEmpty)
- val id = topicIds.get(topicPartition.topic())
+ val id = topicIds(topicPartition.topic)
val log = replicaManager.localLog(topicPartition).get
assertTrue(log.partitionMetadataFile.get.exists())
val partitionMetadata = log.partitionMetadataFile.get.read()