This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3a0a1705a1a KAFKA-18486 Remove becomeLeaderOrFollower from
readFromLogWithOffsetOutOfRange and other related methods. (#19929)
3a0a1705a1a is described below
commit 3a0a1705a1a7caa9b4b14158b05325138c904451
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Tue Jun 10 12:39:32 2025 +0800
KAFKA-18486 Remove becomeLeaderOrFollower from
readFromLogWithOffsetOutOfRange and other related methods. (#19929)
Replace becomeLeaderOrFollower with applyDelta (using the createLeaderDelta / createFollowerDelta helpers) in the following tests:
- readFromLogWithOffsetOutOfRange
- testBecomeFollowerWhileNewClientFetchInPurgatory
- testBecomeFollowerWhileOldClientFetchInPurgatory
- testBuildRemoteLogAuxStateMetricsThrowsException
Reviewers: Chia-Ping Tsai <[email protected]>, Ken Huang
<[email protected]>, TengYao Chi <[email protected]>
---
.../unit/kafka/server/ReplicaManagerTest.scala | 136 ++++++---------------
1 file changed, 38 insertions(+), 98 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index de1441e5be9..f90b7fa9e55 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2017,22 +2017,10 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)
val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints, None)
- val partition0Replicas = Seq[Integer](0, 1).asJava
- val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(1)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true)).asJava,
- topicIds.asJava,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) =>
())
+ val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0)
+ val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L,
0L, 100,
Optional.empty())
@@ -2040,20 +2028,9 @@ class ReplicaManagerTest {
assertFalse(fetchResult.hasFired)
// Become a follower and ensure that the delayed fetch returns
immediately
- val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(0)
- .setLeader(1)
- .setLeaderEpoch(2)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true)).asJava,
- topicIds.asJava,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _)
=> ())
+ val followerDelta = createFollowerDelta(topicId, tp0, followerId = 0,
leaderId = 1, leaderEpoch = 2)
+ val followerMetadataImage = imageFromTopics(followerDelta.apply())
+ replicaManager.applyDelta(followerDelta, followerMetadataImage)
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER,
fetchResult.assertFired.error)
} finally {
replicaManager.shutdown(checkpointHW = false)
@@ -2072,20 +2049,9 @@ class ReplicaManagerTest {
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
- val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(1)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true)).asJava,
- topicIds.asJava,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) =>
())
+ val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0,
leaderEpoch = 1, replicas = partition0Replicas, isr = partition0Replicas)
+ val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
val clientMetadata = new DefaultClientMetadata("", "", null,
KafkaPrincipal.ANONYMOUS, "")
val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L,
0L, 100,
@@ -2100,20 +2066,9 @@ class ReplicaManagerTest {
assertFalse(fetchResult.hasFired)
// Become a follower and ensure that the delayed fetch returns
immediately
- val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(0)
- .setLeader(1)
- .setLeaderEpoch(2)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true)).asJava,
- topicIds.asJava,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _)
=> ())
+ val followerDelta = createFollowerDelta(topicId, tp0, followerId = 0,
leaderId = 1, leaderEpoch = 2)
+ val followerMetadataImage = imageFromTopics(followerDelta.apply())
+ replicaManager.applyDelta(followerDelta, followerMetadataImage)
assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.assertFired.error)
} finally {
replicaManager.shutdown(checkpointHW = false)
@@ -4215,22 +4170,6 @@ class ReplicaManagerTest {
val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
- val topicIds = Map(tp0.topic -> topicId).asJava
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(0)
- .setLeader(1)
- .setLeaderEpoch(0)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true)
- ).asJava,
- topicIds,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
// Verify the metrics for build remote log state and for failures is
zero before replicas start to fetch
assertEquals(0,
brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
@@ -4239,7 +4178,9 @@ class ReplicaManagerTest {
assertEquals(0,
brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
assertEquals(0,
brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) =>
())
+ val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 1, replicas
= partition0Replicas, isr = partition0Replicas)
+ val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
// Replicas fetch from the leader periodically, therefore we check that
the metric value is increasing
// We expect failedBuildRemoteLogAuxStateRate to increase because
fetchRemoteLogSegmentMetadata returns RemoteStorageException
@@ -4558,9 +4499,17 @@ class ReplicaManagerTest {
}
}
- private def createLeaderDelta(topicId: Uuid, partition: TopicPartition,
leaderId: Int): TopicsDelta = {
+ private def createLeaderDelta(
+ topicId: Uuid,
+ partition: TopicPartition,
+ leaderId: Integer,
+ replicas: util.List[Integer] = null,
+ isr: util.List[Integer] = null,
+ leaderEpoch: Int = 0): TopicsDelta = {
val delta = new TopicsDelta(TopicsImage.EMPTY)
-
+ val effectiveReplicas =
Option(replicas).getOrElse(java.util.List.of(leaderId))
+ val effectiveIsr = Option(isr).getOrElse(java.util.List.of(leaderId))
+
delta.replay(new TopicRecord()
.setName(partition.topic)
.setTopicId(topicId)
@@ -4569,19 +4518,24 @@ class ReplicaManagerTest {
delta.replay(new PartitionRecord()
.setPartitionId(partition.partition)
.setTopicId(topicId)
- .setReplicas(util.Arrays.asList(leaderId))
- .setIsr(util.Arrays.asList(leaderId))
+ .setReplicas(effectiveReplicas)
+ .setIsr(effectiveIsr)
.setRemovingReplicas(Collections.emptyList())
.setAddingReplicas(Collections.emptyList())
.setLeader(leaderId)
- .setLeaderEpoch(0)
+ .setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(0)
)
delta
}
- private def createFollowerDelta(topicId: Uuid, partition: TopicPartition,
followerId: Int, leaderId: Int): TopicsDelta = {
+ private def createFollowerDelta(
+ topicId: Uuid,
+ partition: TopicPartition,
+ followerId: Int,
+ leaderId: Int,
+ leaderEpoch: Int = 0): TopicsDelta = {
val delta = new TopicsDelta(TopicsImage.EMPTY)
delta.replay(new TopicRecord()
@@ -4597,7 +4551,7 @@ class ReplicaManagerTest {
.setRemovingReplicas(Collections.emptyList())
.setAddingReplicas(Collections.emptyList())
.setLeader(leaderId)
- .setLeaderEpoch(0)
+ .setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(0)
)
@@ -6352,24 +6306,10 @@ class ReplicaManagerTest {
val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
- val topicIds = Map(tp.topic -> topicId).asJava
val leaderEpoch = 0
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp.topic)
- .setPartitionIndex(tp.partition)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(0)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true)
- ).asJava,
- topicIds,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) =>
())
+ val leaderDelta = createLeaderDelta(topicId, tp, leaderId = 0,
leaderEpoch = leaderEpoch, replicas = partition0Replicas, isr =
partition0Replicas)
+ val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
val params = new FetchParams(-1, 1, 1000, 0, 100,
FetchIsolation.HIGH_WATERMARK, Optional.empty)
replicaManager.readFromLog(