This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new c5a51aba18f KAFKA-18486; [1/2] Update LocalLeaderEndPointTest (#18666)
c5a51aba18f is described below

commit c5a51aba18f8db9dfedbb627c8f5dd04fe669b90
Author: David Jacot <[email protected]>
AuthorDate: Thu Jan 23 10:49:16 2025 +0100

    KAFKA-18486; [1/2] Update LocalLeaderEndPointTest (#18666)
    
    This patch is a first step towards removing 
`ReplicaManager#becomeLeaderOrFollower`. It updates the 
`LocalLeaderEndPointTest` tests.
    
    Reviewers: Christo Lolov <[email protected]>, Ismael Juma 
<[email protected]>
---
 .../kafka/server/LocalLeaderEndPointTest.scala     | 126 ++++++++++++---------
 1 file changed, 71 insertions(+), 55 deletions(-)

diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala 
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 86ffdfb681c..6e9eeeb5cd7 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -20,38 +20,39 @@ package kafka.server
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.{CoreUtils, Logging, TestUtils}
 import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.{Node, TopicPartition, Uuid}
+import org.apache.kafka.common.{TopicPartition, Uuid}
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.metadata.{PartitionChangeRecord, 
PartitionRecord, TopicRecord}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
-import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
 import org.apache.kafka.server.common.{KRaftVersion, OffsetAndEpoch}
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.util.{MockScheduler, MockTime}
-import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
 import org.apache.kafka.storage.internals.log.{AppendOrigin, 
LogDirFailureChannel}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.Assertions._
 import org.mockito.Mockito.mock
 
 import java.io.File
-import java.util.Collections
-import scala.collection.{Map, Seq}
+import scala.collection.Map
 import scala.jdk.CollectionConverters._
 
 class LocalLeaderEndPointTest extends Logging {
 
   val time = new MockTime
-  val topicId: Uuid = Uuid.randomUuid()
+  val topicId = Uuid.randomUuid()
   val topic = "test"
-  val topicPartition = new TopicPartition(topic, 5)
+  val partition = 5
+  val topicPartition = new TopicPartition(topic, partition)
   val sourceBroker: BrokerEndPoint = new BrokerEndPoint(0, "localhost", 9092)
   var replicaManager: ReplicaManager = _
   var endPoint: LeaderEndPoint = _
   var quotaManager: QuotaManagers = _
+  var image: MetadataImage = _
 
   @BeforeEach
   def setUp(): Unit = {
@@ -70,16 +71,35 @@ class LocalLeaderEndPointTest extends Logging {
       quotaManagers = quotaManager,
       metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterPartitionManager = alterPartitionManager)
-    val partition = replicaManager.createPartition(topicPartition)
-    partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
-      new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
-    // Make this replica the leader.
-    val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 0)
-    replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+      alterPartitionManager = alterPartitionManager
+    )
+
+    val delta = new MetadataDelta(MetadataImage.EMPTY)
+    delta.replay(new TopicRecord()
+      .setName(topic)
+      .setTopicId(topicId)
+    )
+    delta.replay(new PartitionRecord()
+      .setPartitionId(partition)
+      .setTopicId(topicId)
+      .setReplicas(List[Integer](sourceBroker.id).asJava)
+      .setIsr(List[Integer](sourceBroker.id).asJava)
+      .setLeader(sourceBroker.id)
+      .setLeaderEpoch(0)
+      .setPartitionEpoch(0)
+    )
+
+    image = delta.apply(MetadataProvenance.EMPTY)
+    replicaManager.applyDelta(delta.topicsDelta(), image)
+
     replicaManager.getPartitionOrException(topicPartition)
       .localLogOrException
-    endPoint = new LocalLeaderEndPoint(sourceBroker, config, replicaManager, 
QuotaFactory.UNBOUNDED_QUOTA)
+    endPoint = new LocalLeaderEndPoint(
+      sourceBroker,
+      config,
+      replicaManager,
+      QuotaFactory.UNBOUNDED_QUOTA
+    )
   }
 
   @AfterEach
@@ -93,11 +113,10 @@ class LocalLeaderEndPointTest extends Logging {
     appendRecords(replicaManager, topicPartition, records)
       .onFire(response => assertEquals(Errors.NONE, response.error))
     assertEquals(new OffsetAndEpoch(3L, 0), 
endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 0))
-    val leaderAndIsrRequest =  buildLeaderAndIsrRequest(leaderEpoch = 4)
-    replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+    bumpLeaderEpoch()
     appendRecords(replicaManager, topicPartition, records)
       .onFire(response => assertEquals(Errors.NONE, response.error))
-    assertEquals(new OffsetAndEpoch(6L, 4), 
endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 7))
+    assertEquals(new OffsetAndEpoch(6L, 1), 
endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 7))
   }
 
   @Test
@@ -106,12 +125,11 @@ class LocalLeaderEndPointTest extends Logging {
       .onFire(response => assertEquals(Errors.NONE, response.error))
     assertEquals(new OffsetAndEpoch(0L, 0), 
endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 0))
 
-    val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
-    replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+    bumpLeaderEpoch()
     appendRecords(replicaManager, topicPartition, records)
       .onFire(response => assertEquals(Errors.NONE, response.error))
     replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 3), _ 
=> ())
-    assertEquals(new OffsetAndEpoch(3L, 4), 
endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
+    assertEquals(new OffsetAndEpoch(3L, 1), 
endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
   }
 
   @Test
@@ -120,13 +138,12 @@ class LocalLeaderEndPointTest extends Logging {
       .onFire(response => assertEquals(Errors.NONE, response.error))
     assertEquals(new OffsetAndEpoch(0L, 0), 
endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 0))
 
-    val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
-    replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+    bumpLeaderEpoch()
     appendRecords(replicaManager, topicPartition, records)
       .onFire(response => assertEquals(Errors.NONE, response.error))
     replicaManager.logManager.getLog(topicPartition).foreach(log => 
log._localLogStartOffset = 3)
     assertEquals(new OffsetAndEpoch(0L, 0), 
endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
-    assertEquals(new OffsetAndEpoch(3L, 4), 
endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7))
+    assertEquals(new OffsetAndEpoch(3L, 1), 
endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7))
   }
 
   @Test
@@ -137,42 +154,49 @@ class LocalLeaderEndPointTest extends Logging {
     var result = endPoint.fetchEpochEndOffsets(Map(
       topicPartition -> new OffsetForLeaderPartition()
         .setPartition(topicPartition.partition)
-        .setLeaderEpoch(0)))
+        .setLeaderEpoch(0)
+    ))
 
     var expected = Map(
       topicPartition -> new EpochEndOffset()
         .setPartition(topicPartition.partition)
         .setErrorCode(Errors.NONE.code)
         .setLeaderEpoch(0)
-        .setEndOffset(3L))
+        .setEndOffset(3L)
+    )
 
     assertEquals(expected, result)
 
     // Change leader epoch and end offset, and verify the behavior again.
-    val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
-    replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+    bumpLeaderEpoch()
+    bumpLeaderEpoch()
+    assertEquals(2, 
replicaManager.getPartitionOrException(topicPartition).getLeaderEpoch)
+
     appendRecords(replicaManager, topicPartition, records)
       .onFire(response => assertEquals(Errors.NONE, response.error))
 
     result = endPoint.fetchEpochEndOffsets(Map(
       topicPartition -> new OffsetForLeaderPartition()
         .setPartition(topicPartition.partition)
-        .setLeaderEpoch(4)))
+        .setLeaderEpoch(2)
+    ))
 
     expected = Map(
       topicPartition -> new EpochEndOffset()
         .setPartition(topicPartition.partition)
         .setErrorCode(Errors.NONE.code)
-        .setLeaderEpoch(4)
-        .setEndOffset(6L))
+        .setLeaderEpoch(2)
+        .setEndOffset(6L)
+    )
 
     assertEquals(expected, result)
 
-    // Check missing epoch: 3, we expect the API to return (leader_epoch=0, 
end_offset=3).
+    // Check missing epoch: 1, we expect the API to return (leader_epoch=0, 
end_offset=3).
     result = endPoint.fetchEpochEndOffsets(Map(
       topicPartition -> new OffsetForLeaderPartition()
         .setPartition(topicPartition.partition)
-        .setLeaderEpoch(3)))
+        .setLeaderEpoch(1)
+    ))
 
     expected = Map(
       topicPartition -> new EpochEndOffset()
@@ -187,14 +211,16 @@ class LocalLeaderEndPointTest extends Logging {
     result = endPoint.fetchEpochEndOffsets(Map(
       topicPartition -> new OffsetForLeaderPartition()
         .setPartition(topicPartition.partition)
-        .setLeaderEpoch(5)))
+        .setLeaderEpoch(5)
+    ))
 
     expected = Map(
       topicPartition -> new EpochEndOffset()
         .setPartition(topicPartition.partition)
         .setErrorCode(Errors.NONE.code)
         .setLeaderEpoch(-1)
-        .setEndOffset(-1L))
+        .setEndOffset(-1L)
+    )
 
     assertEquals(expected, result)
   }
@@ -219,22 +245,16 @@ class LocalLeaderEndPointTest extends Logging {
     }
   }
 
-  private def buildLeaderAndIsrRequest(leaderEpoch: Int): LeaderAndIsrRequest 
= {
-    val brokerList = Seq[Integer](sourceBroker.id).asJava
-    val topicIds = Collections.singletonMap(topic, topicId)
-    new LeaderAndIsrRequest.Builder(0, 0, 0,
-      Seq(new LeaderAndIsrRequest.PartitionState()
-        .setTopicName(topic)
-        .setPartitionIndex(topicPartition.partition())
-        .setControllerEpoch(0)
-        .setLeader(sourceBroker.id)
-        .setLeaderEpoch(leaderEpoch)
-        .setIsr(brokerList)
-        .setPartitionEpoch(0)
-        .setReplicas(brokerList)
-        .setIsNew(false)).asJava,
-      topicIds,
-      Set(node(sourceBroker)).asJava).build()
+  private def bumpLeaderEpoch(): Unit = {
+    val delta = new MetadataDelta(image)
+    delta.replay(new PartitionChangeRecord()
+      .setTopicId(topicId)
+      .setPartitionId(partition)
+      .setLeader(sourceBroker.id)
+    )
+
+    image = delta.apply(MetadataProvenance.EMPTY)
+    replicaManager.applyDelta(delta.topicsDelta, image)
   }
 
   private def appendRecords(replicaManager: ReplicaManager,
@@ -260,10 +280,6 @@ class LocalLeaderEndPointTest extends Logging {
     result
   }
 
-  private def node(endPoint: BrokerEndPoint): Node = {
-    new Node(endPoint.id, endPoint.host, endPoint.port)
-  }
-
   private def records: MemoryRecords = {
     MemoryRecords.withRecords(Compression.NONE,
       new SimpleRecord("first message".getBytes()),

Reply via email to