satishd commented on code in PR #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r1047015681
##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -614,12 +613,103 @@ class AbstractFetcherThreadTest {
assertEquals(0L, replicaState.highWatermark)
}
+ @Test
+ def testFollowerFetchMovedToTieredStore(): Unit = {
+ val partition = new TopicPartition("topic", 0)
+ val fetcher = new MockFetcherThread(new MockLeaderEndPoint)
+
+ val replicaLog = Seq(
+ mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+ mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+ mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+ val replicaState = PartitionState(replicaLog, leaderEpoch = 5,
highWatermark = 0L, rlmEnabled = true)
+ fetcher.setReplicaState(partition, replicaState)
+ fetcher.addPartitions(Map(partition ->
initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5)))
+
+ val leaderLog = Seq(
+ mkBatch(baseOffset = 5, leaderEpoch = 5, new SimpleRecord("f".getBytes)),
+ mkBatch(baseOffset = 6, leaderEpoch = 5, new SimpleRecord("g".getBytes)),
+ mkBatch(baseOffset = 7, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
+ mkBatch(baseOffset = 8, leaderEpoch = 5, new SimpleRecord("i".getBytes)))
+
+
+ val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark
= 8L, rlmEnabled = true)
+ // Overriding the log start offset to zero for mocking the scenario of
segment 0-4 moved to remote store.
+ leaderState.logStartOffset = 0
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ assertEquals(3L, replicaState.logEndOffset)
+ val expectedState = if (truncateOnFetch) Option(Fetching) else
Option(Truncating)
Review Comment:
`truncateOnFetch` is existing code; it was not added by this PR.
##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1103,18 +1195,24 @@ class AbstractFetcherThreadTest {
}.toMap
}
- override def fetchEarliestOffset(topicPartition: TopicPartition,
leaderEpoch: Int): Long = {
+ override def fetchEarliestOffset(topicPartition: TopicPartition,
leaderEpoch: Int): (Int, Long) = {
val leaderState = leaderPartitionState(topicPartition)
checkLeaderEpochAndThrow(leaderEpoch, leaderState)
- leaderState.logStartOffset
+ (leaderState.leaderEpoch, leaderState.logStartOffset)
}
- override def fetchLatestOffset(topicPartition: TopicPartition,
leaderEpoch: Int): Long = {
+ override def fetchLatestOffset(topicPartition: TopicPartition,
leaderEpoch: Int): (Int, Long) = {
val leaderState = leaderPartitionState(topicPartition)
checkLeaderEpochAndThrow(leaderEpoch, leaderState)
- leaderState.logEndOffset
+ (leaderState.leaderEpoch, leaderState.logEndOffset)
}
+ override def fetchEarliestLocalOffset(topicPartition: TopicPartition,
leaderEpoch: Int): (Int, Long) = {
+ val leaderState = leaderPartitionState(topicPartition)
+ checkLeaderEpochAndThrow(leaderEpoch, leaderState)
+ (leaderState.leaderEpoch, leaderState.localLogStartOffset)
+
+ }
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]