divijvaidya commented on code in PR #14212:
URL: https://github.com/apache/kafka/pull/14212#discussion_r1294964226


##########
core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala:
##########
@@ -85,6 +85,64 @@ class ReplicaFetcherTierStateMachineTest {
     assertEquals(9L, replicaState.logEndOffset)
   }
 
+  @Test
+  def testFollowerFetchMovedToAndDeletedFromTieredStore(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+    val replicaState = PartitionState(replicaLog, leaderEpoch = 7, 
highWatermark = 0L, rlmEnabled = true)
+
+    val mockLeaderEndpoint = new MockLeaderEndPoint
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndpoint, 
mockTierStateMachine)
+
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> 
initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 7)))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 7, leaderEpoch = 7, new SimpleRecord("h".getBytes)),
+      mkBatch(baseOffset = 8, leaderEpoch = 7, new SimpleRecord("i".getBytes)),
+      mkBatch(baseOffset = 9, leaderEpoch = 7, new SimpleRecord("j".getBytes)),
+      mkBatch(baseOffset = 10, leaderEpoch = 7, new 
SimpleRecord("k".getBytes)))
+
+    val leaderState = PartitionState(leaderLog, leaderEpoch = 7, highWatermark 
= 10L, rlmEnabled = true)
+    // Overriding the log start offset to 5 for mocking the scenario of 
segments 5-6 moved to remote store and
+    // segments 0-4 expired.
+    leaderState.logStartOffset = 5
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    assertEquals(3L, replicaState.logEndOffset)
+    val expectedState = if (truncateOnFetch) Option(Fetching) else 
Option(Truncating)
+    assertEquals(expectedState, fetcher.fetchState(partition).map(_.state))
+
+    fetcher.doWork()
+    // Verify that the out of range error is triggered and the fetch offset is 
reset to the global log start offset.
+    assertEquals(0L, replicaState.logStartOffset)

Review Comment:
   Isn't this assertion incorrect? After the first fetch call to the leader, 
the replica's logStartOffset should be 5 here (the leader's log start offset), 
not 0.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to