This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3a0a1705a1a KAFKA-18486 Remove becomeLeaderOrFollower from 
readFromLogWithOffsetOutOfRange and other related methods. (#19929)
3a0a1705a1a is described below

commit 3a0a1705a1a7caa9b4b14158b05325138c904451
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Tue Jun 10 12:39:32 2025 +0800

    KAFKA-18486 Remove becomeLeaderOrFollower from readFromLogWithOffsetOutOfRange and other related methods. (#19929)
    
    Refactor the tests below to stop calling becomeLeaderOrFollower; they now build a TopicsDelta and call applyDelta instead:
    - readFromLogWithOffsetOutOfRange
    - testBecomeFollowerWhileNewClientFetchInPurgatory
    - testBecomeFollowerWhileOldClientFetchInPurgatory
    - testBuildRemoteLogAuxStateMetricsThrowsException
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Ken Huang
     <[email protected]>, TengYao Chi <[email protected]>
---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 136 ++++++---------------
 1 file changed, 38 insertions(+), 98 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index de1441e5be9..f90b7fa9e55 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2017,22 +2017,10 @@ class ReplicaManagerTest {
       val tidp0 = new TopicIdPartition(topicId, tp0)
       val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
       replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
-      val partition0Replicas = Seq[Integer](0, 1).asJava
 
-      val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(tp0.topic)
-          .setPartitionIndex(tp0.partition)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(1)
-          .setIsr(partition0Replicas)
-          .setPartitionEpoch(0)
-          .setReplicas(partition0Replicas)
-          .setIsNew(true)).asJava,
-        topicIds.asJava,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => 
())
+      val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0)
+      val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
 
       val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 
0L, 100,
         Optional.empty())
@@ -2040,20 +2028,9 @@ class ReplicaManagerTest {
       assertFalse(fetchResult.hasFired)
 
       // Become a follower and ensure that the delayed fetch returns 
immediately
-      val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(tp0.topic)
-          .setPartitionIndex(tp0.partition)
-          .setControllerEpoch(0)
-          .setLeader(1)
-          .setLeaderEpoch(2)
-          .setIsr(partition0Replicas)
-          .setPartitionEpoch(0)
-          .setReplicas(partition0Replicas)
-          .setIsNew(true)).asJava,
-        topicIds.asJava,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) 
=> ())
+      val followerDelta = createFollowerDelta(topicId, tp0, followerId = 0, 
leaderId = 1, leaderEpoch = 2)
+      val followerMetadataImage = imageFromTopics(followerDelta.apply())
+      replicaManager.applyDelta(followerDelta, followerMetadataImage)
       assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, 
fetchResult.assertFired.error)
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -2072,20 +2049,9 @@ class ReplicaManagerTest {
       replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
       val partition0Replicas = Seq[Integer](0, 1).asJava
 
-      val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(tp0.topic)
-          .setPartitionIndex(tp0.partition)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(1)
-          .setIsr(partition0Replicas)
-          .setPartitionEpoch(0)
-          .setReplicas(partition0Replicas)
-          .setIsNew(true)).asJava,
-        topicIds.asJava,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => 
())
+      val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0, 
leaderEpoch = 1, replicas = partition0Replicas, isr = partition0Replicas)
+      val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
 
       val clientMetadata = new DefaultClientMetadata("", "", null, 
KafkaPrincipal.ANONYMOUS, "")
       val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 
0L, 100,
@@ -2100,20 +2066,9 @@ class ReplicaManagerTest {
       assertFalse(fetchResult.hasFired)
 
       // Become a follower and ensure that the delayed fetch returns 
immediately
-      val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(tp0.topic)
-          .setPartitionIndex(tp0.partition)
-          .setControllerEpoch(0)
-          .setLeader(1)
-          .setLeaderEpoch(2)
-          .setIsr(partition0Replicas)
-          .setPartitionEpoch(0)
-          .setReplicas(partition0Replicas)
-          .setIsNew(true)).asJava,
-        topicIds.asJava,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) 
=> ())
+      val followerDelta = createFollowerDelta(topicId, tp0, followerId = 0, 
leaderId = 1, leaderEpoch = 2)
+      val followerMetadataImage = imageFromTopics(followerDelta.apply())
+      replicaManager.applyDelta(followerDelta, followerMetadataImage)
       assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.assertFired.error)
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -4215,22 +4170,6 @@ class ReplicaManagerTest {
       val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
       replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
       val partition0Replicas = Seq[Integer](0, 1).asJava
-      val topicIds = Map(tp0.topic -> topicId).asJava
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp0.topic)
-            .setPartitionIndex(tp0.partition)
-            .setControllerEpoch(0)
-            .setLeader(1)
-            .setLeaderEpoch(0)
-            .setIsr(partition0Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition0Replicas)
-            .setIsNew(true)
-        ).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
 
       // Verify the metrics for build remote log state and for failures is 
zero before replicas start to fetch
       assertEquals(0, 
brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
@@ -4239,7 +4178,9 @@ class ReplicaManagerTest {
       assertEquals(0, 
brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
       assertEquals(0, 
brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
 
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => 
())
+      val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 1, replicas 
= partition0Replicas, isr = partition0Replicas)
+      val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
 
       // Replicas fetch from the leader periodically, therefore we check that 
the metric value is increasing
       // We expect failedBuildRemoteLogAuxStateRate to increase because 
fetchRemoteLogSegmentMetadata returns RemoteStorageException
@@ -4558,9 +4499,17 @@ class ReplicaManagerTest {
     }
   }
 
-  private def createLeaderDelta(topicId: Uuid, partition: TopicPartition, 
leaderId: Int): TopicsDelta = {
+  private def createLeaderDelta(
+    topicId: Uuid,
+    partition: TopicPartition,
+    leaderId: Integer,
+    replicas: util.List[Integer] = null,
+    isr: util.List[Integer] = null,
+    leaderEpoch: Int = 0): TopicsDelta = {
     val delta = new TopicsDelta(TopicsImage.EMPTY)
-    
+    val effectiveReplicas = 
Option(replicas).getOrElse(java.util.List.of(leaderId))
+    val effectiveIsr = Option(isr).getOrElse(java.util.List.of(leaderId))
+
     delta.replay(new TopicRecord()
       .setName(partition.topic)
       .setTopicId(topicId)
@@ -4569,19 +4518,24 @@ class ReplicaManagerTest {
     delta.replay(new PartitionRecord()
       .setPartitionId(partition.partition)
       .setTopicId(topicId)
-      .setReplicas(util.Arrays.asList(leaderId))
-      .setIsr(util.Arrays.asList(leaderId))
+      .setReplicas(effectiveReplicas)
+      .setIsr(effectiveIsr)
       .setRemovingReplicas(Collections.emptyList())
       .setAddingReplicas(Collections.emptyList())
       .setLeader(leaderId)
-      .setLeaderEpoch(0)
+      .setLeaderEpoch(leaderEpoch)
       .setPartitionEpoch(0)
     )
 
     delta
   }
 
-  private def createFollowerDelta(topicId: Uuid, partition: TopicPartition, 
followerId: Int, leaderId: Int): TopicsDelta = {
+  private def createFollowerDelta(
+    topicId: Uuid,
+    partition: TopicPartition,
+    followerId: Int,
+    leaderId: Int,
+    leaderEpoch: Int = 0): TopicsDelta = {
     val delta = new TopicsDelta(TopicsImage.EMPTY)
 
     delta.replay(new TopicRecord()
@@ -4597,7 +4551,7 @@ class ReplicaManagerTest {
       .setRemovingReplicas(Collections.emptyList())
       .setAddingReplicas(Collections.emptyList())
       .setLeader(leaderId)
-      .setLeaderEpoch(0)
+      .setLeaderEpoch(leaderEpoch)
       .setPartitionEpoch(0)
     )
 
@@ -6352,24 +6306,10 @@ class ReplicaManagerTest {
       val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
       replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, None)
       val partition0Replicas = Seq[Integer](0, 1).asJava
-      val topicIds = Map(tp.topic -> topicId).asJava
       val leaderEpoch = 0
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp.topic)
-            .setPartitionIndex(tp.partition)
-            .setControllerEpoch(0)
-            .setLeader(0)
-            .setLeaderEpoch(0)
-            .setIsr(partition0Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition0Replicas)
-            .setIsNew(true)
-        ).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => 
())
+      val leaderDelta = createLeaderDelta(topicId, tp, leaderId = 0, 
leaderEpoch = leaderEpoch, replicas = partition0Replicas, isr = 
partition0Replicas)
+      val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
 
       val params = new FetchParams(-1, 1, 1000, 0, 100, 
FetchIsolation.HIGH_WATERMARK, Optional.empty)
       replicaManager.readFromLog(

Reply via email to