satishd commented on code in PR #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r1032871126


##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -614,12 +613,103 @@ class AbstractFetcherThreadTest {
     assertEquals(0L, replicaState.highWatermark)
   }
 
+  @Test
+  def testFollowerFetchMovedToTieredStore(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+    val fetcher = new MockFetcherThread(new MockLeaderEndPoint)
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+    val replicaState = PartitionState(replicaLog, leaderEpoch = 5, 
highWatermark = 0L, rlmEnabled = true)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> 
initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5)))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 5, leaderEpoch = 5, new SimpleRecord("f".getBytes)),
+      mkBatch(baseOffset = 6, leaderEpoch = 5, new SimpleRecord("g".getBytes)),
+      mkBatch(baseOffset = 7, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
+      mkBatch(baseOffset = 8, leaderEpoch = 5, new SimpleRecord("i".getBytes)))
+
+
+    val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark 
= 8L, rlmEnabled = true)
+    // Overriding the log start offset to zero to mock the segment 0-4 moved 
to remote store.

Review Comment:
   Changed the text to make it clearer.



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -614,12 +613,103 @@ class AbstractFetcherThreadTest {
     assertEquals(0L, replicaState.highWatermark)
   }
 
+  @Test
+  def testFollowerFetchMovedToTieredStore(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+    val fetcher = new MockFetcherThread(new MockLeaderEndPoint)
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+    val replicaState = PartitionState(replicaLog, leaderEpoch = 5, 
highWatermark = 0L, rlmEnabled = true)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> 
initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5)))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 5, leaderEpoch = 5, new SimpleRecord("f".getBytes)),
+      mkBatch(baseOffset = 6, leaderEpoch = 5, new SimpleRecord("g".getBytes)),
+      mkBatch(baseOffset = 7, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
+      mkBatch(baseOffset = 8, leaderEpoch = 5, new SimpleRecord("i".getBytes)))
+
+
+    val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark 
= 8L, rlmEnabled = true)
+    // Overriding the log start offset to zero to mock the segment 0-4 moved 
to remote store.
+    leaderState.logStartOffset = 0
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    assertEquals(3L, replicaState.logEndOffset)
+    val expectedState = if (truncateOnFetch) Option(Fetching) else 
Option(Truncating)
+    assertEquals(expectedState, fetcher.fetchState(partition).map(_.state))
+
+    fetcher.doWork()
+    // verify that the offset moved to tiered store error triggered and 
respective states are truncated to expected.
+    assertEquals(0L, replicaState.logStartOffset)
+    assertEquals(5L, replicaState.localLogStartOffset)
+    assertEquals(5L, replicaState.highWatermark)
+    assertEquals(5L, replicaState.logEndOffset)
+
+    // Only 1 record batch is returned after a poll so calling 'n' number of 
times to get the desired result.
+    for (_ <- 1 to 5) fetcher.doWork()
+    assertEquals(4, replicaState.log.size)
+    assertEquals(0L, replicaState.logStartOffset)
+    assertEquals(5L, replicaState.localLogStartOffset)
+    assertEquals(8L, replicaState.highWatermark)
+    assertEquals(9L, replicaState.logEndOffset)
+  }
+
+  @Test
+  def testFencedOffsetResetAfterMovedToRemoteTier(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+    var isErrorHandled = false
+    val fetcher = new MockFetcherThread(new MockLeaderEndPoint) {
+      override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                    currentLeaderEpoch: Int,
+                                                    fetchOffset: Long,
+                                                    epochForFetchOffset: Int,
+                                                    leaderLogStartOffset: 
Long): Long = {
+        isErrorHandled = true
+        throw new FencedLeaderEpochException(s"Epoch $currentLeaderEpoch is 
fenced")
+      }
+    }
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+    val replicaState = PartitionState(replicaLog, leaderEpoch = 5, 
highWatermark = 2L, rlmEnabled = true)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> 
initialFetchState(topicIds.get(partition.topic), fetchOffset = 0L, leaderEpoch 
= 5)))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 5, leaderEpoch = 5, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 6, leaderEpoch = 5, new SimpleRecord("c".getBytes)))
+    val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark 
= 6L, rlmEnabled = true)
+    // Overriding the log start offset to zero to mock the segment 0-4 moved 
to remote store.

Review Comment:
   Changed the text to make it clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to