This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4d0c588539e KAFKA-17302: ReplicaFetcher changes for fetching from tiered offset (#20428)
4d0c588539e is described below

commit 4d0c588539eb3e84ba6c051837c3f03069fae3f1
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Tue Feb 17 20:14:23 2026 +0530

    KAFKA-17302: ReplicaFetcher changes for fetching from tiered offset (#20428)
    
    This PR adds the ReplicaFetcher changes needed to support fetching
    from the last tiered offset.
    
    - The TierStateMachine's start method signature now takes an offset on
    the leader's local log from which to start replicating the leader's
    log. Previously, the start method built the remote log auxiliary state
    up to the leader's local log start offset; going forward, it builds the
    auxiliary state up to the supplied offset.
    - When the ReplicaFetcher runs into an OffsetOutOfRange (OOR) or
    OffsetMovedToTieredStorage (OMTS) error and should fetch from the last
    tiered offset, it determines the offset up to which the remote log aux
    state should be built (the last tiered offset) and uses it to start the
    TierStateMachine, as shown in the sketch below.
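    
    As a rough sketch only (not part of the patch below): with the reworked
    API, a caller passes the start offset explicitly. The parameter names
    match the new start signature in the diff; the local variables here are
    hypothetical.
    
        // Sketch: start the tier state machine from an explicit offset.
        // fetchStartOffsetAndEpoch is the offset on the leader's local log from
        // which replication starts; the remote log aux state is built up to it
        // (e.g. the earliest pending upload offset when fetching from the last
        // tiered offset).
        PartitionFetchState newState = tierStateMachine.start(
                topicPartition,            // TopicPartition being replicated
                topicId,                   // Optional<Uuid> of the topic
                currentLeaderEpoch,        // int: current leader epoch of the partition
                fetchStartOffsetAndEpoch,  // OffsetAndEpoch: offset to start replicating from
                leaderLogStartOffset);     // long: leader's log start offset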
    
    Test scenarios to cover:
    
    https://docs.google.com/spreadsheets/d/1SPp4_otQP_4VH_F6Ta5rVhLPQhUnRxXnZu3hDWf07B4/edit?gid=1256210467#gid=1256210467
    
    Reviewers: Kamal Chandraprakash <[email protected]>
---
 .../main/java/kafka/server/TierStateMachine.java   |   32 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |   41 +-
 .../scala/kafka/server/LocalLeaderEndPoint.scala   |    4 +-
 .../kafka/server/LocalLeaderEndPointTest.scala     |  105 +
 .../kafka/server/AbstractFetcherManagerTest.scala  |    3 +-
 .../kafka/server/AbstractFetcherThreadTest.scala   | 2110 ++++++++++++++++++++
 .../unit/kafka/server/MockLeaderEndPoint.scala     |    4 +-
 .../unit/kafka/server/MockTierStateMachine.scala   |   20 +-
 .../unit/kafka/server/TierStateMachineTest.scala   |   11 +-
 9 files changed, 2293 insertions(+), 37 deletions(-)

diff --git a/core/src/main/java/kafka/server/TierStateMachine.java b/core/src/main/java/kafka/server/TierStateMachine.java
index 9d8dcafd203..05b7fc13712 100644
--- a/core/src/main/java/kafka/server/TierStateMachine.java
+++ b/core/src/main/java/kafka/server/TierStateMachine.java
@@ -21,7 +21,7 @@ import kafka.cluster.Partition;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.message.FetchResponseData.PartitionData;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
 import org.apache.kafka.common.protocol.Errors;
@@ -85,21 +85,19 @@ public class TierStateMachine {
     /**
      * Start the tier state machine for the provided topic partition.
      *
-     * @param topicPartition the topic partition
-     * @param currentFetchState the current PartitionFetchState which will
-     *                          be used to derive the return value
-     * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error
-     *
-     * @return the new PartitionFetchState after the successful start of the
-     *         tier state machine
+     * @param topicPartition            the topic partition for which the tier state machine is to be started
+     * @param topicId                   the optional unique identifier of the topic
+     * @param currentLeaderEpoch        the current leader epoch of the partition
+     * @param fetchStartOffsetAndEpoch  the offset on the leader's local log from which to start replicating logs
+     * @param leaderLogStartOffset      the starting offset in the leader's log
+     * @return the new PartitionFetchState after the successful start of the tier state machine
+     * @throws Exception if an error occurs during the process, such as issues with remote storage
      */
     PartitionFetchState start(TopicPartition topicPartition,
-                              PartitionFetchState currentFetchState,
-                              PartitionData fetchPartitionData) throws Exception {
-        OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
-        int epoch = epochAndLeaderLocalStartOffset.epoch();
-        long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
-
+                              Optional<Uuid> topicId,
+                              int currentLeaderEpoch,
+                              OffsetAndEpoch fetchStartOffsetAndEpoch,
+                              long leaderLogStartOffset) throws Exception {
         long offsetToFetch;
         replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
         replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
@@ -112,19 +110,19 @@ public class TierStateMachine {
         }
 
         try {
-            offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset(), unifiedLog);
+            offsetToFetch = buildRemoteLogAuxState(topicPartition, currentLeaderEpoch, fetchStartOffsetAndEpoch.offset(), fetchStartOffsetAndEpoch.epoch(), leaderLogStartOffset, unifiedLog);
         } catch (RemoteStorageException e) {
             replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
             replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
             throw e;
         }
 
-        OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch);
         long leaderEndOffset = fetchLatestOffsetResult.offset();
 
         long initialLag = leaderEndOffset - offsetToFetch;
 
-        return new PartitionFetchState(currentFetchState.topicId(), offsetToFetch, Optional.of(initialLag), currentFetchState.currentLeaderEpoch(),
+        return new PartitionFetchState(topicId, offsetToFetch, Optional.of(initialLag), currentLeaderEpoch,
                 ReplicaState.FETCHING, unifiedLog.latestEpoch());
 
     }
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 97ae76c4f5c..107e12a695f 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -671,7 +671,13 @@ abstract class AbstractFetcherThread(name: String,
      */
     val offsetAndEpoch = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch)
     val leaderEndOffset = offsetAndEpoch.offset
-    if (leaderEndOffset < replicaEndOffset) {
+    val fetchFromLastTieredOffset = shouldFetchFromLastTieredOffset(topicPartition, leaderEndOffset, replicaEndOffset)
+
+    if (fetchFromLastTieredOffset) {
+      val leaderStartOffsetAndEpoch = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
+      val earliestPendingUploadOffsetAndEpoch = fetchEarliestPendingUploadOffset(topicPartition, currentLeaderEpoch, leaderStartOffsetAndEpoch)
+      fetchTierStateMachine.start(topicPartition, topicId.toJava, currentLeaderEpoch, earliestPendingUploadOffsetAndEpoch, leaderStartOffsetAndEpoch.offset())
+    } else if (leaderEndOffset < replicaEndOffset) {
       warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
         s"leader's latest offset $leaderEndOffset")
       truncate(topicPartition, OffsetTruncationState(leaderEndOffset, truncationCompleted = true))
@@ -781,7 +787,15 @@ abstract class AbstractFetcherThread(name: String,
                                                 leaderEpochInRequest: Optional[Integer],
                                                 fetchPartitionData: PartitionData): Boolean = {
     try {
-      val newFetchState = fetchTierStateMachine.start(topicPartition, fetchState, fetchPartitionData)
+      val isLastTieredOffsetFetchEnabled = shouldFetchFromLastTieredOffset(topicPartition, fetchState)
+      val leaderLogStartOffsetAndEpoch = leader.fetchEarliestOffset(topicPartition, fetchState.currentLeaderEpoch())
+      val fetchOffsetAndEpoch = if (isLastTieredOffsetFetchEnabled) {
+        fetchEarliestPendingUploadOffset(topicPartition, fetchState.currentLeaderEpoch(), leaderLogStartOffsetAndEpoch)
+      } else {
+        leader.fetchEarliestLocalOffset(topicPartition, fetchState.currentLeaderEpoch())
+      }
+      val newFetchState = fetchTierStateMachine.start(topicPartition, fetchState.topicId(), fetchState.currentLeaderEpoch(),
+        fetchOffsetAndEpoch, leaderLogStartOffsetAndEpoch.offset())
 
       // TODO: use fetchTierStateMachine.maybeAdvanceState when implementing async tiering logic in KAFKA-13560
 
@@ -805,6 +819,29 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
+  /**
+   * Determines the earliest offset for pending uploads, taking into account
+   * both local and remote storage conditions.
+   */
+  private def fetchEarliestPendingUploadOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, leaderLogStartOffsetAndEpoch: OffsetAndEpoch): OffsetAndEpoch = {
+    val earliestPendingUploadOffset = leader.fetchEarliestPendingUploadOffset(topicPartition, currentLeaderEpoch)
+    if (earliestPendingUploadOffset.offset == -1L) {
+      val leaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch)
+      if (leaderLocalStartOffset.offset == leaderLogStartOffsetAndEpoch.offset) {
+        return leaderLocalStartOffset
+      }
+      throw new OffsetNotAvailableException("Segments are uploaded to remote storage, but the leader does not know the earliest pending upload offset.")
+    }
+    earliestPendingUploadOffset
+  }
+
+  private def shouldFetchFromLastTieredOffset(topicPartition: TopicPartition, fetchState: PartitionFetchState): Boolean = {
+    val leaderEndOffset = leader.fetchLatestOffset(topicPartition, fetchState.currentLeaderEpoch())
+    val replicaEndOffset = logEndOffset(topicPartition)
+
+    shouldFetchFromLastTieredOffset(topicPartition, leaderEndOffset.offset(), replicaEndOffset)
+  }
+
   private def delayPartitions(partitions: Iterable[TopicPartition], delay: Long): Unit = {
     partitionMapLock.lockInterruptibly()
     try {
diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
index 852cd85420d..30e5231f8c8 100644
--- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -144,10 +144,10 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
     } else {
       val highestRemoteOffset = log.highestOffsetInRemoteStorage()
       val logStartOffset = fetchEarliestOffset(topicPartition, currentLeaderEpoch)
+      val localLogStartOffset = fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch)
 
       highestRemoteOffset match {
         case -1L =>
-          val localLogStartOffset = fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch)
           if (localLogStartOffset.offset() == logStartOffset.offset()) {
             // No segments have been uploaded yet
             logStartOffset
@@ -156,7 +156,7 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
             new OffsetAndEpoch(-1L, -1)
           }
         case _ =>
-          val earliestPendingUploadOffset = Math.max(highestRemoteOffset + 1, logStartOffset.offset())
+          val earliestPendingUploadOffset = Math.max(highestRemoteOffset + 1, Math.max(logStartOffset.offset(), localLogStartOffset.offset()))
           val epoch = log.leaderEpochCache.epochForOffset(earliestPendingUploadOffset)
           new OffsetAndEpoch(earliestPendingUploadOffset, epoch.orElse(0))
       }
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 797d8c933f3..70e04c94b20 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -284,6 +284,111 @@ class LocalLeaderEndPointTest extends Logging {
     assertEquals(new OffsetAndEpoch(expectedOffset, epoch), result)
   }
 
+  @Test
+  def testEarliestPendingUploadOffsetWhenLocalStartGreaterThanStartWithKnownRemoteOffset(): Unit = {
+    // Append records to create initial log state
+    appendRecords(replicaManager, topicIdPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+
+    val log = replicaManager.getPartitionOrException(topicPartition).localLogOrException
+
+    // Simulate remote upload up to offset 5
+    log.updateHighestOffsetInRemoteStorage(5)
+
+    // Simulate local log start advancing to offset 8
+    log.updateLocalLogStartOffset(8)
+
+    // Expected: max(highestRemoteOffset + 1, max(logStartOffset, localLogStartOffset))
+    //         = max(5 + 1, max(0, 8)) = max(6, 8) = 8
+    val expectedOffset = 8L
+    val epoch = log.leaderEpochCache().epochForOffset(expectedOffset).orElse(0)
+
+    val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
+    assertEquals(new OffsetAndEpoch(expectedOffset, epoch), result)
+  }
+
+  @Test
+  def testEarliestPendingUploadOffsetWhenLogStartGreaterThanLocalStartWithLowRemoteOffset(): Unit = {
+    // Append 12 records (offsets 0-11)
+    for (_ <- 1 to 4) {
+      appendRecords(replicaManager, topicIdPartition, records)
+        .onFire(response => assertEquals(Errors.NONE, response.error))
+    }
+
+    // Delete records to advance logStartOffset to 10
+    replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 10), _ => ())
+
+    val log = replicaManager.getPartitionOrException(topicPartition).localLogOrException
+
+    // Set localLogStartOffset to 3 (less than logStartOffset)
+    log.updateLocalLogStartOffset(3)
+
+    // Set highestRemoteOffset to 5 (less than logStartOffset)
+    log.updateHighestOffsetInRemoteStorage(5)
+
+    // Expected: max(5+1, max(10, 3)) = max(6, 10) = 10
+    val expectedOffset = 10L
+    val epoch = log.leaderEpochCache().epochForOffset(expectedOffset).orElse(0)
+
+    val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
+    assertEquals(new OffsetAndEpoch(expectedOffset, epoch), result)
+  }
+
+  @Test
+  def testEarliestPendingUploadOffsetWhenRemoteOffsetDominatesLogStart(): Unit = {
+    // Append 18 records (offsets 0-17)
+    for (_ <- 1 to 6) {
+      appendRecords(replicaManager, topicIdPartition, records)
+        .onFire(response => assertEquals(Errors.NONE, response.error))
+    }
+
+    // Delete records to advance logStartOffset to 10
+    replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 10), _ => ())
+
+    val log = replicaManager.getPartitionOrException(topicPartition).localLogOrException
+
+    // Set localLogStartOffset to 3 (less than logStartOffset)
+    log.updateLocalLogStartOffset(3)
+
+    // Set highestRemoteOffset to 15 (greater than logStartOffset)
+    log.updateHighestOffsetInRemoteStorage(15)
+
+    // Expected: max(15+1, max(10, 3)) = max(16, 10) = 16
+    val expectedOffset = 16L
+    val epoch = log.leaderEpochCache().epochForOffset(expectedOffset).orElse(0)
+
+    val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
+    assertEquals(new OffsetAndEpoch(expectedOffset, epoch), result)
+  }
+
+  @Test
+  def testEarliestPendingUploadOffsetWhenBothStartOffsetsEqualAndDominateRemote(): Unit = {
+    // Append 12 records (offsets 0-11)
+    for (_ <- 1 to 4) {
+      appendRecords(replicaManager, topicIdPartition, records)
+        .onFire(response => assertEquals(Errors.NONE, response.error))
+    }
+
+    // Delete records to advance both logStartOffset and localLogStartOffset to 10
+    replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 10), _ => ())
+
+    val log = replicaManager.getPartitionOrException(topicPartition).localLogOrException
+
+    // Verify both offsets are equal (deleteRecords updates both)
+    assertEquals(10L, log.logStartOffset())
+    assertEquals(10L, log.localLogStartOffset())
+
+    // Set highestRemoteOffset to 5 (less than start offsets)
+    log.updateHighestOffsetInRemoteStorage(5)
+
+    // Expected: max(5+1, max(10, 10)) = max(6, 10) = 10
+    val expectedOffset = 10L
+    val epoch = log.leaderEpochCache().epochForOffset(expectedOffset).orElse(0)
+
+    val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
+    assertEquals(new OffsetAndEpoch(expectedOffset, epoch), result)
+  }
+
   private class CallbackResult[T] {
     private var value: Option[T] = None
     private var fun: Option[T => Unit] = None
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index 97e2d5b7181..d79c39817e2 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -18,7 +18,6 @@ package kafka.server
 
 import com.yammer.metrics.core.Gauge
 import kafka.utils.TestUtils
-import org.apache.kafka.common.message.FetchResponseData.PartitionData
 import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.metrics.Metrics
@@ -323,7 +322,7 @@ class AbstractFetcherManagerTest {
   }
 
   private class MockResizeFetcherTierStateMachine extends TierStateMachine(null, null, false) {
-    override def start(topicPartition: TopicPartition, currentFetchState: PartitionFetchState, fetchPartitionData: PartitionData): PartitionFetchState = {
+    override def start(topicPartition: TopicPartition, topicId: Optional[Uuid], currentLeaderEpoch: Int, fetchStartOffsetAndEpoch: OffsetAndEpoch, leaderLogStartOffset: Long): PartitionFetchState = {
       throw new UnsupportedOperationException("Materializing tier state is not supported in this test.")
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 10d11166109..ea01994d411 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, Test}
 import kafka.server.FetcherThreadTestUtils.{initialFetchState, mkBatch}
 import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
+import org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException
 import org.apache.kafka.server.{PartitionFetchState, ReplicaState}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -1740,4 +1741,2113 @@ class AbstractFetcherThreadTest {
     assertEquals(151, replicaState.logEndOffset)
     assertEquals(151, replicaState.highWatermark)
   }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Empty Leader Log
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when fetching from last tiered offset 
with an empty leader log
+   * - Test scenario: Leader has no data and no segments uploaded
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader log: **Empty**
+   * - earliestPendingUploadOffset: **-1** (no upload information available)
+   * - Leader LogStartOffset: **0**
+   * - Leader LocalLogStartOffset: **0**
+   *
+   * Scenario:
+   * - The leader log is completely empty with no records
+   * - No segments have been uploaded to remote storage
+   * - The follower attempts to fetch from the last tiered offset
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. All offsets remain at 0:
+   *    - logStartOffset = 0
+   *    - localLogStartOffset = 0
+   *    - logEndOffset = 0
+   *    - highWatermark = 0
+   * 3. No data is fetched (log size = 0)
+   */
+  @Test
+  def testLastTieredOffsetWithEmptyLeaderLog(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    // Empty Logs
+    val leaderLog = Seq()
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 0L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = -1
+    )
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(0, replicaState.localLogStartOffset)
+    assertEquals(0, replicaState.logEndOffset)
+    assertEquals(0, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with No Segments Uploaded
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when leader has local data but no 
segments uploaded to remote storage
+   * - Test scenario: Leader has 3 record batches locally, but upload to 
tiered storage hasn't started yet
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader log: **3 batches** at offsets 0, 150, 199
+   * - Leader LogStartOffset: **0**
+   * - Leader LocalLogStartOffset: **0**
+   * - earliestPendingUploadOffset: **-1** (no segments uploaded yet)
+   *
+   * Scenario:
+   * - The leader has local data (3 record batches)
+   * - No segments have been uploaded to remote storage yet 
(earliestPendingUploadOffset = -1)
+   * - The follower starts from offset 0 and fetches the local data
+   * - Since there's no gap between logStartOffset and where local log starts, 
fetching proceeds normally
+   *
+   * Expected Outcomes:
+   * 1. Follower successfully fetches from offset 0
+   * 2. After 3 doWork() calls, all 3 batches are fetched:
+   *    - log size = 3
+   *    - logStartOffset = 0
+   *    - localLogStartOffset = 0
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   */
+  @Test
+  def testLastTieredOffsetWithNoSegmentsUploaded(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      // LogStartOffset = LocalLogStartOffset = 0
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = -1
+    )
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(1, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(0, replicaState.localLogStartOffset)
+    assertEquals(1, replicaState.logEndOffset)
+
+    // Only 1 record batch is returned after a poll, so calling 'n' number of 
times to get the desired result
+    for (_ <- 1 to 2) fetcher.doWork()
+    assertEquals(3, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(0, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Partial Upload on New Leader (Gap 
Scenario)
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when new leader doesn't know upload 
state and there's a gap
+   * - Test scenario: Gap between logStartOffset (0) and where local log 
actually starts (10)
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **0**
+   * - Leader LocalLogStartOffset: **10** (local log starts at offset 10, 
creating a gap)
+   * - earliestPendingUploadOffset: **-1** (new leader doesn't know about 
previous uploads)
+   * - Leader has 3 batches at offsets 10, 150, 199
+   *
+   * Scenario:
+   * - A new leader takes over that has local log starting at offset 10
+   * - The logStartOffset is 0, but local log doesn't have data before offset 
10
+   * - This creates a gap: logStartOffset (0) != where local log starts (10)
+   * - New leader doesn't have upload information (earliestPendingUploadOffset 
= -1)
+   * - Follower cannot safely determine where to fetch from
+   *
+   * Expected Outcomes:
+   * 1. Follower does NOT fetch immediately (gap scenario with unknown upload 
state)
+   * 2. Fetch state shows delayed retry:
+   *    - fetchOffset = 0 (unchanged)
+   *    - lag is empty
+   *    - delay is present (will retry after delay)
+   * 3. LogEndOffset remains 0
+   */
+  @Test
+  def testLastTieredOffsetWithPartialUploadOnNewLeader(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      // LocalLogStartOffset = 10
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = -1
+    )
+    leaderState.logStartOffset = 0
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    val fetchStateOpt = fetcher.fetchState(partition)
+    assertTrue(fetchStateOpt.nonEmpty)
+    assertEquals(ReplicaState.FETCHING, fetchStateOpt.get.state)
+    // Fetch offset remains unchanged
+    assertEquals(0, fetchStateOpt.get.fetchOffset)
+    // Lag remains unchanged
+    assertTrue(fetchStateOpt.get.lag.isEmpty)
+    assertEquals(0, fetchStateOpt.get.currentLeaderEpoch)
+    // Fetch will be retried after some delay
+    assertTrue(fetchStateOpt.get.delay.isPresent)
+    assertEquals(Optional.of(0), fetchStateOpt.get.lastFetchedEpoch)
+
+    // LogEndOffset is unchanged
+    assertEquals(0, replicaState.logEndOffset)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Slow Partial Upload
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when upload to tiered storage is in 
progress but slow
+   * - Test scenario: Upload has just started, with 
earliestPendingUploadOffset matching local log start
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **0**
+   * - Leader LocalLogStartOffset: **10** (local log starts at offset 10)
+   * - earliestPendingUploadOffset: **10** (upload just started at offset 10)
+   * - Leader has 3 batches at offsets 10, 150, 199
+   *
+   * Scenario:
+   * - Leader has uploaded segments before offset 10 to remote storage
+   * - Local log starts at offset 10 (earlier segments are in remote storage)
+   * - Upload is in progress with earliestPendingUploadOffset = 10 (slow 
upload - no progress yet)
+   * - Follower uses lastTieredOffset logic to determine fetch start point
+   * - Since earliestPendingUploadOffset is provided, follower can safely 
fetch from offset 10
+   *
+   * Expected Outcomes:
+   * 1. Follower starts fetching from offset 10 (lastTieredOffset logic)
+   * 2. First doWork() call:
+   *    - localLogStartOffset = 10
+   *    - logEndOffset = 10
+   *    - fetchOffset = 10
+   * 3. After 3 additional doWork() calls, all data from offset 10 onwards is 
fetched:
+   *    - log size = 3
+   *    - logStartOffset = 0
+   *    - localLogStartOffset = 10
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   */
+  @Test
+  def testLastTieredOffsetWithSlowPartialUpload(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      // LocalLogStartOffset = 10
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 10
+    )
+    leaderState.logStartOffset = 0
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(10, replicaState.localLogStartOffset)
+    assertEquals(10, replicaState.logEndOffset)
+    assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    // Only 1 record batch is returned after a poll so calling 'n' number of 
times to get the desired result.
+    for (_ <- 1 to 3) fetcher.doWork()
+    assertEquals(3, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(10, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Fast Partial Upload
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when upload to tiered storage has made 
significant progress
+   * - Test scenario: Upload has progressed to offset 150, follower fetches 
only remaining data
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **0**
+   * - Leader LocalLogStartOffset: **10** (local log starts at offset 10)
+   * - earliestPendingUploadOffset: **150** (upload has progressed to offset 
150 - fast upload)
+   * - Leader has 3 batches at offsets 10, 150, 199
+   *
+   * Scenario:
+   * - Leader has uploaded segments up to offset 150 to remote storage
+   * - Local log starts at offset 10, but segments from 10-149 are uploaded
+   * - earliestPendingUploadOffset = 150 indicates upload has made significant 
progress
+   * - Follower uses lastTieredOffset logic to start from offset 150
+   * - Only remaining batches (150, 199) need to be fetched locally
+   *
+   * Expected Outcomes:
+   * 1. Follower starts fetching from offset 150 (lastTieredOffset logic)
+   * 2. First doWork() call:
+   *    - localLogStartOffset = 150
+   *    - logEndOffset = 150
+   *    - fetchOffset = 150
+   * 3. After 2 additional doWork() calls, only remaining batches are fetched:
+   *    - log size = 2 (only batches at 150 and 199)
+   *    - logStartOffset = 0
+   *    - localLogStartOffset = 150
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   */
+  @Test
+  def testLastTieredOffsetWithFastPartialUpload(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      // LocalLogStartOffset = 10
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 150
+    )
+    leaderState.logStartOffset = 0
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(150, replicaState.localLogStartOffset)
+    assertEquals(150, replicaState.logEndOffset)
+    assertEquals(Some(150), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    // Only 1 record batch is returned after a poll so calling 'n' number of 
times to get the desired result.
+    for (_ <- 1 to 2) fetcher.doWork()
+    assertEquals(2, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(150, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Full Upload Completed
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when all segments have been uploaded 
to remote storage
+   * - Test scenario: Leader has completed uploading all data, no local data 
remains to be fetched
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **0**
+   * - Leader LocalLogStartOffset: **10** (local log starts at offset 10)
+   * - earliestPendingUploadOffset: **200** (all segments uploaded - matches 
logEndOffset)
+   * - Leader has 3 batches at offsets 10, 150, 199
+   *
+   * Scenario:
+   * - Leader has uploaded all segments to remote storage
+   * - earliestPendingUploadOffset = 200 indicates upload has completed up to 
logEndOffset
+   * - Local log starts at offset 10, but all data from 10-199 is already 
uploaded
+   * - Follower uses lastTieredOffset logic to determine it should start from 
offset 200
+   * - Since logEndOffset = earliestPendingUploadOffset = 200, there's no 
local data to fetch
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. Follower starts at offset 200 (end of uploaded data):
+   *    - localLogStartOffset = 200
+   *    - logEndOffset = 200
+   *    - fetchOffset = 200
+   * 3. No batches are fetched (all data already uploaded):
+   *    - log size = 0
+   *    - logStartOffset = 0
+   *    - highWatermark = 200
+   */
+  @Test
+  def testLastTieredOffsetWithFullUploadCompleted(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      // LocalLogStartOffset = 10
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 200
+    )
+    leaderState.logStartOffset = 0
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(200, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(200, replicaState.highWatermark)
+    assertEquals(Some(200), fetcher.fetchState(partition).map(_.fetchOffset()))
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Inactive Leader and Full Upload on 
New Leader
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when new leader has empty log with all 
segments uploaded
+   * - Test scenario: New leader doesn't know upload state, creating a gap 
scenario
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader log: **Empty** (inactive leader - no local data)
+   * - Leader LogStartOffset: **0**
+   * - Leader LocalLogStartOffset: **200** (all local segments 
uploaded/deleted)
+   * - Leader LogEndOffset: **200**
+   * - earliestPendingUploadOffset: **-1** (new leader doesn't know about 
previous uploads)
+   *
+   * Scenario:
+   * - New leader takes over with empty local log
+   * - All data (0-199) has been uploaded to remote storage and deleted locally
+   * - logStartOffset = 0, but localLogStartOffset = 200 (creating a gap)
+   * - New leader doesn't have upload information (earliestPendingUploadOffset 
= -1)
+   * - Follower cannot safely determine where to fetch from due to gap and 
unknown upload state
+   *
+   * Expected Outcomes:
+   * 1. Follower does NOT fetch immediately (gap scenario with unknown upload 
state)
+   * 2. Fetch state shows delayed retry:
+   *    - fetchOffset = 0 (unchanged)
+   *    - lag is empty
+   *    - delay is present (will retry after delay)
+   *    - state = FETCHING
+   *    - currentLeaderEpoch = 0
+   *    - lastFetchedEpoch = 0
+   * 3. LogEndOffset remains 0 (no progress until upload state known)
+   */
+  @Test
+  def testLastTieredOffsetWithInactiveLeaderFullUploadOnNewLeader(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    // Empty logs
+    val leaderLog = Seq()
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = -1
+    )
+    leaderState.logStartOffset = 0
+    leaderState.localLogStartOffset = 200
+    leaderState.logEndOffset = 200
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    val fetchStateOpt = fetcher.fetchState(partition)
+    assertTrue(fetchStateOpt.nonEmpty)
+    assertEquals(ReplicaState.FETCHING, fetchStateOpt.get.state)
+    // Fetch offset remains unchanged
+    assertEquals(0, fetchStateOpt.get.fetchOffset)
+    // Lag remains unchanged
+    assertTrue(fetchStateOpt.get.lag.isEmpty)
+    assertEquals(0, fetchStateOpt.get.currentLeaderEpoch)
+    // Fetch will be retried after some delay
+    assertTrue(fetchStateOpt.get.delay.isPresent)
+    assertEquals(Optional.of(0), fetchStateOpt.get.lastFetchedEpoch)
+
+    // LogEndOffset is unchanged
+    assertEquals(0, replicaState.logEndOffset)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Inactive Leader and Full Upload 
Completed
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when leader has empty log with known 
full upload
+   * - Test scenario: Leader has empty log but knows all data is uploaded 
(earliestPendingUploadOffset = 200)
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader log: **Empty** (inactive leader - no local data)
+   * - Leader LogStartOffset: **0**
+   * - Leader LocalLogStartOffset: **200** (all local segments 
uploaded/deleted)
+   * - Leader LogEndOffset: **200**
+   * - earliestPendingUploadOffset: **200** (full upload completed - leader 
knows upload state)
+   *
+   * Scenario:
+   * - Leader has empty local log (all data uploaded and deleted locally)
+   * - All data (0-199) has been uploaded to remote storage
+   * - earliestPendingUploadOffset = 200 indicates leader knows about the 
completed upload
+   * - Follower uses lastTieredOffset logic to start from offset 200
+   * - Since all data is uploaded, no local fetching is needed
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. Follower adjusts to upload endpoint:
+   *    - localLogStartOffset = 200
+   *    - logEndOffset = 200
+   *    - fetchOffset = 200
+   *    - highWatermark = 200
+   * 3. No batches are fetched (all data already uploaded):
+   *    - log size = 0
+   *    - logStartOffset = 0
+   */
+  @Test
+  def testLastTieredOffsetWithInactiveLeaderFullUpload(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    // Empty logs
+    val leaderLog = Seq()
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 200
+    )
+    leaderState.logStartOffset = 0
+    leaderState.localLogStartOffset = 200
+    leaderState.logEndOffset = 200
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(200, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(200, replicaState.highWatermark)
+    assertEquals(Some(200), fetcher.fetchState(partition).map(_.fetchOffset()))
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Empty Leader Log, Non-Zero LSO on New 
Leader
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when leader has empty log with 
non-zero logStartOffset
+   * - Test scenario: Leader has no data with non-zero start offset, new 
leader scenario
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader log: **Empty** (no data)
+   * - Leader LogStartOffset: **10** (non-zero - segments 0-9 previously 
deleted)
+   * - Leader LocalLogStartOffset: **10** (same as logStartOffset - no gap)
+   * - Leader LogEndOffset: **10**
+   * - earliestPendingUploadOffset: **-1** (new leader doesn't know upload 
state)
+   *
+   * Scenario:
+   * - Leader has empty log with logStartOffset = 10 (segments 0-9 were 
previously deleted/uploaded)
+   * - No gap exists (logStartOffset = localLogStartOffset = 10)
+   * - New leader doesn't have upload information (earliestPendingUploadOffset 
= -1)
+   * - Since there's no gap, follower can safely fetch from logStartOffset
+   * - Leader log is empty (logEndOffset = logStartOffset = 10), so no data to 
fetch
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. First doWork() call adjusts to logStartOffset:
+   *    - localLogStartOffset = 10
+   *    - logEndOffset = 10
+   *    - highWatermark = 10
+   *    - fetchOffset = 10
+   * 3. Second doWork() call confirms stable state:
+   *    - logStartOffset = 10
+   *    - localLogStartOffset = 10
+   *    - logEndOffset = 10
+   *    - highWatermark = 10
+   * 4. No batches are fetched (log is empty):
+   *    - log size = 0
+   */
+  @Test
+  def testLastTieredOffsetWithEmptyLeaderLogNonZeroLSOOnNewLeader(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    // Empty logs
+    val leaderLog = Seq()
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 10L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = -1
+    )
+    leaderState.logStartOffset = 10
+    leaderState.localLogStartOffset = 10
+    leaderState.logEndOffset = 10
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(10, replicaState.localLogStartOffset)
+    assertEquals(10, replicaState.logEndOffset)
+    assertEquals(10, replicaState.highWatermark)
+    assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    fetcher.doWork()
+    assertEquals(0, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(10, replicaState.localLogStartOffset)
+    assertEquals(10, replicaState.logEndOffset)
+    assertEquals(10, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Empty Leader Log and Stale Upload 
Offset
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when leader has empty log with stale 
upload offset
+   * - Test scenario: Stale upload offset is ignored in favor of logStartOffset
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader log: **Empty** (no data)
+   * - Leader LogStartOffset: **10** (non-zero - segments 0-9 previously 
deleted)
+   * - Leader LocalLogStartOffset: **10** (same as logStartOffset - no gap)
+   * - Leader LogEndOffset: **10**
+   * - earliestPendingUploadOffset: **5** (stale - before logStartOffset of 10)
+   *
+   * Scenario:
+   * - Leader has empty log with logStartOffset = 10 (segments 0-9 previously 
deleted/uploaded)
+   * - earliestPendingUploadOffset = 5 is stale (before logStartOffset of 10)
+   * - Despite stale upload offset, presence of upload offset indicates leader 
knows about tiered storage
+   * - Follower ignores stale value and uses logStartOffset (10) to determine 
fetch point
+   * - Since logEndOffset = logStartOffset = 10, log is empty and no data to 
fetch
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. First doWork() call adjusts to logStartOffset:
+   *    - localLogStartOffset = 10
+   *    - logEndOffset = 10
+   *    - highWatermark = 10
+   *    - fetchOffset = 10
+   * 3. Second doWork() call confirms stable state:
+   *    - logStartOffset = 10
+   *    - localLogStartOffset = 10
+   *    - logEndOffset = 10
+   *    - highWatermark = 10
+   * 4. No batches are fetched (log is empty):
+   *    - log size = 0
+   */
+  @Test
+  def testLastTieredOffsetWithEmptyLeaderLogStaleUploadOffset(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    // Empty logs
+    val leaderLog = Seq()
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 10L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 5
+    )
+    leaderState.logStartOffset = 10
+    leaderState.localLogStartOffset = 10
+    leaderState.logEndOffset = 10
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(10, replicaState.localLogStartOffset)
+    assertEquals(10, replicaState.logEndOffset)
+    assertEquals(10, replicaState.highWatermark)
+    assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    fetcher.doWork()
+    assertEquals(0, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(10, replicaState.localLogStartOffset)
+    assertEquals(10, replicaState.logEndOffset)
+    assertEquals(10, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Non-Zero LSO on New Leader
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when leader has non-zero 
logStartOffset without upload info
+   * - Test scenario: New leader with data but no upload information
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+   * - Leader LocalLogStartOffset: **10** (same as logStartOffset - no 
deletions)
+   * - earliestPendingUploadOffset: **-1** (new leader doesn't know upload 
state)
+   * - Leader has 3 batches at offsets 10, 150, 199
+   *
+   * Scenario:
+   * - Leader log starts at offset 10 with local data available
+   * - No gap exists (logStartOffset = localLogStartOffset = 10)
+   * - New leader doesn't have upload information (earliestPendingUploadOffset 
= -1)
+   * - Since there's no gap, follower can safely fetch from logStartOffset
+   * - Follower calculates lag correctly as difference between logEndOffset 
and fetchOffset
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. Follower starts at logStartOffset:
+   *    - fetchOffset = 10
+   *    - lag is calculated correctly (190 = 200 - 10)
+   *    - state = FETCHING
+   *    - currentLeaderEpoch = 0
+   *    - lastFetchedEpoch = 0
+   * 3. LogEndOffset updates to logStartOffset:
+   *    - logEndOffset = 10
+   */
+  @Test
+  def testLastTieredOffsetWithNonZeroLSOOnNewLeader(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      // LogStartOffset = LocalLogStartOffset = 10
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = -1
+    )
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    val fetchStateOpt = fetcher.fetchState(partition)
+    assertTrue(fetchStateOpt.nonEmpty)
+    assertEquals(ReplicaState.FETCHING, fetchStateOpt.get.state)
+    assertEquals(10, fetchStateOpt.get.fetchOffset)
+    assertFalse(fetchStateOpt.get.lag.isEmpty)
+    assertEquals(190, fetchStateOpt.get.lag().get())
+    assertEquals(0, fetchStateOpt.get.currentLeaderEpoch)
+    assertEquals(Optional.of(0), fetchStateOpt.get.lastFetchedEpoch)
+    assertEquals(10, replicaState.logEndOffset)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Non-Zero LSO and Stale Upload Offset
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when leader has stale upload offset 
with non-zero LSO
+   * - Test scenario: Stale upload offset is ignored, follower fetches from 
logStartOffset
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+   * - Leader LocalLogStartOffset: **10** (same as logStartOffset - no 
deletions)
+   * - earliestPendingUploadOffset: **5** (stale - before logStartOffset of 10)
+   * - Leader has 3 batches at offsets 10, 150, 199
+   *
+   * Scenario:
+   * - Leader log starts at offset 10 with local data available
+   * - earliestPendingUploadOffset = 5 is stale (before logStartOffset of 10)
+   * - Despite stale upload offset, presence of upload offset indicates leader 
knows about tiered storage
+   * - Follower ignores stale value and uses logStartOffset (10) to determine 
fetch point
+   * - Since logStartOffset = localLogStartOffset = 10 (no gap), follower can 
fetch from offset 10
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. First doWork() call adjusts to logStartOffset:
+   *    - localLogStartOffset = 10
+   *    - logEndOffset = 10
+   *    - highWatermark = 10
+   *    - fetchOffset = 10
+   * 3. After 3 additional doWork() calls, all 3 batches are fetched:
+   *    - log size = 3
+   *    - logStartOffset = 10
+   *    - localLogStartOffset = 10
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   */
+  @Test
+  def testLastTieredOffsetWithNonZeroLSOStaleUploadOffset(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      // LogStartOffset = LocalLogStartOffset = 10
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 5
+    )
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(10, replicaState.localLogStartOffset)
+    assertEquals(10, replicaState.logEndOffset)
+    assertEquals(10, replicaState.highWatermark)
+    assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    for (_ <- 1 to 3) fetcher.doWork()
+    assertEquals(3, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(10, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Slow Upload and No Local Deletion
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when upload is in progress (slow) with 
no local deletions
+   * - Test scenario: Upload has just started, follower fetches from upload 
point
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+   * - Leader LocalLogStartOffset: **10** (same as logStartOffset - no 
deletions)
+   * - earliestPendingUploadOffset: **10** (slow upload - just started at 
offset 10)
+   * - Leader has 3 batches at offsets 10, 150, 199
+   *
+   * Scenario:
+   * - Leader log starts at offset 10 with local data available
+   * - Upload has just started with earliestPendingUploadOffset = 10 (slow 
upload - no progress yet)
+   * - Follower uses lastTieredOffset logic to determine fetch point
+   * - Since earliestPendingUploadOffset is provided, follower can safely 
fetch from offset 10
+   * - No segments have been uploaded yet (upload just starting)
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. First doWork() call adjusts to upload point:
+   *    - localLogStartOffset = 10
+   *    - logEndOffset = 10
+   *    - fetchOffset = 10
+   * 3. After 3 additional doWork() calls, all 3 batches are fetched:
+   *    - log size = 3
+   *    - logStartOffset = 10
+   *    - localLogStartOffset = 10
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   */
+  @Test
+  def testLastTieredOffsetWithSlowUploadNoLocalDeletion(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      // LogStartOffset = LocalLogStartOffset = 10
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 10
+    )
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(10, replicaState.localLogStartOffset)
+    assertEquals(10, replicaState.logEndOffset)
+    assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    // Only one record batch is returned per poll, so doWork() is called repeatedly until all batches are fetched.
+    for (_ <- 1 to 3) fetcher.doWork()
+    assertEquals(3, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(10, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Fast Upload and No Local Deletion
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when upload has made significant 
progress (fast)
+   * - Test scenario: Upload has progressed significantly, follower fetches 
only remaining data
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+   * - Leader LocalLogStartOffset: **10** (same as logStartOffset - no 
deletions)
+   * - earliestPendingUploadOffset: **150** (fast upload - significant 
progress to offset 150)
+   * - Leader has 3 batches at offsets 10, 150, 199
+   *
+   * Scenario:
+   * - Leader log starts at offset 10 with local data available
+   * - Upload has progressed to offset 150 (fast upload - batch at offset 10 
already uploaded)
+   * - Follower uses lastTieredOffset logic to start from 
earliestPendingUploadOffset
+   * - Only remaining batches (150, 199) need to be fetched locally
+   * - Batch at offset 10 is already uploaded to remote storage
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. First doWork() call adjusts to upload point:
+   *    - localLogStartOffset = 150
+   *    - logEndOffset = 150
+   *    - fetchOffset = 150
+   * 3. After 2 additional doWork() calls, only remaining batches are fetched:
+   *    - log size = 2 (only batches at 150 and 199)
+   *    - logStartOffset = 10
+   *    - localLogStartOffset = 150
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   */
+  @Test
+  def testLastTieredOffsetWithFastUploadNoLocalDeletion(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      // LogStartOffset = LocalLogStartOffset = 10
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 150
+    )
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(150, replicaState.localLogStartOffset)
+    assertEquals(150, replicaState.logEndOffset)
+    assertEquals(Some(150), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    // Only one record batch is returned per poll, so doWork() is called repeatedly until all batches are fetched.
+    for (_ <- 1 to 2) fetcher.doWork()
+    assertEquals(2, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(150, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Full Upload and No Local Deletions
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when all segments are uploaded and no 
local deletions occurred
+   * - Test scenario: Leader log starts at non-zero offset with full upload 
completed
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+   * - Leader LocalLogStartOffset: **10** (no local deletions - same as 
logStartOffset)
+   * - earliestPendingUploadOffset: **200** (full upload completed - all data 
uploaded)
+   * - Leader has 3 batches at offsets 10, 150, 199
+   *
+   * Scenario:
+   * - Leader log starts at offset 10 (non-zero)
+   * - No local deletions have occurred (logStartOffset = localLogStartOffset 
= 10)
+   * - All segments have been uploaded to remote storage 
(earliestPendingUploadOffset = 200)
+   * - Follower uses lastTieredOffset logic to start from offset 200
+   * - Since all data is uploaded, no local fetching is needed
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. First doWork() call adjusts offsets to upload endpoint:
+   *    - localLogStartOffset = 200
+   *    - logEndOffset = 200
+   *    - fetchOffset = 200
+   * 3. Second doWork() call updates logStartOffset:
+   *    - logStartOffset = 10 (leader's logStartOffset)
+   *    - localLogStartOffset = 200
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   * 4. No batches are fetched (all data already uploaded):
+   *    - log size = 0
+   */
+  @Test
+  def testLastTieredOffsetWithFullUploadNoLocalDeletion(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      // LogStartOffset = LocalLogStartOffset = 10
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 200
+    )
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(200, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(Some(200), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    fetcher.doWork()
+    assertEquals(0, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(200, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Partial Upload on New Leader 
(Non-Zero LSO, Gap Scenario)
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when new leader doesn't know upload 
state with non-zero log start
+   * - Test scenario: Gap between logStartOffset (10) and where local log 
actually starts (100)
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **10**
+   * - Leader LocalLogStartOffset: **100** (local log starts at offset 100, 
creating a gap)
+   * - earliestPendingUploadOffset: **-1** (new leader doesn't know about 
previous uploads)
+   * - Leader has 3 batches at offsets 100, 150, 199
+   *
+   * Scenario:
+   * - A new leader takes over that has local log starting at offset 100
+   * - The logStartOffset is 10, but local log doesn't have data before offset 
100
+   * - This creates a gap: logStartOffset (10) != where local log starts (100)
+   * - New leader doesn't have upload information (earliestPendingUploadOffset 
= -1)
+   * - Follower cannot safely determine where to fetch from
+   *
+   * Expected Outcomes:
+   * 1. Follower does NOT fetch immediately (gap scenario with unknown upload 
state)
+   * 2. Fetch state shows delayed retry:
+   *    - fetchOffset = 0 (unchanged)
+   *    - lag is empty
+   *    - delay is present (will retry after delay)
+   * 3. LogEndOffset remains 0
+   */
+  @Test
+  def testLastTieredOffsetWithPartialUploadOnNewLeaderNonZeroLSO(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 100, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = -1
+    )
+    leaderState.logStartOffset = 10
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    val fetchStateOpt = fetcher.fetchState(partition)
+    assertTrue(fetchStateOpt.nonEmpty)
+    assertEquals(ReplicaState.FETCHING, fetchStateOpt.get.state)
+    assertEquals(0, fetchStateOpt.get.fetchOffset)
+    assertTrue(fetchStateOpt.get.lag.isEmpty)
+    assertEquals(0, fetchStateOpt.get.currentLeaderEpoch)
+    // Fetch will be retried after some delay
+    assertTrue(fetchStateOpt.get.delay.isPresent)
+    assertEquals(Optional.of(0), fetchStateOpt.get.lastFetchedEpoch)
+
+    // LogEndOffset is unchanged
+    assertEquals(0, replicaState.logEndOffset)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Partial Upload and Stale Upload Offset
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when earliestPendingUploadOffset is 
stale (before logStartOffset)
+   * - Test scenario: Upload offset is stale but local log starts at offset 
100, follower can still fetch
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **10**
+   * - Leader LocalLogStartOffset: **100** (local log starts at offset 100)
+   * - earliestPendingUploadOffset: **5** (stale - before logStartOffset of 10)
+   * - Leader has 3 batches at offsets 100, 150, 199
+   *
+   * Scenario:
+   * - Leader has earliestPendingUploadOffset = 5, which is stale (before 
logStartOffset of 10)
+   * - Despite stale upload offset, local log is available starting at offset 
100
+   * - Follower ignores the stale upload offset and uses localLogStartOffset 
(100) to start fetching
+   * - The presence of an upload offset, even a stale one, indicates the leader knows about tiered storage
+   *
+   * Expected Outcomes:
+   * 1. Follower successfully fetches from offset 100 (ignores stale upload 
offset)
+   * 2. First doWork() call:
+   *    - localLogStartOffset = 100
+   *    - logEndOffset = 100
+   *    - fetchOffset = 100
+   * 3. After 3 additional doWork() calls, all 3 batches from offset 100 
onwards are fetched:
+   *    - log size = 3
+   *    - logStartOffset = 10
+   *    - localLogStartOffset = 100
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   */
+  @Test
+  def testLastTieredOffsetWithPartialUploadAndStaleUploadOffset(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 100, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 5
+    )
+    leaderState.logStartOffset = 10
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(100, replicaState.localLogStartOffset)
+    assertEquals(100, replicaState.logEndOffset)
+    assertEquals(100, replicaState.highWatermark)
+    assertEquals(Some(100), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    for (_ <- 1 to 3) fetcher.doWork()
+    assertEquals(3, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(100, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Slow Upload and Non-Zero LSO
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when upload is slow with non-zero 
logStartOffset and local deletions
+   * - Test scenario: Upload just started at local log start point
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+   * - Leader LocalLogStartOffset: **100** (local deletions - local log starts 
at offset 100)
+   * - earliestPendingUploadOffset: **100** (slow upload - just started at 
offset 100)
+   * - Leader has 3 batches at offsets 100, 150, 199
+   *
+   * Scenario:
+   * - Leader log starts at offset 10 (segments 0-9 previously deleted)
+   * - Local log starts at offset 100 due to local deletions (segments 10-99 
deleted locally)
+   * - Upload has just started with earliestPendingUploadOffset = 100 (slow 
upload - no progress yet)
+   * - Follower uses lastTieredOffset logic to start from 
earliestPendingUploadOffset
+   * - All local data (100-199) needs to be fetched
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. First doWork() call adjusts to upload point:
+   *    - localLogStartOffset = 100
+   *    - logEndOffset = 100
+   *    - fetchOffset = 100
+   * 3. After 3 additional doWork() calls, all 3 batches are fetched:
+   *    - log size = 3
+   *    - logStartOffset = 10
+   *    - localLogStartOffset = 100
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   */
+  @Test
+  def testLastTieredOffsetWithSlowUploadNonZeroLSO(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 100, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 100
+    )
+    leaderState.logStartOffset = 10
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(100, replicaState.localLogStartOffset)
+    assertEquals(100, replicaState.logEndOffset)
+    assertEquals(Some(100), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    // Only one record batch is returned per poll, so doWork() is called repeatedly until all batches are fetched.
+    for (_ <- 1 to 3) fetcher.doWork()
+    assertEquals(3, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(100, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Fast Upload and Non-Zero LSO
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when upload has made significant 
progress with non-zero LSO
+   * - Test scenario: Fast upload progress with non-zero logStartOffset
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+   * - Leader LocalLogStartOffset: **100** (local deletions - local log starts 
at offset 100)
+   * - earliestPendingUploadOffset: **150** (fast upload - progressed to 
offset 150)
+   * - Leader has 3 batches at offsets 100, 150, 199
+   *
+   * Scenario:
+   * - Leader log starts at offset 10 (segments 0-9 previously deleted)
+   * - Local log starts at offset 100 due to local deletions (segments 10-99 
deleted locally)
+   * - Upload has progressed to offset 150 (fast upload - batch at offset 100 
already uploaded)
+   * - Follower uses lastTieredOffset logic to start from 
earliestPendingUploadOffset
+   * - Only remaining batches (150, 199) need to be fetched locally
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. First doWork() call adjusts to upload point:
+   *    - localLogStartOffset = 150
+   *    - logEndOffset = 150
+   *    - fetchOffset = 150
+   * 3. After 2 additional doWork() calls, only remaining batches are fetched:
+   *    - log size = 2 (only batches at 150 and 199)
+   *    - logStartOffset = 10
+   *    - localLogStartOffset = 150
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   */
+  @Test
+  def testLastTieredOffsetWithFastUploadNonZeroLSO(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 100, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 150
+    )
+    leaderState.logStartOffset = 10
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(150, replicaState.localLogStartOffset)
+    assertEquals(150, replicaState.logEndOffset)
+    assertEquals(Some(150), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    // Only one record batch is returned per poll, so doWork() is called repeatedly until all batches are fetched.
+    for (_ <- 1 to 2) fetcher.doWork()
+    assertEquals(2, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(150, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Full Upload and Non-Zero Local Start 
Offset
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when full upload is completed with 
local deletions
+   * - Test scenario: Leader has non-zero logStartOffset with local log 
starting at higher offset
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+   * - Leader LocalLogStartOffset: **100** (local deletions - local log starts 
at offset 100)
+   * - earliestPendingUploadOffset: **200** (full upload completed - all data 
uploaded)
+   * - Leader has 3 batches at offsets 100, 150, 199
+   *
+   * Scenario:
+   * - Leader log starts at offset 10 (non-zero)
+   * - Local log starts at offset 100 due to local deletions (segments 10-99 
deleted locally but in remote)
+   * - All segments have been uploaded to remote storage 
(earliestPendingUploadOffset = 200)
+   * - Follower uses lastTieredOffset logic to start from offset 200
+   * - Since all data is uploaded (even the locally deleted segments 10-99), 
no local fetching needed
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. First doWork() call adjusts offsets to upload endpoint:
+   *    - localLogStartOffset = 200
+   *    - logEndOffset = 200
+   *    - fetchOffset = 200
+   * 3. Second doWork() call updates logStartOffset:
+   *    - logStartOffset = 10 (leader's logStartOffset)
+   *    - localLogStartOffset = 200
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   * 4. No batches are fetched (all data already uploaded):
+   *    - log size = 0
+   */
+  @Test
+  def testLastTieredOffsetWithFullUploadNonZeroLSO(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 100, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 200
+    )
+    leaderState.logStartOffset = 10
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(200, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(Some(200), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    fetcher.doWork()
+    assertEquals(0, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(200, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Inactive Leader, Full Upload, 
Non-Zero LSO on New Leader
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when new leader has empty log with 
non-zero logStartOffset
+   * - Test scenario: New leader doesn't know upload state with gap scenario
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader log: **Empty** (inactive leader - no local data)
+   * - Leader LogStartOffset: **10** (non-zero - segments 0-9 previously 
deleted)
+   * - Leader LocalLogStartOffset: **200** (all local segments 
uploaded/deleted)
+   * - Leader LogEndOffset: **200**
+   * - earliestPendingUploadOffset: **-1** (new leader doesn't know about 
previous uploads)
+   *
+   * Scenario:
+   * - New leader takes over with empty local log
+   * - logStartOffset = 10 (segments 0-9 were previously deleted)
+   * - All remaining data (10-199) has been uploaded to remote storage and 
deleted locally
+   * - localLogStartOffset = 200 (creating a gap: 10 != 200)
+   * - New leader doesn't have upload information (earliestPendingUploadOffset 
= -1)
+   * - Follower cannot safely determine where to fetch from due to gap and 
unknown upload state
+   *
+   * Expected Outcomes:
+   * 1. Follower does NOT fetch immediately (gap scenario with unknown upload 
state)
+   * 2. Fetch state shows delayed retry:
+   *    - fetchOffset = 0 (unchanged)
+   *    - lag is empty
+   *    - delay is present (will retry after delay)
+   *    - state = FETCHING
+   *    - currentLeaderEpoch = 0
+   *    - lastFetchedEpoch = 0
+   * 3. LogEndOffset remains 0 (no progress until upload state known)
+   */
+  @Test
+  def testLastTieredOffsetWithInactiveLeaderFullUploadNonZeroLSOOnNewLeader(): 
Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    // Empty logs
+    val leaderLog = Seq()
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = -1
+    )
+    leaderState.logStartOffset = 10
+    leaderState.localLogStartOffset = 200
+    leaderState.logEndOffset = 200
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    val fetchStateOpt = fetcher.fetchState(partition)
+    assertTrue(fetchStateOpt.nonEmpty)
+    assertEquals(ReplicaState.FETCHING, fetchStateOpt.get.state)
+    assertEquals(0, fetchStateOpt.get.fetchOffset)
+    assertTrue(fetchStateOpt.get.lag.isEmpty)
+    assertEquals(0, fetchStateOpt.get.currentLeaderEpoch)
+    // Fetch will be retried after some delay
+    assertTrue(fetchStateOpt.get.delay.isPresent)
+    assertEquals(Optional.of(0), fetchStateOpt.get.lastFetchedEpoch)
+
+    // LogEndOffset is unchanged
+    assertEquals(0, replicaState.logEndOffset)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Inactive Leader, Stale Upload Offset, 
Non-Zero LSO
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when leader has empty log with stale 
upload offset
+   * - Test scenario: Leader has stale upload offset but follower can still 
determine fetch point
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader log: **Empty** (inactive leader - no local data)
+   * - Leader LogStartOffset: **10** (non-zero - segments 0-9 previously 
deleted)
+   * - Leader LocalLogStartOffset: **200** (all local segments 
uploaded/deleted)
+   * - Leader LogEndOffset: **200**
+   * - earliestPendingUploadOffset: **5** (stale - before logStartOffset of 10)
+   *
+   * Scenario:
+   * - Leader has empty local log (all data uploaded and deleted locally)
+   * - earliestPendingUploadOffset = 5 is stale (before logStartOffset of 10)
+   * - Despite the stale upload offset, its presence indicates the leader knows about tiered storage
+   * - Follower ignores stale value and uses localLogStartOffset (200) to 
determine fetch point
+   * - Since localLogStartOffset = logEndOffset = 200, all data is uploaded
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. First doWork() call adjusts to localLogStartOffset:
+   *    - localLogStartOffset = 200
+   *    - logEndOffset = 200
+   *    - highWatermark = 200
+   * 3. Second doWork() call updates logStartOffset:
+   *    - logStartOffset = 10 (leader's logStartOffset)
+   *    - localLogStartOffset = 200
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   * 4. No batches are fetched (all data already uploaded):
+   *    - log size = 0
+   */
+  @Test
+  def testLastTieredOffsetWithInactiveLeaderStaleUploadNonZeroLSO(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    // Empty logs
+    val leaderLog = Seq()
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 5
+    )
+    leaderState.logStartOffset = 10
+    leaderState.localLogStartOffset = 200
+    leaderState.logEndOffset = 200
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(200, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(200, replicaState.highWatermark)
+
+    fetcher.doWork()
+    assertEquals(0, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(200, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Inactive Leader, Full Upload, 
Non-Zero LSO
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when leader has empty log with known 
full upload and non-zero LSO
+   * - Test scenario: Leader has empty log but knows all data is uploaded with 
non-zero logStartOffset
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader log: **Empty** (inactive leader - no local data)
+   * - Leader LogStartOffset: **10** (non-zero - segments 0-9 previously 
deleted)
+   * - Leader LocalLogStartOffset: **200** (all local segments 
uploaded/deleted)
+   * - Leader LogEndOffset: **200**
+   * - earliestPendingUploadOffset: **200** (full upload completed - leader 
knows upload state)
+   *
+   * Scenario:
+   * - Leader has empty local log (all data uploaded and deleted locally)
+   * - logStartOffset = 10 (segments 0-9 were previously deleted)
+   * - All remaining data (10-199) has been uploaded to remote storage
+   * - earliestPendingUploadOffset = 200 indicates leader knows about the 
completed upload
+   * - Follower uses lastTieredOffset logic to start from offset 200
+   * - Since all data is uploaded, no local fetching is needed
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. First doWork() call adjusts to upload endpoint:
+   *    - localLogStartOffset = 200
+   *    - logEndOffset = 200
+   *    - fetchOffset = 200
+   * 3. Second doWork() call updates logStartOffset:
+   *    - logStartOffset = 10 (leader's logStartOffset)
+   *    - localLogStartOffset = 200
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   * 4. No batches are fetched (all data already uploaded):
+   *    - log size = 0
+   */
+  @Test
+  def testLastTieredOffsetWithInactiveLeaderFullUploadNonZeroLSO(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    // Empty logs
+    val leaderLog = Seq()
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 200
+    )
+    leaderState.logStartOffset = 10
+    leaderState.localLogStartOffset = 200
+    leaderState.logEndOffset = 200
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(200, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(Some(200), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    fetcher.doWork()
+    assertEquals(0, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(200, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Stale Local Log Offset on New Leader
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when local log offset is stale (before 
logStartOffset)
+   * - Test scenario: New leader with stale local log offset creates gap 
scenario
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+   * - Leader LocalLogStartOffset: **5** (stale - before logStartOffset, 
creating inconsistency)
+   * - earliestPendingUploadOffset: **-1** (new leader doesn't know upload 
state)
+   * - Leader has 3 batches at offsets 5, 10, 199
+   *
+   * Scenario:
+   * - Leader has local log starting at offset 5 (before logStartOffset of 10)
+   * - This creates a stale/inconsistent state where localLogStartOffset < 
logStartOffset
+   * - New leader doesn't have upload information (earliestPendingUploadOffset 
= -1)
+   * - Follower cannot safely determine where to fetch from due to gap and 
unknown upload state
+   * - This is an unusual scenario where local log offset hasn't been updated 
properly
+   *
+   * Expected Outcomes:
+   * 1. Follower does NOT fetch immediately (gap scenario with unknown upload 
state)
+   * 2. Fetch state shows delayed retry:
+   *    - fetchOffset = 0 (unchanged)
+   *    - lag is empty
+   *    - delay is present (will retry after delay)
+   *    - state = FETCHING
+   *    - currentLeaderEpoch = 0
+   *    - lastFetchedEpoch = 0
+   * 3. LogEndOffset remains 0 (no progress until upload state known)
+   */
+  @Test
+  def testLastTieredOffsetStaleLocalLogOffsetNewLeader(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 5, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = -1
+    )
+    leaderState.logStartOffset = 10
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    val fetchStateOpt = fetcher.fetchState(partition)
+    assertTrue(fetchStateOpt.nonEmpty)
+    assertEquals(ReplicaState.FETCHING, fetchStateOpt.get.state)
+    assertEquals(0, fetchStateOpt.get.fetchOffset)
+    assertTrue(fetchStateOpt.get.lag.isEmpty)
+    assertEquals(0, fetchStateOpt.get.currentLeaderEpoch)
+    // Fetch will be retried after some delay
+    assertTrue(fetchStateOpt.get.delay.isPresent)
+    assertEquals(Optional.of(0), fetchStateOpt.get.lastFetchedEpoch)
+
+    // LogEndOffset is unchanged
+    assertEquals(0, replicaState.logEndOffset)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Stale Local Log Offset and Slow Upload
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when local log offset is stale but 
upload info is available
+   * - Test scenario: Stale local log offset with known upload state allows 
fetching
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+   * - Leader LocalLogStartOffset: **5** (stale - before logStartOffset)
+   * - earliestPendingUploadOffset: **10** (slow upload - just started at 
offset 10)
+   * - Leader has 3 batches at offsets 5, 10, 199
+   *
+   * Scenario:
+   * - Leader has stale local log offset (5 < 10)
+   * - Upload has just started with earliestPendingUploadOffset = 10 (slow 
upload)
+   * - Despite the stale local log offset, the presence of an upload offset indicates the leader knows about tiered storage
+   * - Follower uses earliestPendingUploadOffset (10) to determine fetch point
+   * - Follower can safely fetch from offset 10 onwards
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. First doWork() call adjusts to upload point:
+   *    - localLogStartOffset = 10
+   *    - logEndOffset = 10
+   *    - fetchOffset = 10
+   * 3. After 2 additional doWork() calls, 2 batches are fetched:
+   *    - log size = 2 (batches at 10 and 199)
+   *    - logStartOffset = 10
+   *    - localLogStartOffset = 10
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   */
+  @Test
+  def testLastTieredOffsetStaleLocalLogOffsetSlowUpload(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 5, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 10
+    )
+    leaderState.logStartOffset = 10
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(10, replicaState.localLogStartOffset)
+    assertEquals(10, replicaState.logEndOffset)
+    assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    for (_ <- 1 to 2) fetcher.doWork()
+    assertEquals(2, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(10, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Stale Local Log Offset and Fast Upload
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when local log offset is stale with 
significant upload progress
+   * - Test scenario: Stale local log offset with fast upload allows fetching 
from upload point
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+   * - Leader LocalLogStartOffset: **5** (stale - before logStartOffset)
+   * - earliestPendingUploadOffset: **150** (fast upload - progressed to 
offset 150)
+   * - Leader has 4 batches at offsets 5, 10, 150, 199
+   *
+   * Scenario:
+   * - Leader has stale local log offset (5 < 10)
+   * - Upload has progressed to offset 150 (fast upload - batches at 5 and 10 
already uploaded)
+   * - Despite the stale local log offset, the presence of an upload offset indicates the leader knows about tiered storage
+   * - Follower uses earliestPendingUploadOffset (150) to determine fetch point
+   * - Only remaining batches (150, 199) need to be fetched locally
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. First doWork() call adjusts to upload point:
+   *    - localLogStartOffset = 150
+   *    - logEndOffset = 150
+   *    - fetchOffset = 150
+   * 3. After 2 additional doWork() calls, only remaining batches are fetched:
+   *    - log size = 2 (only batches at 150 and 199)
+   *    - logStartOffset = 10
+   *    - localLogStartOffset = 150
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   */
+  @Test
+  def testLastTieredOffsetStaleLocalLogOffsetFastUpload(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 5, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("e".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("f".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 150
+    )
+    leaderState.logStartOffset = 10
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(150, replicaState.localLogStartOffset)
+    assertEquals(150, replicaState.logEndOffset)
+    assertEquals(Some(150), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    for (_ <- 1 to 2) fetcher.doWork()
+    assertEquals(2, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(150, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Stale Local Log Offset and Full Upload
+   *
+   * Purpose:
+   * - Validate follower fetch behavior when local log offset is stale with 
full upload completed
+   * - Test scenario: Stale local log offset with all data uploaded to remote 
storage
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+   * - Leader LocalLogStartOffset: **5** (stale - before logStartOffset)
+   * - earliestPendingUploadOffset: **200** (full upload completed - all data 
uploaded to remote)
+   * - Leader has 3 batches at offsets 5, 10, 199
+   *
+   * Scenario:
+   * - Leader has stale local log offset (5 < 10)
+   * - All data has been uploaded to remote storage 
(earliestPendingUploadOffset = 200)
+   * - Despite the stale local log offset, the presence of an upload offset indicates the leader knows about tiered storage
+   * - Follower uses earliestPendingUploadOffset (200) to determine fetch point
+   * - Since all data is uploaded (logEndOffset = earliestPendingUploadOffset 
= 200), no local fetching needed
+   *
+   * Expected Outcomes:
+   * 1. Follower fetch state transitions to FETCHING
+   * 2. First doWork() call adjusts to upload endpoint (200, not 
logStartOffset):
+   *    - localLogStartOffset = 200
+   *    - logEndOffset = 200
+   *    - fetchOffset = 200
+   * 3. Second doWork() call updates logStartOffset:
+   *    - logStartOffset = 10 (leader's logStartOffset)
+   *    - localLogStartOffset = 200
+   *    - logEndOffset = 200
+   *    - highWatermark = 199
+   * 4. No batches are fetched (all data already uploaded):
+   *    - log size = 0
+   */
+  @Test
+  def testLastTieredOffsetStaleLocalLogOffsetFullUpload(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0);
+    val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndPoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 5, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 200
+    )
+    leaderState.logStartOffset = 10
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(200, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(Some(200), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+    fetcher.doWork()
+    assertEquals(0, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(200, replicaState.localLogStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Last Tiered Offset Fetch with Retriable Remote Storage Exception
+   *
+   * Purpose:
+   * - Validate follower error handling when tier state machine throws 
retriable remote storage exception
+   * - Test scenario: Retriable exception during tier state machine start 
should trigger retry, not failure
+   *
+   * Conditions:
+   * - TieredStorage: **Enabled**
+   * - fetchFromLastTieredOffset: **true**
+   * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+   * - Leader LocalLogStartOffset: **10** (same as logStartOffset - no 
deletions)
+   * - earliestPendingUploadOffset: **150** (fast upload - progressed to 
offset 150)
+   * - Leader has 3 batches at offsets 10, 150, 199
+   * - Mock tier state machine throws RetriableRemoteStorageException during 
start()
+   *
+   * Scenario:
+   * - Follower attempts to fetch from last tiered offset
+   * - Tier state machine start() throws RetriableRemoteStorageException 
(simulating temporary remote storage issue)
+   * - Exception is retriable, so follower should enter delayed retry mode, 
NOT mark partition as failed
+   * - Follower will retry the operation after a delay
+   * - This tests proper error handling for transient remote storage failures
+   *
+   * Expected Outcomes:
+   * 1. Partition is NOT marked as failed:
+   *    - assertFalse(failedPartitions.contains(partition))
+   * 2. Follower enters delayed retry mode:
+   *    - fetchOffset = 0 (unchanged - no progress made)
+   *    - lag is empty (not calculated due to error)
+   *    - delay is present (will retry after delay)
+   *    - state = FETCHING (stays in FETCHING state)
+   *    - currentLeaderEpoch = 0
+   *    - lastFetchedEpoch = 0
+   * 3. LogEndOffset remains unchanged:
+   *    - logEndOffset = 0 (no progress until retry succeeds)
+   */
+  @Test
+  def testLastTieredOffsetRetryableRemoteStorageException(): Unit = {
+    val rlmEnabled = true
+    val partition = new TopicPartition("topic", 0)
+    val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) {
+      override def start(topicPartition: TopicPartition,
+                         topicId: Optional[Uuid],
+                         currentLeaderEpoch: Int,
+                         fetchStartOffsetAndEpoch: OffsetAndEpoch,
+                         leaderLogStartOffset: Long): PartitionFetchState = {
+        throw new RetriableRemoteStorageException("Retryable exception")
+      }
+    }
+    val fetcher = new MockFetcherThread(mockLeaderEndpoint, 
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+    val leaderLog = Seq(
+      // LogStartOffset = LocalLogStartOffset = 10
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new SimpleRecord("e".getBytes))
+    )
+
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled,
+      earliestPendingUploadOffset = 150
+    )
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    // Should not be marked as failed
+    assertFalse(failedPartitions.contains(partition))
+
+    val fetchStateOpt = fetcher.fetchState(partition)
+    assertTrue(fetchStateOpt.nonEmpty)
+    assertEquals(ReplicaState.FETCHING, fetchStateOpt.get.state)
+    // Fetch offset remains unchanged
+    assertEquals(0, fetchStateOpt.get.fetchOffset)
+    // Lag remains unchanged
+    assertTrue(fetchStateOpt.get.lag.isEmpty)
+    assertEquals(0, fetchStateOpt.get.currentLeaderEpoch)
+    // Fetch will be retried after some delay
+    assertTrue(fetchStateOpt.get.delay.isPresent)
+    assertEquals(Optional.of(0), fetchStateOpt.get.lastFetchedEpoch)
+
+    // LogEndOffset is unchanged
+    assertEquals(0, replicaState.logEndOffset)
+  }
 }
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
index 18690de30e4..a222928006c 100644
--- a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
+++ b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
@@ -140,7 +140,9 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new BrokerEndPoint(1, "l
     checkLeaderEpochAndThrow(leaderEpoch, leaderState)
     leaderState.earliestPendingUploadOffset match {
       case -1L => new OffsetAndEpoch(-1L, -1)
-      case _ => new OffsetAndEpoch(math.max(leaderState.earliestPendingUploadOffset, leaderState.logStartOffset), leaderState.leaderEpoch)
+      case _ => new OffsetAndEpoch(
+        math.max(leaderState.earliestPendingUploadOffset, math.max(leaderState.localLogStartOffset, leaderState.logStartOffset)),
+        leaderState.leaderEpoch)
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala b/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala
index ca37d9a3f19..46146c18919 100644
--- a/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala
+++ b/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala
@@ -17,11 +17,11 @@
 
 package kafka.server
 
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.server.LeaderEndPoint
 import org.apache.kafka.server.PartitionFetchState
 import org.apache.kafka.server.ReplicaState
+import org.apache.kafka.server.common.OffsetAndEpoch
 
 import java.util.Optional
 
@@ -30,15 +30,17 @@ class MockTierStateMachine(leader: LeaderEndPoint) extends TierStateMachine(lead
   var fetcher: MockFetcherThread = _
 
   override def start(topicPartition: TopicPartition,
-                     currentFetchState: PartitionFetchState,
-                     fetchPartitionData: FetchResponseData.PartitionData): PartitionFetchState = {
-    val leaderEndOffset = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch).offset
-    val offsetToFetch = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch).offset
+                     topicId: Optional[Uuid],
+                     currentLeaderEpoch: Int,
+                     fetchStartOffsetAndEpoch: OffsetAndEpoch,
+                     leaderLogStartOffset: Long): PartitionFetchState = {
+    val leaderEndOffset = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch).offset
+    val offsetToFetch = fetchStartOffsetAndEpoch.offset
     val initialLag = leaderEndOffset - offsetToFetch
     fetcher.truncateFullyAndStartAt(topicPartition, offsetToFetch)
-    new PartitionFetchState(currentFetchState.topicId, offsetToFetch, Optional.of(initialLag),
-      currentFetchState.currentLeaderEpoch, Optional.empty(), ReplicaState.FETCHING,
-      Optional.of(currentFetchState.currentLeaderEpoch)
+    new PartitionFetchState(topicId, offsetToFetch, Optional.of(initialLag),
+      currentLeaderEpoch, Optional.empty(), ReplicaState.FETCHING,
+      Optional.of(currentLeaderEpoch)
     )
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala b/core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala
index b20113d8b78..7f9263e8b27 100644
--- a/core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala
@@ -18,16 +18,17 @@
 package kafka.server
 
 import org.apache.kafka.common.errors.FencedLeaderEpochException
-import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.record.internal._
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.server.{PartitionFetchState, ReplicaState}
 import org.junit.jupiter.api.Assertions._
 import kafka.server.FetcherThreadTestUtils.{initialFetchState, mkBatch}
+import org.apache.kafka.server.common.OffsetAndEpoch
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.Optional
 import scala.collection.Map
 
 class TierStateMachineTest {
@@ -164,10 +165,12 @@ class TierStateMachineTest {
     val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
     val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) {
       override def start(topicPartition: TopicPartition,
-                         currentFetchState: PartitionFetchState,
-                         fetchPartitionData: FetchResponseData.PartitionData): PartitionFetchState = {
+                         topicId: Optional[Uuid],
+                         currentLeaderEpoch: Int,
+                         fetchStartOffsetAndEpoch: OffsetAndEpoch,
+                         leaderLogStartOffset: Long): PartitionFetchState = {
         isErrorHandled = true
-        throw new FencedLeaderEpochException(s"Epoch ${currentFetchState.currentLeaderEpoch} is fenced")
+        throw new FencedLeaderEpochException(s"Epoch ${currentLeaderEpoch} is fenced")
       }
     }
     val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine, failedPartitions = failedPartitions)

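For reference, a minimal sketch (not part of this commit) of how the revised TierStateMachine.start signature shown in the MockTierStateMachine hunk above might be invoked. The object, method, partition, and offset values below are hypothetical and exist only for illustration; only the parameter types come from this diff.

// Illustrative sketch only, not part of this commit. Assumes the Kafka core
// classes from this patch are on the classpath.
import java.util.Optional

import kafka.server.TierStateMachine
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.server.PartitionFetchState
import org.apache.kafka.server.common.OffsetAndEpoch

object TierStateMachineStartSketch {
  // Builds remote-log aux state up to the supplied offset and returns the
  // next PartitionFetchState, using the new five-argument start signature.
  def buildAuxStateUntilTieredOffset(tsm: TierStateMachine): PartitionFetchState = {
    val partition = new TopicPartition("topic", 0)          // hypothetical partition
    val topicId = Optional.of(Uuid.randomUuid())            // hypothetical topic id
    val currentLeaderEpoch = 0
    // Offset up to which aux state should be built, e.g. the last tiered
    // offset obtained from the leader (hypothetical value).
    val fetchStartOffsetAndEpoch = new OffsetAndEpoch(150L, currentLeaderEpoch)
    val leaderLogStartOffset = 10L                           // hypothetical leader log start
    tsm.start(partition, topicId, currentLeaderEpoch, fetchStartOffsetAndEpoch, leaderLogStartOffset)
  }
}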