This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new c5a51aba18f KAFKA-18486; [1/2] Update LocalLeaderEndPointTest (#18666)
c5a51aba18f is described below
commit c5a51aba18f8db9dfedbb627c8f5dd04fe669b90
Author: David Jacot <[email protected]>
AuthorDate: Thu Jan 23 10:49:16 2025 +0100
KAFKA-18486; [1/2] Update LocalLeaderEndPointTest (#18666)
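This patch is a first step towards removing
`ReplicaManager#becomeLeaderOrFollower`. It updates the
`LocalLeaderEndPointTest` tests to establish leadership and bump the
leader epoch by applying metadata deltas through
`ReplicaManager#applyDelta`, instead of passing a `LeaderAndIsrRequest`
to `becomeLeaderOrFollower`. Each bump now advances the leader epoch by
one, so the epochs expected by the assertions change accordingly
(e.g. 4 becomes 1).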
Reviewers: Christo Lolov <[email protected]>, Ismael Juma
<[email protected]>
---
.../kafka/server/LocalLeaderEndPointTest.scala | 126 ++++++++++++---------
1 file changed, 71 insertions(+), 55 deletions(-)
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 86ffdfb681c..6e9eeeb5cd7 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -20,38 +20,39 @@ package kafka.server
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.{Node, TopicPartition, Uuid}
+import org.apache.kafka.common.{TopicPartition, Uuid}
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.metadata.{PartitionChangeRecord,
PartitionRecord, TopicRecord}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
-import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataProvenance}
import org.apache.kafka.server.common.{KRaftVersion, OffsetAndEpoch}
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.util.{MockScheduler, MockTime}
-import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
import org.apache.kafka.storage.internals.log.{AppendOrigin,
LogDirFailureChannel}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
import org.mockito.Mockito.mock
import java.io.File
-import java.util.Collections
-import scala.collection.{Map, Seq}
+import scala.collection.Map
import scala.jdk.CollectionConverters._
class LocalLeaderEndPointTest extends Logging {
val time = new MockTime
- val topicId: Uuid = Uuid.randomUuid()
+ val topicId = Uuid.randomUuid()
val topic = "test"
- val topicPartition = new TopicPartition(topic, 5)
+ val partition = 5
+ val topicPartition = new TopicPartition(topic, partition)
val sourceBroker: BrokerEndPoint = new BrokerEndPoint(0, "localhost", 9092)
var replicaManager: ReplicaManager = _
var endPoint: LeaderEndPoint = _
var quotaManager: QuotaManagers = _
+ var image: MetadataImage = _
@BeforeEach
def setUp(): Unit = {
@@ -70,16 +71,35 @@ class LocalLeaderEndPointTest extends Logging {
quotaManagers = quotaManager,
metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () =>
KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
- alterPartitionManager = alterPartitionManager)
- val partition = replicaManager.createPartition(topicPartition)
- partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
- new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
- // Make this replica the leader.
- val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 0)
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+ alterPartitionManager = alterPartitionManager
+ )
+
+ val delta = new MetadataDelta(MetadataImage.EMPTY)
+ delta.replay(new TopicRecord()
+ .setName(topic)
+ .setTopicId(topicId)
+ )
+ delta.replay(new PartitionRecord()
+ .setPartitionId(partition)
+ .setTopicId(topicId)
+ .setReplicas(List[Integer](sourceBroker.id).asJava)
+ .setIsr(List[Integer](sourceBroker.id).asJava)
+ .setLeader(sourceBroker.id)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(0)
+ )
+
+ image = delta.apply(MetadataProvenance.EMPTY)
+ replicaManager.applyDelta(delta.topicsDelta(), image)
+
replicaManager.getPartitionOrException(topicPartition)
.localLogOrException
- endPoint = new LocalLeaderEndPoint(sourceBroker, config, replicaManager,
QuotaFactory.UNBOUNDED_QUOTA)
+ endPoint = new LocalLeaderEndPoint(
+ sourceBroker,
+ config,
+ replicaManager,
+ QuotaFactory.UNBOUNDED_QUOTA
+ )
}
@AfterEach
@@ -93,11 +113,10 @@ class LocalLeaderEndPointTest extends Logging {
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
assertEquals(new OffsetAndEpoch(3L, 0),
endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 0))
- val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+ bumpLeaderEpoch()
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
- assertEquals(new OffsetAndEpoch(6L, 4),
endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 7))
+ assertEquals(new OffsetAndEpoch(6L, 1),
endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 7))
}
@Test
@@ -106,12 +125,11 @@ class LocalLeaderEndPointTest extends Logging {
.onFire(response => assertEquals(Errors.NONE, response.error))
assertEquals(new OffsetAndEpoch(0L, 0),
endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 0))
- val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+ bumpLeaderEpoch()
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 3), _
=> ())
- assertEquals(new OffsetAndEpoch(3L, 4),
endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
+ assertEquals(new OffsetAndEpoch(3L, 1),
endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
}
@Test
@@ -120,13 +138,12 @@ class LocalLeaderEndPointTest extends Logging {
.onFire(response => assertEquals(Errors.NONE, response.error))
assertEquals(new OffsetAndEpoch(0L, 0),
endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 0))
- val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+ bumpLeaderEpoch()
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
replicaManager.logManager.getLog(topicPartition).foreach(log =>
log._localLogStartOffset = 3)
assertEquals(new OffsetAndEpoch(0L, 0),
endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
- assertEquals(new OffsetAndEpoch(3L, 4),
endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7))
+ assertEquals(new OffsetAndEpoch(3L, 1),
endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7))
}
@Test
@@ -137,42 +154,49 @@ class LocalLeaderEndPointTest extends Logging {
var result = endPoint.fetchEpochEndOffsets(Map(
topicPartition -> new OffsetForLeaderPartition()
.setPartition(topicPartition.partition)
- .setLeaderEpoch(0)))
+ .setLeaderEpoch(0)
+ ))
var expected = Map(
topicPartition -> new EpochEndOffset()
.setPartition(topicPartition.partition)
.setErrorCode(Errors.NONE.code)
.setLeaderEpoch(0)
- .setEndOffset(3L))
+ .setEndOffset(3L)
+ )
assertEquals(expected, result)
// Change leader epoch and end offset, and verify the behavior again.
- val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+ bumpLeaderEpoch()
+ bumpLeaderEpoch()
+ assertEquals(2,
replicaManager.getPartitionOrException(topicPartition).getLeaderEpoch)
+
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))
result = endPoint.fetchEpochEndOffsets(Map(
topicPartition -> new OffsetForLeaderPartition()
.setPartition(topicPartition.partition)
- .setLeaderEpoch(4)))
+ .setLeaderEpoch(2)
+ ))
expected = Map(
topicPartition -> new EpochEndOffset()
.setPartition(topicPartition.partition)
.setErrorCode(Errors.NONE.code)
- .setLeaderEpoch(4)
- .setEndOffset(6L))
+ .setLeaderEpoch(2)
+ .setEndOffset(6L)
+ )
assertEquals(expected, result)
- // Check missing epoch: 3, we expect the API to return (leader_epoch=0,
end_offset=3).
+ // Check missing epoch: 1, we expect the API to return (leader_epoch=0,
end_offset=3).
result = endPoint.fetchEpochEndOffsets(Map(
topicPartition -> new OffsetForLeaderPartition()
.setPartition(topicPartition.partition)
- .setLeaderEpoch(3)))
+ .setLeaderEpoch(1)
+ ))
expected = Map(
topicPartition -> new EpochEndOffset()
@@ -187,14 +211,16 @@ class LocalLeaderEndPointTest extends Logging {
result = endPoint.fetchEpochEndOffsets(Map(
topicPartition -> new OffsetForLeaderPartition()
.setPartition(topicPartition.partition)
- .setLeaderEpoch(5)))
+ .setLeaderEpoch(5)
+ ))
expected = Map(
topicPartition -> new EpochEndOffset()
.setPartition(topicPartition.partition)
.setErrorCode(Errors.NONE.code)
.setLeaderEpoch(-1)
- .setEndOffset(-1L))
+ .setEndOffset(-1L)
+ )
assertEquals(expected, result)
}
@@ -219,22 +245,16 @@ class LocalLeaderEndPointTest extends Logging {
}
}
- private def buildLeaderAndIsrRequest(leaderEpoch: Int): LeaderAndIsrRequest
= {
- val brokerList = Seq[Integer](sourceBroker.id).asJava
- val topicIds = Collections.singletonMap(topic, topicId)
- new LeaderAndIsrRequest.Builder(0, 0, 0,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(topicPartition.partition())
- .setControllerEpoch(0)
- .setLeader(sourceBroker.id)
- .setLeaderEpoch(leaderEpoch)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(false)).asJava,
- topicIds,
- Set(node(sourceBroker)).asJava).build()
+ private def bumpLeaderEpoch(): Unit = {
+ val delta = new MetadataDelta(image)
+ delta.replay(new PartitionChangeRecord()
+ .setTopicId(topicId)
+ .setPartitionId(partition)
+ .setLeader(sourceBroker.id)
+ )
+
+ image = delta.apply(MetadataProvenance.EMPTY)
+ replicaManager.applyDelta(delta.topicsDelta, image)
}
private def appendRecords(replicaManager: ReplicaManager,
@@ -260,10 +280,6 @@ class LocalLeaderEndPointTest extends Logging {
result
}
- private def node(endPoint: BrokerEndPoint): Node = {
- new Node(endPoint.id, endPoint.host, endPoint.port)
- }
-
private def records: MemoryRecords = {
MemoryRecords.withRecords(Compression.NONE,
new SimpleRecord("first message".getBytes()),