This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4d0c588539e KAFKA-17302: ReplicaFetcher changes for fetching from
tiered offset (#20428)
4d0c588539e is described below
commit 4d0c588539eb3e84ba6c051837c3f03069fae3f1
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Tue Feb 17 20:14:23 2026 +0530
KAFKA-17302: ReplicaFetcher changes for fetching from tiered offset (#20428)
In this PR, we are adding the ReplicaFetcher changes to support fetching
from the last tiered offset.
- We are modifying the TierStateMachine's start method signature to take
any offset on the leader's local log from which to start replicating the
leader's log. Previously, the start method used to build the remote log
auxiliary state up to the leader's local start offset. Going forward, it
will build the auxiliary state up to the supplied offset.
- When the ReplicaFetcher runs into an OffsetOutOfRange (OOR) or
OffsetMovedToTieredStorage (OMTS) error, and it should fetch from the last
tiered offset, it will determine the offset up to which the remote log aux
state should be built (the last tiered offset) and use it to start the
TierStateMachine.
Test scenarios to cover:
https://docs.google.com/spreadsheets/d/1SPp4_otQP_4VH_F6Ta5rVhLPQhUnRxXnZu3hDWf07B4/edit?gid=1256210467#gid=1256210467
Reviewers: Kamal Chandraprakash <[email protected]>
---
.../main/java/kafka/server/TierStateMachine.java | 32 +-
.../scala/kafka/server/AbstractFetcherThread.scala | 41 +-
.../scala/kafka/server/LocalLeaderEndPoint.scala | 4 +-
.../kafka/server/LocalLeaderEndPointTest.scala | 105 +
.../kafka/server/AbstractFetcherManagerTest.scala | 3 +-
.../kafka/server/AbstractFetcherThreadTest.scala | 2110 ++++++++++++++++++++
.../unit/kafka/server/MockLeaderEndPoint.scala | 4 +-
.../unit/kafka/server/MockTierStateMachine.scala | 20 +-
.../unit/kafka/server/TierStateMachineTest.scala | 11 +-
9 files changed, 2293 insertions(+), 37 deletions(-)
diff --git a/core/src/main/java/kafka/server/TierStateMachine.java
b/core/src/main/java/kafka/server/TierStateMachine.java
index 9d8dcafd203..05b7fc13712 100644
--- a/core/src/main/java/kafka/server/TierStateMachine.java
+++ b/core/src/main/java/kafka/server/TierStateMachine.java
@@ -21,7 +21,7 @@ import kafka.cluster.Partition;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.message.FetchResponseData.PartitionData;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.protocol.Errors;
@@ -85,21 +85,19 @@ public class TierStateMachine {
/**
* Start the tier state machine for the provided topic partition.
*
- * @param topicPartition the topic partition
- * @param currentFetchState the current PartitionFetchState which will
- * be used to derive the return value
- * @param fetchPartitionData the data from the fetch response that
returned the offset moved to tiered storage error
- *
- * @return the new PartitionFetchState after the successful start of the
- * tier state machine
+ * @param topicPartition the topic partition for which the tier
state machine is to be started
+ * @param topicId the optional unique identifier of the
topic
+ * @param currentLeaderEpoch the current leader epoch of the
partition
+ * @param fetchStartOffsetAndEpoch the offset on the leader's local log
from which to start replicating logs
+ * @param leaderLogStartOffset the starting offset in the leader's log
+ * @return the new PartitionFetchState after the successful start of the
tier state machine
+ * @throws Exception if an error occurs during the process, such as issues
with remote storage
*/
PartitionFetchState start(TopicPartition topicPartition,
- PartitionFetchState currentFetchState,
- PartitionData fetchPartitionData) throws
Exception {
- OffsetAndEpoch epochAndLeaderLocalStartOffset =
leader.fetchEarliestLocalOffset(topicPartition,
currentFetchState.currentLeaderEpoch());
- int epoch = epochAndLeaderLocalStartOffset.epoch();
- long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
-
+ Optional<Uuid> topicId,
+ int currentLeaderEpoch,
+ OffsetAndEpoch fetchStartOffsetAndEpoch,
+ long leaderLogStartOffset) throws Exception {
long offsetToFetch;
replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
@@ -112,19 +110,19 @@ public class TierStateMachine {
}
try {
- offsetToFetch = buildRemoteLogAuxState(topicPartition,
currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch,
fetchPartitionData.logStartOffset(), unifiedLog);
+ offsetToFetch = buildRemoteLogAuxState(topicPartition,
currentLeaderEpoch, fetchStartOffsetAndEpoch.offset(),
fetchStartOffsetAndEpoch.epoch(), leaderLogStartOffset, unifiedLog);
} catch (RemoteStorageException e) {
replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
throw e;
}
- OffsetAndEpoch fetchLatestOffsetResult =
leader.fetchLatestOffset(topicPartition,
currentFetchState.currentLeaderEpoch());
+ OffsetAndEpoch fetchLatestOffsetResult =
leader.fetchLatestOffset(topicPartition, currentLeaderEpoch);
long leaderEndOffset = fetchLatestOffsetResult.offset();
long initialLag = leaderEndOffset - offsetToFetch;
- return new PartitionFetchState(currentFetchState.topicId(),
offsetToFetch, Optional.of(initialLag), currentFetchState.currentLeaderEpoch(),
+ return new PartitionFetchState(topicId, offsetToFetch,
Optional.of(initialLag), currentLeaderEpoch,
ReplicaState.FETCHING, unifiedLog.latestEpoch());
}
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 97ae76c4f5c..107e12a695f 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -671,7 +671,13 @@ abstract class AbstractFetcherThread(name: String,
*/
val offsetAndEpoch = leader.fetchLatestOffset(topicPartition,
currentLeaderEpoch)
val leaderEndOffset = offsetAndEpoch.offset
- if (leaderEndOffset < replicaEndOffset) {
+ val fetchFromLastTieredOffset =
shouldFetchFromLastTieredOffset(topicPartition, leaderEndOffset,
replicaEndOffset)
+
+ if (fetchFromLastTieredOffset) {
+ val leaderStartOffsetAndEpoch =
leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
+ val earliestPendingUploadOffsetAndEpoch =
fetchEarliestPendingUploadOffset(topicPartition, currentLeaderEpoch,
leaderStartOffsetAndEpoch)
+ fetchTierStateMachine.start(topicPartition, topicId.toJava,
currentLeaderEpoch, earliestPendingUploadOffsetAndEpoch,
leaderStartOffsetAndEpoch.offset())
+ } else if (leaderEndOffset < replicaEndOffset) {
warn(s"Reset fetch offset for partition $topicPartition from
$replicaEndOffset to current " +
s"leader's latest offset $leaderEndOffset")
truncate(topicPartition, OffsetTruncationState(leaderEndOffset,
truncationCompleted = true))
@@ -781,7 +787,15 @@ abstract class AbstractFetcherThread(name: String,
leaderEpochInRequest:
Optional[Integer],
fetchPartitionData:
PartitionData): Boolean = {
try {
- val newFetchState = fetchTierStateMachine.start(topicPartition,
fetchState, fetchPartitionData)
+ val isLastTieredOffsetFetchEnabled =
shouldFetchFromLastTieredOffset(topicPartition, fetchState)
+ val leaderLogStartOffsetAndEpoch =
leader.fetchEarliestOffset(topicPartition, fetchState.currentLeaderEpoch())
+ val fetchOffsetAndEpoch = if (isLastTieredOffsetFetchEnabled) {
+ fetchEarliestPendingUploadOffset(topicPartition,
fetchState.currentLeaderEpoch(), leaderLogStartOffsetAndEpoch)
+ } else {
+ leader.fetchEarliestLocalOffset(topicPartition,
fetchState.currentLeaderEpoch())
+ }
+ val newFetchState = fetchTierStateMachine.start(topicPartition,
fetchState.topicId(), fetchState.currentLeaderEpoch(),
+ fetchOffsetAndEpoch, leaderLogStartOffsetAndEpoch.offset())
// TODO: use fetchTierStateMachine.maybeAdvanceState when implementing
async tiering logic in KAFKA-13560
@@ -805,6 +819,29 @@ abstract class AbstractFetcherThread(name: String,
}
}
+ /**
+ * Determines the earliest offset for pending uploads, taking into account
+ * both local and remote storage conditions.
+ */
+ private def fetchEarliestPendingUploadOffset(topicPartition: TopicPartition,
currentLeaderEpoch: Int, leaderLogStartOffsetAndEpoch: OffsetAndEpoch):
OffsetAndEpoch = {
+ val earliestPendingUploadOffset =
leader.fetchEarliestPendingUploadOffset(topicPartition, currentLeaderEpoch)
+ if (earliestPendingUploadOffset.offset == -1L) {
+ val leaderLocalStartOffset =
leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch)
+ if (leaderLocalStartOffset.offset ==
leaderLogStartOffsetAndEpoch.offset) {
+ return leaderLocalStartOffset
+ }
+ throw new OffsetNotAvailableException("Segments are uploaded to remote
storage, but the leader does not know the earliest pending upload offset.")
+ }
+ earliestPendingUploadOffset
+ }
+
+ private def shouldFetchFromLastTieredOffset(topicPartition: TopicPartition,
fetchState: PartitionFetchState): Boolean = {
+ val leaderEndOffset = leader.fetchLatestOffset(topicPartition,
fetchState.currentLeaderEpoch())
+ val replicaEndOffset = logEndOffset(topicPartition)
+
+ shouldFetchFromLastTieredOffset(topicPartition, leaderEndOffset.offset(),
replicaEndOffset)
+ }
+
private def delayPartitions(partitions: Iterable[TopicPartition], delay:
Long): Unit = {
partitionMapLock.lockInterruptibly()
try {
diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
index 852cd85420d..30e5231f8c8 100644
--- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -144,10 +144,10 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
} else {
val highestRemoteOffset = log.highestOffsetInRemoteStorage()
val logStartOffset = fetchEarliestOffset(topicPartition,
currentLeaderEpoch)
+ val localLogStartOffset = fetchEarliestLocalOffset(topicPartition,
currentLeaderEpoch)
highestRemoteOffset match {
case -1L =>
- val localLogStartOffset = fetchEarliestLocalOffset(topicPartition,
currentLeaderEpoch)
if (localLogStartOffset.offset() == logStartOffset.offset()) {
// No segments have been uploaded yet
logStartOffset
@@ -156,7 +156,7 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
new OffsetAndEpoch(-1L, -1)
}
case _ =>
- val earliestPendingUploadOffset = Math.max(highestRemoteOffset + 1,
logStartOffset.offset())
+ val earliestPendingUploadOffset = Math.max(highestRemoteOffset + 1,
Math.max(logStartOffset.offset(), localLogStartOffset.offset()))
val epoch =
log.leaderEpochCache.epochForOffset(earliestPendingUploadOffset)
new OffsetAndEpoch(earliestPendingUploadOffset, epoch.orElse(0))
}
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 797d8c933f3..70e04c94b20 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -284,6 +284,111 @@ class LocalLeaderEndPointTest extends Logging {
assertEquals(new OffsetAndEpoch(expectedOffset, epoch), result)
}
+ @Test
+ def
testEarliestPendingUploadOffsetWhenLocalStartGreaterThanStartWithKnownRemoteOffset():
Unit = {
+ // Append records to create initial log state
+ appendRecords(replicaManager, topicIdPartition, records)
+ .onFire(response => assertEquals(Errors.NONE, response.error))
+
+ val log =
replicaManager.getPartitionOrException(topicPartition).localLogOrException
+
+ // Simulate remote upload up to offset 5
+ log.updateHighestOffsetInRemoteStorage(5)
+
+ // Simulate local log start advancing to offset 8
+ log.updateLocalLogStartOffset(8)
+
+ // Expected: max(highestRemoteOffset + 1, max(logStartOffset,
localLogStartOffset))
+ // = max(5 + 1, max(0, 8)) = max(6, 8) = 8
+ val expectedOffset = 8L
+ val epoch = log.leaderEpochCache().epochForOffset(expectedOffset).orElse(0)
+
+ val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
+ assertEquals(new OffsetAndEpoch(expectedOffset, epoch), result)
+ }
+
+ @Test
+ def
testEarliestPendingUploadOffsetWhenLogStartGreaterThanLocalStartWithLowRemoteOffset():
Unit = {
+ // Append 12 records (offsets 0-11)
+ for (_ <- 1 to 4) {
+ appendRecords(replicaManager, topicIdPartition, records)
+ .onFire(response => assertEquals(Errors.NONE, response.error))
+ }
+
+ // Delete records to advance logStartOffset to 10
+ replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 10), _
=> ())
+
+ val log =
replicaManager.getPartitionOrException(topicPartition).localLogOrException
+
+ // Set localLogStartOffset to 3 (less than logStartOffset)
+ log.updateLocalLogStartOffset(3)
+
+ // Set highestRemoteOffset to 5 (less than logStartOffset)
+ log.updateHighestOffsetInRemoteStorage(5)
+
+ // Expected: max(5+1, max(10, 3)) = max(6, 10) = 10
+ val expectedOffset = 10L
+ val epoch = log.leaderEpochCache().epochForOffset(expectedOffset).orElse(0)
+
+ val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
+ assertEquals(new OffsetAndEpoch(expectedOffset, epoch), result)
+ }
+
+ @Test
+ def testEarliestPendingUploadOffsetWhenRemoteOffsetDominatesLogStart(): Unit
= {
+ // Append 18 records (offsets 0-17)
+ for (_ <- 1 to 6) {
+ appendRecords(replicaManager, topicIdPartition, records)
+ .onFire(response => assertEquals(Errors.NONE, response.error))
+ }
+
+ // Delete records to advance logStartOffset to 10
+ replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 10), _
=> ())
+
+ val log =
replicaManager.getPartitionOrException(topicPartition).localLogOrException
+
+ // Set localLogStartOffset to 3 (less than logStartOffset)
+ log.updateLocalLogStartOffset(3)
+
+ // Set highestRemoteOffset to 15 (greater than logStartOffset)
+ log.updateHighestOffsetInRemoteStorage(15)
+
+ // Expected: max(15+1, max(10, 3)) = max(16, 10) = 16
+ val expectedOffset = 16L
+ val epoch = log.leaderEpochCache().epochForOffset(expectedOffset).orElse(0)
+
+ val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
+ assertEquals(new OffsetAndEpoch(expectedOffset, epoch), result)
+ }
+
+ @Test
+ def
testEarliestPendingUploadOffsetWhenBothStartOffsetsEqualAndDominateRemote():
Unit = {
+ // Append 12 records (offsets 0-11)
+ for (_ <- 1 to 4) {
+ appendRecords(replicaManager, topicIdPartition, records)
+ .onFire(response => assertEquals(Errors.NONE, response.error))
+ }
+
+ // Delete records to advance both logStartOffset and localLogStartOffset
to 10
+ replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 10), _
=> ())
+
+ val log =
replicaManager.getPartitionOrException(topicPartition).localLogOrException
+
+ // Verify both offsets are equal (deleteRecords updates both)
+ assertEquals(10L, log.logStartOffset())
+ assertEquals(10L, log.localLogStartOffset())
+
+ // Set highestRemoteOffset to 5 (less than start offsets)
+ log.updateHighestOffsetInRemoteStorage(5)
+
+ // Expected: max(5+1, max(10, 10)) = max(6, 10) = 10
+ val expectedOffset = 10L
+ val epoch = log.leaderEpochCache().epochForOffset(expectedOffset).orElse(0)
+
+ val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
+ assertEquals(new OffsetAndEpoch(expectedOffset, epoch), result)
+ }
+
private class CallbackResult[T] {
private var value: Option[T] = None
private var fun: Option[T => Unit] = None
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index 97e2d5b7181..d79c39817e2 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -18,7 +18,6 @@ package kafka.server
import com.yammer.metrics.core.Gauge
import kafka.utils.TestUtils
-import org.apache.kafka.common.message.FetchResponseData.PartitionData
import org.apache.kafka.common.message.{FetchResponseData,
OffsetForLeaderEpochRequestData}
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.metrics.Metrics
@@ -323,7 +322,7 @@ class AbstractFetcherManagerTest {
}
private class MockResizeFetcherTierStateMachine extends
TierStateMachine(null, null, false) {
- override def start(topicPartition: TopicPartition, currentFetchState:
PartitionFetchState, fetchPartitionData: PartitionData): PartitionFetchState = {
+ override def start(topicPartition: TopicPartition, topicId:
Optional[Uuid], currentLeaderEpoch: Int, fetchStartOffsetAndEpoch:
OffsetAndEpoch, leaderLogStartOffset: Long): PartitionFetchState = {
throw new UnsupportedOperationException("Materializing tier state is not
supported in this test.")
}
}
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 10d11166109..ea01994d411 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
import kafka.server.FetcherThreadTestUtils.{initialFetchState, mkBatch}
import org.apache.kafka.common.message.{FetchResponseData,
OffsetForLeaderEpochRequestData}
+import
org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException
import org.apache.kafka.server.{PartitionFetchState, ReplicaState}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -1740,4 +1741,2113 @@ class AbstractFetcherThreadTest {
assertEquals(151, replicaState.logEndOffset)
assertEquals(151, replicaState.highWatermark)
}
+
+ /**
+ * Test: Last Tiered Offset Fetch with Empty Leader Log
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when fetching from last tiered offset
with an empty leader log
+ * - Test scenario: Leader has no data and no segments uploaded
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader log: **Empty**
+ * - earliestPendingUploadOffset: **-1** (no upload information available)
+ * - Leader LogStartOffset: **0**
+ * - Leader LocalLogStartOffset: **0**
+ *
+ * Scenario:
+ * - The leader log is completely empty with no records
+ * - No segments have been uploaded to remote storage
+ * - The follower attempts to fetch from the last tiered offset
+ *
+ * Expected Outcomes:
+ * 1. Follower fetch state transitions to FETCHING
+ * 2. All offsets remain at 0:
+ * - logStartOffset = 0
+ * - localLogStartOffset = 0
+ * - logEndOffset = 0
+ * - highWatermark = 0
+ * 3. No data is fetched (log size = 0)
+ */
+ @Test
+ def testLastTieredOffsetWithEmptyLeaderLog(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ // Empty Logs
+ val leaderLog = Seq()
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 0L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = -1
+ )
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(0, replicaState.localLogStartOffset)
+ assertEquals(0, replicaState.logEndOffset)
+ assertEquals(0, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with No Segments Uploaded
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when leader has local data but no
segments uploaded to remote storage
+ * - Test scenario: Leader has 3 record batches locally, but upload to
tiered storage hasn't started yet
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader log: **3 batches** at offsets 0, 150, 199
+ * - Leader LogStartOffset: **0**
+ * - Leader LocalLogStartOffset: **0**
+ * - earliestPendingUploadOffset: **-1** (no segments uploaded yet)
+ *
+ * Scenario:
+ * - The leader has local data (3 record batches)
+ * - No segments have been uploaded to remote storage yet
(earliestPendingUploadOffset = -1)
+ * - The follower starts from offset 0 and fetches the local data
+ * - Since there's no gap between logStartOffset and where local log starts,
fetching proceeds normally
+ *
+ * Expected Outcomes:
+ * 1. Follower successfully fetches from offset 0
+ * 2. After 3 doWork() calls, all 3 batches are fetched:
+ * - log size = 3
+ * - logStartOffset = 0
+ * - localLogStartOffset = 0
+ * - logEndOffset = 200
+ * - highWatermark = 199
+ */
+ @Test
+ def testLastTieredOffsetWithNoSegmentsUploaded(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ val leaderLog = Seq(
+ // LogStartOffset = LocalLogStartOffset = 0
+ mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = -1
+ )
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(1, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(0, replicaState.localLogStartOffset)
+ assertEquals(1, replicaState.logEndOffset)
+
+ // Only 1 record batch is returned after a poll, so calling 'n' number of
times to get the desired result
+ for (_ <- 1 to 2) fetcher.doWork()
+ assertEquals(3, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(0, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with Partial Upload on New Leader (Gap
Scenario)
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when new leader doesn't know upload
state and there's a gap
+ * - Test scenario: Gap between logStartOffset (0) and where local log
actually starts (10)
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader LogStartOffset: **0**
+ * - Leader LocalLogStartOffset: **10** (local log starts at offset 10,
creating a gap)
+ * - earliestPendingUploadOffset: **-1** (new leader doesn't know about
previous uploads)
+ * - Leader has 3 batches at offsets 10, 150, 199
+ *
+ * Scenario:
+ * - A new leader takes over that has local log starting at offset 10
+ * - The logStartOffset is 0, but local log doesn't have data before offset
10
+ * - This creates a gap: logStartOffset (0) != where local log starts (10)
+ * - New leader doesn't have upload information (earliestPendingUploadOffset
= -1)
+ * - Follower cannot safely determine where to fetch from
+ *
+ * Expected Outcomes:
+ * 1. Follower does NOT fetch immediately (gap scenario with unknown upload
state)
+ * 2. Fetch state shows delayed retry:
+ * - fetchOffset = 0 (unchanged)
+ * - lag is empty
+ * - delay is present (will retry after delay)
+ * 3. LogEndOffset remains 0
+ */
+ @Test
+ def testLastTieredOffsetWithPartialUploadOnNewLeader(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ val leaderLog = Seq(
+ // LocalLogStartOffset = 10
+ mkBatch(baseOffset = 10, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = -1
+ )
+ leaderState.logStartOffset = 0
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ val fetchStateOpt = fetcher.fetchState(partition)
+ assertTrue(fetchStateOpt.nonEmpty)
+ assertEquals(ReplicaState.FETCHING, fetchStateOpt.get.state)
+ // Fetch offset remains unchanged
+ assertEquals(0, fetchStateOpt.get.fetchOffset)
+ // Lag remains unchanged
+ assertTrue(fetchStateOpt.get.lag.isEmpty)
+ assertEquals(0, fetchStateOpt.get.currentLeaderEpoch)
+ // Fetch will be retried after some delay
+ assertTrue(fetchStateOpt.get.delay.isPresent)
+ assertEquals(Optional.of(0), fetchStateOpt.get.lastFetchedEpoch)
+
+ // LogEndOffset is unchanged
+ assertEquals(0, replicaState.logEndOffset)
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with Slow Partial Upload
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when upload to tiered storage is in
progress but slow
+ * - Test scenario: Upload has just started, with
earliestPendingUploadOffset matching local log start
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader LogStartOffset: **0**
+ * - Leader LocalLogStartOffset: **10** (local log starts at offset 10)
+ * - earliestPendingUploadOffset: **10** (upload just started at offset 10)
+ * - Leader has 3 batches at offsets 10, 150, 199
+ *
+ * Scenario:
+ * - Leader has uploaded segments before offset 10 to remote storage
+ * - Local log starts at offset 10 (earlier segments are in remote storage)
+ * - Upload is in progress with earliestPendingUploadOffset = 10 (slow
upload - no progress yet)
+ * - Follower uses lastTieredOffset logic to determine fetch start point
+ * - Since earliestPendingUploadOffset is provided, follower can safely
fetch from offset 10
+ *
+ * Expected Outcomes:
+ * 1. Follower starts fetching from offset 10 (lastTieredOffset logic)
+ * 2. First doWork() call:
+ * - localLogStartOffset = 10
+ * - logEndOffset = 10
+ * - fetchOffset = 10
+ * 3. After 3 additional doWork() calls, all data from offset 10 onwards is
fetched:
+ * - log size = 3
+ * - logStartOffset = 0
+ * - localLogStartOffset = 10
+ * - logEndOffset = 200
+ * - highWatermark = 199
+ */
+ @Test
+ def testLastTieredOffsetWithSlowPartialUpload(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ val leaderLog = Seq(
+ // LocalLogStartOffset = 10
+ mkBatch(baseOffset = 10, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = 10
+ )
+ leaderState.logStartOffset = 0
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(10, replicaState.logEndOffset)
+ assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 3) fetcher.doWork()
+ assertEquals(3, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with Fast Partial Upload
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when upload to tiered storage has made
significant progress
+ * - Test scenario: Upload has progressed to offset 150, follower fetches
only remaining data
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader LogStartOffset: **0**
+ * - Leader LocalLogStartOffset: **10** (local log starts at offset 10)
+ * - earliestPendingUploadOffset: **150** (upload has progressed to offset
150 - fast upload)
+ * - Leader has 3 batches at offsets 10, 150, 199
+ *
+ * Scenario:
+ * - Leader has uploaded segments up to offset 150 to remote storage
+ * - Local log starts at offset 10, but segments from 10-149 are uploaded
+ * - earliestPendingUploadOffset = 150 indicates upload has made significant
progress
+ * - Follower uses lastTieredOffset logic to start from offset 150
+ * - Only remaining batches (150, 199) need to be fetched locally
+ *
+ * Expected Outcomes:
+ * 1. Follower starts fetching from offset 150 (lastTieredOffset logic)
+ * 2. First doWork() call:
+ * - localLogStartOffset = 150
+ * - logEndOffset = 150
+ * - fetchOffset = 150
+ * 3. After 2 additional doWork() calls, only remaining batches are fetched:
+ * - log size = 2 (only batches at 150 and 199)
+ * - logStartOffset = 0
+ * - localLogStartOffset = 150
+ * - logEndOffset = 200
+ * - highWatermark = 199
+ */
+ @Test
+ def testLastTieredOffsetWithFastPartialUpload(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ val leaderLog = Seq(
+ // LocalLogStartOffset = 10
+ mkBatch(baseOffset = 10, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = 150
+ )
+ leaderState.logStartOffset = 0
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(150, replicaState.localLogStartOffset)
+ assertEquals(150, replicaState.logEndOffset)
+ assertEquals(Some(150), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 2) fetcher.doWork()
+ assertEquals(2, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(150, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with Full Upload Completed
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when all segments have been uploaded
to remote storage
+ * - Test scenario: Leader has completed uploading all data, no local data
remains to be fetched
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader LogStartOffset: **0**
+ * - Leader LocalLogStartOffset: **10** (local log starts at offset 10)
+ * - earliestPendingUploadOffset: **200** (all segments uploaded - matches
logEndOffset)
+ * - Leader has 3 batches at offsets 10, 150, 199
+ *
+ * Scenario:
+ * - Leader has uploaded all segments to remote storage
+ * - earliestPendingUploadOffset = 200 indicates upload has completed up to
logEndOffset
+ * - Local log starts at offset 10, but all data from 10-199 is already
uploaded
+ * - Follower uses lastTieredOffset logic to determine it should start from
offset 200
+ * - Since logEndOffset = earliestPendingUploadOffset = 200, there's no
local data to fetch
+ *
+ * Expected Outcomes:
+ * 1. Follower fetch state transitions to FETCHING
+ * 2. Follower starts at offset 200 (end of uploaded data):
+ * - localLogStartOffset = 200
+ * - logEndOffset = 200
+ * - fetchOffset = 200
+ * 3. No batches are fetched (all data already uploaded):
+ * - log size = 0
+ * - logStartOffset = 0
+ * - highWatermark = 200
+ */
+ @Test
+ def testLastTieredOffsetWithFullUploadCompleted(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ val leaderLog = Seq(
+ // LocalLogStartOffset = 10
+ mkBatch(baseOffset = 10, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = 200
+ )
+ leaderState.logStartOffset = 0
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(200, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(200, replicaState.highWatermark)
+ assertEquals(Some(200), fetcher.fetchState(partition).map(_.fetchOffset()))
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with Inactive Leader and Full Upload on
New Leader
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when new leader has empty log with all
segments uploaded
+ * - Test scenario: New leader doesn't know upload state, creating a gap
scenario
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader log: **Empty** (inactive leader - no local data)
+ * - Leader LogStartOffset: **0**
+ * - Leader LocalLogStartOffset: **200** (all local segments
uploaded/deleted)
+ * - Leader LogEndOffset: **200**
+ * - earliestPendingUploadOffset: **-1** (new leader doesn't know about
previous uploads)
+ *
+ * Scenario:
+ * - New leader takes over with empty local log
+ * - All data (0-199) has been uploaded to remote storage and deleted locally
+ * - logStartOffset = 0, but localLogStartOffset = 200 (creating a gap)
+ * - New leader doesn't have upload information (earliestPendingUploadOffset
= -1)
+ * - Follower cannot safely determine where to fetch from due to gap and
unknown upload state
+ *
+ * Expected Outcomes:
+ * 1. Follower does NOT fetch immediately (gap scenario with unknown upload
state)
+ * 2. Fetch state shows delayed retry:
+ * - fetchOffset = 0 (unchanged)
+ * - lag is empty
+ * - delay is present (will retry after delay)
+ * - state = FETCHING
+ * - currentLeaderEpoch = 0
+ * - lastFetchedEpoch = 0
+ * 3. LogEndOffset remains 0 (no progress until upload state known)
+ */
+ @Test
+ def testLastTieredOffsetWithInactiveLeaderFullUploadOnNewLeader(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ // Empty logs
+ val leaderLog = Seq()
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = -1
+ )
+ leaderState.logStartOffset = 0
+ leaderState.localLogStartOffset = 200
+ leaderState.logEndOffset = 200
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ val fetchStateOpt = fetcher.fetchState(partition)
+ assertTrue(fetchStateOpt.nonEmpty)
+ assertEquals(ReplicaState.FETCHING, fetchStateOpt.get.state)
+ // Fetch offset remains unchanged
+ assertEquals(0, fetchStateOpt.get.fetchOffset)
+ // Lag remains unchanged
+ assertTrue(fetchStateOpt.get.lag.isEmpty)
+ assertEquals(0, fetchStateOpt.get.currentLeaderEpoch)
+ // Fetch will be retried after some delay
+ assertTrue(fetchStateOpt.get.delay.isPresent)
+ assertEquals(Optional.of(0), fetchStateOpt.get.lastFetchedEpoch)
+
+ // LogEndOffset is unchanged
+ assertEquals(0, replicaState.logEndOffset)
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with Inactive Leader and Full Upload
Completed
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when leader has empty log with known
full upload
+ * - Test scenario: Leader has empty log but knows all data is uploaded
(earliestPendingUploadOffset = 200)
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader log: **Empty** (inactive leader - no local data)
+ * - Leader LogStartOffset: **0**
+ * - Leader LocalLogStartOffset: **200** (all local segments
uploaded/deleted)
+ * - Leader LogEndOffset: **200**
+ * - earliestPendingUploadOffset: **200** (full upload completed - leader
knows upload state)
+ *
+ * Scenario:
+ * - Leader has empty local log (all data uploaded and deleted locally)
+ * - All data (0-199) has been uploaded to remote storage
+ * - earliestPendingUploadOffset = 200 indicates leader knows about the
completed upload
+ * - Follower uses lastTieredOffset logic to start from offset 200
+ * - Since all data is uploaded, no local fetching is needed
+ *
+ * Expected Outcomes:
+ * 1. Follower fetch state transitions to FETCHING
+ * 2. Follower adjusts to upload endpoint:
+ * - localLogStartOffset = 200
+ * - logEndOffset = 200
+ * - fetchOffset = 200
+ * - highWatermark = 200
+ * 3. No batches are fetched (all data already uploaded):
+ * - log size = 0
+ * - logStartOffset = 0
+ */
+ @Test
+ def testLastTieredOffsetWithInactiveLeaderFullUpload(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ // Empty logs
+ val leaderLog = Seq()
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = 200
+ )
+ leaderState.logStartOffset = 0
+ leaderState.localLogStartOffset = 200
+ leaderState.logEndOffset = 200
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(200, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(200, replicaState.highWatermark)
+ assertEquals(Some(200), fetcher.fetchState(partition).map(_.fetchOffset()))
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with Empty Leader Log, Non-Zero LSO on New
Leader
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when leader has empty log with
non-zero logStartOffset
+ * - Test scenario: Leader has no data with non-zero start offset, new
leader scenario
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader log: **Empty** (no data)
+ * - Leader LogStartOffset: **10** (non-zero - segments 0-9 previously
deleted)
+ * - Leader LocalLogStartOffset: **10** (same as logStartOffset - no gap)
+ * - Leader LogEndOffset: **10**
+ * - earliestPendingUploadOffset: **-1** (new leader doesn't know upload
state)
+ *
+ * Scenario:
+ * - Leader has empty log with logStartOffset = 10 (segments 0-9 were
previously deleted/uploaded)
+ * - No gap exists (logStartOffset = localLogStartOffset = 10)
+ * - New leader doesn't have upload information (earliestPendingUploadOffset
= -1)
+ * - Since there's no gap, follower can safely fetch from logStartOffset
+ * - Leader log is empty (logEndOffset = logStartOffset = 10), so no data to
fetch
+ *
+ * Expected Outcomes:
+ * 1. Follower fetch state transitions to FETCHING
+ * 2. First doWork() call adjusts to logStartOffset:
+ * - localLogStartOffset = 10
+ * - logEndOffset = 10
+ * - highWatermark = 10
+ * - fetchOffset = 10
+ * 3. Second doWork() call confirms stable state:
+ * - logStartOffset = 10
+ * - localLogStartOffset = 10
+ * - logEndOffset = 10
+ * - highWatermark = 10
+ * 4. No batches are fetched (log is empty):
+ * - log size = 0
+ */
+ @Test
+ def testLastTieredOffsetWithEmptyLeaderLogNonZeroLSOOnNewLeader(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ // Empty logs
+ val leaderLog = Seq()
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 10L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = -1
+ )
+ leaderState.logStartOffset = 10
+ leaderState.localLogStartOffset = 10
+ leaderState.logEndOffset = 10
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(10, replicaState.logEndOffset)
+ assertEquals(10, replicaState.highWatermark)
+ assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ fetcher.doWork()
+ assertEquals(0, replicaState.log.size)
+ assertEquals(10, replicaState.logStartOffset)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(10, replicaState.logEndOffset)
+ assertEquals(10, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with Empty Leader Log and Stale Upload
Offset
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when leader has empty log with stale
upload offset
+ * - Test scenario: Stale upload offset is ignored in favor of logStartOffset
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader log: **Empty** (no data)
+ * - Leader LogStartOffset: **10** (non-zero - segments 0-9 previously
deleted)
+ * - Leader LocalLogStartOffset: **10** (same as logStartOffset - no gap)
+ * - Leader LogEndOffset: **10**
+ * - earliestPendingUploadOffset: **5** (stale - before logStartOffset of 10)
+ *
+ * Scenario:
+ * - Leader has empty log with logStartOffset = 10 (segments 0-9 previously
deleted/uploaded)
+ * - earliestPendingUploadOffset = 5 is stale (before logStartOffset of 10)
+ * - Despite stale upload offset, presence of upload offset indicates leader
knows about tiered storage
+ * - Follower ignores stale value and uses logStartOffset (10) to determine
fetch point
+ * - Since logEndOffset = logStartOffset = 10, log is empty and no data to
fetch
+ *
+ * Expected Outcomes:
+ * 1. Follower fetch state transitions to FETCHING
+ * 2. First doWork() call adjusts to logStartOffset:
+ * - localLogStartOffset = 10
+ * - logEndOffset = 10
+ * - highWatermark = 10
+ * - fetchOffset = 10
+ * 3. Second doWork() call confirms stable state:
+ * - logStartOffset = 10
+ * - localLogStartOffset = 10
+ * - logEndOffset = 10
+ * - highWatermark = 10
+ * 4. No batches are fetched (log is empty):
+ * - log size = 0
+ */
+ @Test
+ def testLastTieredOffsetWithEmptyLeaderLogStaleUploadOffset(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ // Empty logs
+ val leaderLog = Seq()
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 10L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = 5
+ )
+ leaderState.logStartOffset = 10
+ leaderState.localLogStartOffset = 10
+ leaderState.logEndOffset = 10
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(10, replicaState.logEndOffset)
+ assertEquals(10, replicaState.highWatermark)
+ assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ fetcher.doWork()
+ assertEquals(0, replicaState.log.size)
+ assertEquals(10, replicaState.logStartOffset)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(10, replicaState.logEndOffset)
+ assertEquals(10, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with Non-Zero LSO on New Leader
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when leader has non-zero
logStartOffset without upload info
+ * - Test scenario: New leader with data but no upload information
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+ * - Leader LocalLogStartOffset: **10** (same as logStartOffset - no
deletions)
+ * - earliestPendingUploadOffset: **-1** (new leader doesn't know upload
state)
+ * - Leader has 3 batches at offsets 10, 150, 199
+ *
+ * Scenario:
+ * - Leader log starts at offset 10 with local data available
+ * - No gap exists (logStartOffset = localLogStartOffset = 10)
+ * - New leader doesn't have upload information (earliestPendingUploadOffset
= -1)
+ * - Since there's no gap, follower can safely fetch from logStartOffset
+ * - Follower calculates lag correctly as difference between logEndOffset
and fetchOffset
+ *
+ * Expected Outcomes:
+ * 1. Follower fetch state transitions to FETCHING
+ * 2. Follower starts at logStartOffset:
+ * - fetchOffset = 10
+ * - lag is calculated correctly (190 = 200 - 10)
+ * - state = FETCHING
+ * - currentLeaderEpoch = 0
+ * - lastFetchedEpoch = 0
+ * 3. LogEndOffset updates to logStartOffset:
+ * - logEndOffset = 10
+ */
+ @Test
+ def testLastTieredOffsetWithNonZeroLSOOnNewLeader(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ val leaderLog = Seq(
+ // LogStartOffset = LocalLogStartOffset = 10
+ mkBatch(baseOffset = 10, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = -1
+ )
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ val fetchStateOpt = fetcher.fetchState(partition)
+ assertTrue(fetchStateOpt.nonEmpty)
+ assertEquals(ReplicaState.FETCHING, fetchStateOpt.get.state)
+ assertEquals(10, fetchStateOpt.get.fetchOffset)
+ assertFalse(fetchStateOpt.get.lag.isEmpty)
+ assertEquals(190, fetchStateOpt.get.lag().get())
+ assertEquals(0, fetchStateOpt.get.currentLeaderEpoch)
+ assertEquals(Optional.of(0), fetchStateOpt.get.lastFetchedEpoch)
+ assertEquals(10, replicaState.logEndOffset)
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with Non-Zero LSO and Stale Upload Offset
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when leader has stale upload offset
with non-zero LSO
+ * - Test scenario: Stale upload offset is ignored, follower fetches from
logStartOffset
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+ * - Leader LocalLogStartOffset: **10** (same as logStartOffset - no
deletions)
+ * - earliestPendingUploadOffset: **5** (stale - before logStartOffset of 10)
+ * - Leader has 3 batches at offsets 10, 150, 199
+ *
+ * Scenario:
+ * - Leader log starts at offset 10 with local data available
+ * - earliestPendingUploadOffset = 5 is stale (before logStartOffset of 10)
+ * - Despite stale upload offset, presence of upload offset indicates leader
knows about tiered storage
+ * - Follower ignores stale value and uses logStartOffset (10) to determine
fetch point
+ * - Since logStartOffset = localLogStartOffset = 10 (no gap), follower can
fetch from offset 10
+ *
+ * Expected Outcomes:
+ * 1. Follower fetch state transitions to FETCHING
+ * 2. First doWork() call adjusts to logStartOffset:
+ * - localLogStartOffset = 10
+ * - logEndOffset = 10
+ * - highWatermark = 10
+ * - fetchOffset = 10
+ * 3. After 3 additional doWork() calls, all 3 batches are fetched:
+ * - log size = 3
+ * - logStartOffset = 10
+ * - localLogStartOffset = 10
+ * - logEndOffset = 200
+ * - highWatermark = 199
+ */
+ @Test
+ def testLastTieredOffsetWithNonZeroLSOStaleUploadOffset(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ val leaderLog = Seq(
+ // LogStartOffset = LocalLogStartOffset = 10
+ mkBatch(baseOffset = 10, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = 5
+ )
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(10, replicaState.logEndOffset)
+ assertEquals(10, replicaState.highWatermark)
+ assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ for (_ <- 1 to 3) fetcher.doWork()
+ assertEquals(3, replicaState.log.size)
+ assertEquals(10, replicaState.logStartOffset)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with Slow Upload and No Local Deletion
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when upload is in progress (slow) with
no local deletions
+ * - Test scenario: Upload has just started, follower fetches from upload
point
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+ * - Leader LocalLogStartOffset: **10** (same as logStartOffset - no
deletions)
+ * - earliestPendingUploadOffset: **10** (slow upload - just started at
offset 10)
+ * - Leader has 3 batches at offsets 10, 150, 199
+ *
+ * Scenario:
+ * - Leader log starts at offset 10 with local data available
+ * - Upload has just started with earliestPendingUploadOffset = 10 (slow
upload - no progress yet)
+ * - Follower uses lastTieredOffset logic to determine fetch point
+ * - Since earliestPendingUploadOffset is provided, follower can safely
fetch from offset 10
+ * - No segments have been uploaded yet (upload just starting)
+ *
+ * Expected Outcomes:
+ * 1. Follower fetch state transitions to FETCHING
+ * 2. First doWork() call adjusts to upload point:
+ * - localLogStartOffset = 10
+ * - logEndOffset = 10
+ * - fetchOffset = 10
+ * 3. After 3 additional doWork() calls, all 3 batches are fetched:
+ * - log size = 3
+ * - logStartOffset = 10
+ * - localLogStartOffset = 10
+ * - logEndOffset = 200
+ * - highWatermark = 199
+ */
+ @Test
+ def testLastTieredOffsetWithSlowUploadNoLocalDeletion(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ val leaderLog = Seq(
+ // LogStartOffset = LocalLogStartOffset = 10
+ mkBatch(baseOffset = 10, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = 10
+ )
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(10, replicaState.logEndOffset)
+ assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 3) fetcher.doWork()
+ assertEquals(3, replicaState.log.size)
+ assertEquals(10, replicaState.logStartOffset)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with Fast Upload and No Local Deletion
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when upload has made significant
progress (fast)
+ * - Test scenario: Upload has progressed significantly, follower fetches
only remaining data
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+ * - Leader LocalLogStartOffset: **10** (same as logStartOffset - no
deletions)
+ * - earliestPendingUploadOffset: **150** (fast upload - significant
progress to offset 150)
+ * - Leader has 3 batches at offsets 10, 150, 199
+ *
+ * Scenario:
+ * - Leader log starts at offset 10 with local data available
+ * - Upload has progressed to offset 150 (fast upload - batch at offset 10
already uploaded)
+ * - Follower uses lastTieredOffset logic to start from
earliestPendingUploadOffset
+ * - Only remaining batches (150, 199) need to be fetched locally
+ * - Batch at offset 10 is already uploaded to remote storage
+ *
+ * Expected Outcomes:
+ * 1. Follower fetch state transitions to FETCHING
+ * 2. First doWork() call adjusts to upload point:
+ * - localLogStartOffset = 150
+ * - logEndOffset = 150
+ * - fetchOffset = 150
+ * 3. After 2 additional doWork() calls, only remaining batches are fetched:
+ * - log size = 2 (only batches at 150 and 199)
+ * - logStartOffset = 10
+ * - localLogStartOffset = 150
+ * - logEndOffset = 200
+ * - highWatermark = 199
+ */
+ @Test
+ def testLastTieredOffsetWithFastUploadNoLocalDeletion(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ val leaderLog = Seq(
+ // LogStartOffset = LocalLogStartOffset = 10
+ mkBatch(baseOffset = 10, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = 150
+ )
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(150, replicaState.localLogStartOffset)
+ assertEquals(150, replicaState.logEndOffset)
+ assertEquals(Some(150), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 2) fetcher.doWork()
+ assertEquals(2, replicaState.log.size)
+ assertEquals(10, replicaState.logStartOffset)
+ assertEquals(150, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with Full Upload and No Local Deletions
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when all segments are uploaded and no
local deletions occurred
+ * - Test scenario: Leader log starts at non-zero offset with full upload
completed
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+ * - Leader LocalLogStartOffset: **10** (no local deletions - same as
logStartOffset)
+ * - earliestPendingUploadOffset: **200** (full upload completed - all data
uploaded)
+ * - Leader has 3 batches at offsets 10, 150, 199
+ *
+ * Scenario:
+ * - Leader log starts at offset 10 (non-zero)
+ * - No local deletions have occurred (logStartOffset = localLogStartOffset
= 10)
+ * - All segments have been uploaded to remote storage
(earliestPendingUploadOffset = 200)
+ * - Follower uses lastTieredOffset logic to start from offset 200
+ * - Since all data is uploaded, no local fetching is needed
+ *
+ * Expected Outcomes:
+ * 1. Follower fetch state transitions to FETCHING
+ * 2. First doWork() call adjusts offsets to upload endpoint:
+ * - localLogStartOffset = 200
+ * - logEndOffset = 200
+ * - fetchOffset = 200
+ * 3. Second doWork() call updates logStartOffset:
+ * - logStartOffset = 10 (leader's logStartOffset)
+ * - localLogStartOffset = 200
+ * - logEndOffset = 200
+ * - highWatermark = 199
+ * 4. No batches are fetched (all data already uploaded):
+ * - log size = 0
+ */
@Test
def testLastTieredOffsetWithFullUploadNoLocalDeletion(): Unit = {
  val rlmEnabled = true
  val partition = new TopicPartition("topic", 0)
  val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
  val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
  val fetcher = new MockFetcherThread(mockLeaderEndPoint, mockTierStateMachine, fetchFromLastTieredOffset = true)

  val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
  // LogStartOffset = LocalLogStartOffset = 10; everything up to 200 is uploaded.
  val leaderLog = Seq(
    mkBatch(baseOffset = 10, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
    mkBatch(baseOffset = 150, leaderEpoch = 0, new SimpleRecord("d".getBytes)),
    mkBatch(baseOffset = 199, leaderEpoch = 0, new SimpleRecord("e".getBytes))
  )
  val leaderState = PartitionState(
    leaderLog,
    leaderEpoch = 0,
    highWatermark = 199L,
    rlmEnabled = rlmEnabled,
    earliestPendingUploadOffset = 200
  )
  fetcher.mockLeader.setLeaderState(partition, leaderState)
  fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)

  // First doWork(): follower jumps to the last tiered offset (200); nothing to fetch locally.
  fetcher.doWork()
  assertEquals(Option(ReplicaState.FETCHING), fetcher.fetchState(partition).map(_.state))
  assertEquals(0, replicaState.log.size)
  assertEquals(0, replicaState.logStartOffset)
  assertEquals(200, replicaState.localLogStartOffset)
  assertEquals(200, replicaState.logEndOffset)
  assertEquals(Some(200), fetcher.fetchState(partition).map(_.fetchOffset()))

  // Second doWork(): logStartOffset and highWatermark are picked up from the leader.
  fetcher.doWork()
  assertEquals(0, replicaState.log.size)
  assertEquals(10, replicaState.logStartOffset)
  assertEquals(200, replicaState.localLogStartOffset)
  assertEquals(200, replicaState.logEndOffset)
  assertEquals(199, replicaState.highWatermark)
}
+
/**
 * Last tiered offset fetch: partial upload on a new leader with non-zero LSO (gap scenario).
 *
 * Setup: tiered storage enabled, fetchFromLastTieredOffset = true. The leader's
 * logStartOffset is 10 but its local log starts at offset 100 (batches at 100, 150, 199),
 * and earliestPendingUploadOffset = -1 because the new leader does not know about
 * previous uploads.
 *
 * Because there is a gap between logStartOffset (10) and the local log start (100) and the
 * upload state is unknown, the follower cannot safely determine where to fetch from.
 * Expected: no fetch happens; the partition stays in FETCHING with fetchOffset = 0, an
 * empty lag and a retry delay set; the replica's logEndOffset remains 0.
 */
@Test
def testLastTieredOffsetWithPartialUploadOnNewLeaderNonZeroLSO(): Unit = {
  val rlmEnabled = true
  val partition = new TopicPartition("topic", 0)
  val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
  val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
  val fetcher = new MockFetcherThread(mockLeaderEndPoint, mockTierStateMachine, fetchFromLastTieredOffset = true)

  val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
  val leaderLog = Seq(
    mkBatch(baseOffset = 100, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
    mkBatch(baseOffset = 150, leaderEpoch = 0, new SimpleRecord("d".getBytes)),
    mkBatch(baseOffset = 199, leaderEpoch = 0, new SimpleRecord("e".getBytes))
  )
  val leaderState = PartitionState(
    leaderLog,
    leaderEpoch = 0,
    highWatermark = 199L,
    rlmEnabled = rlmEnabled,
    earliestPendingUploadOffset = -1
  )
  leaderState.logStartOffset = 10
  fetcher.mockLeader.setLeaderState(partition, leaderState)
  fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)

  fetcher.doWork()
  val fetchStateOpt = fetcher.fetchState(partition)
  assertTrue(fetchStateOpt.nonEmpty)
  assertEquals(ReplicaState.FETCHING, fetchStateOpt.get.state)
  assertEquals(0, fetchStateOpt.get.fetchOffset)
  assertTrue(fetchStateOpt.get.lag.isEmpty)
  assertEquals(0, fetchStateOpt.get.currentLeaderEpoch)
  // Fetch will be retried after some delay
  assertTrue(fetchStateOpt.get.delay.isPresent)
  assertEquals(Optional.of(0), fetchStateOpt.get.lastFetchedEpoch)

  // LogEndOffset is unchanged
  assertEquals(0, replicaState.logEndOffset)
}
+
/**
 * Last tiered offset fetch: partial upload with a stale earliestPendingUploadOffset.
 *
 * Setup: tiered storage enabled, fetchFromLastTieredOffset = true. The leader's
 * logStartOffset is 10, its local log starts at offset 100 (batches at 100, 150, 199),
 * and earliestPendingUploadOffset = 5 — stale, since it precedes logStartOffset.
 *
 * The presence of an upload offset (even a stale one) means the leader knows about tiered
 * storage, so the follower ignores the stale value and starts from the leader's
 * localLogStartOffset (100). Expected: the first doWork() moves the replica to offset 100;
 * three more doWork() calls fetch all three local batches, ending with logStartOffset = 10,
 * logEndOffset = 200 and highWatermark = 199.
 */
@Test
def testLastTieredOffsetWithPartialUploadAndStaleUploadOffset(): Unit = {
  val rlmEnabled = true
  val partition = new TopicPartition("topic", 0)
  val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
  val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
  val fetcher = new MockFetcherThread(mockLeaderEndPoint, mockTierStateMachine, fetchFromLastTieredOffset = true)

  val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
  val leaderLog = Seq(
    mkBatch(baseOffset = 100, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
    mkBatch(baseOffset = 150, leaderEpoch = 0, new SimpleRecord("d".getBytes)),
    mkBatch(baseOffset = 199, leaderEpoch = 0, new SimpleRecord("e".getBytes))
  )
  val leaderState = PartitionState(
    leaderLog,
    leaderEpoch = 0,
    highWatermark = 199L,
    rlmEnabled = rlmEnabled,
    earliestPendingUploadOffset = 5
  )
  leaderState.logStartOffset = 10
  fetcher.mockLeader.setLeaderState(partition, leaderState)
  fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)

  fetcher.doWork()
  assertEquals(Option(ReplicaState.FETCHING), fetcher.fetchState(partition).map(_.state))
  assertEquals(0, replicaState.log.size)
  assertEquals(0, replicaState.logStartOffset)
  assertEquals(100, replicaState.localLogStartOffset)
  assertEquals(100, replicaState.logEndOffset)
  assertEquals(100, replicaState.highWatermark)
  assertEquals(Some(100), fetcher.fetchState(partition).map(_.fetchOffset()))

  // Only one record batch is returned per poll, so call doWork() once per expected batch.
  for (_ <- 1 to 3) fetcher.doWork()
  assertEquals(3, replicaState.log.size)
  assertEquals(10, replicaState.logStartOffset)
  assertEquals(100, replicaState.localLogStartOffset)
  assertEquals(200, replicaState.logEndOffset)
  assertEquals(199, replicaState.highWatermark)
}
+
/**
 * Last tiered offset fetch: slow upload with non-zero LSO and local deletions.
 *
 * Setup: tiered storage enabled, fetchFromLastTieredOffset = true. The leader's
 * logStartOffset is 10, its local log starts at offset 100 (batches at 100, 150, 199),
 * and earliestPendingUploadOffset = 100 — the upload has only just started.
 *
 * The follower starts from the earliestPendingUploadOffset (100) and must fetch all local
 * data. Expected: the first doWork() moves the replica to offset 100; three more doWork()
 * calls fetch the three batches, ending with logStartOffset = 10, logEndOffset = 200 and
 * highWatermark = 199.
 */
@Test
def testLastTieredOffsetWithSlowUploadNonZeroLSO(): Unit = {
  val rlmEnabled = true
  val partition = new TopicPartition("topic", 0)
  val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
  val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
  val fetcher = new MockFetcherThread(mockLeaderEndPoint, mockTierStateMachine, fetchFromLastTieredOffset = true)

  val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
  val leaderLog = Seq(
    mkBatch(baseOffset = 100, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
    mkBatch(baseOffset = 150, leaderEpoch = 0, new SimpleRecord("d".getBytes)),
    mkBatch(baseOffset = 199, leaderEpoch = 0, new SimpleRecord("e".getBytes))
  )
  val leaderState = PartitionState(
    leaderLog,
    leaderEpoch = 0,
    highWatermark = 199L,
    rlmEnabled = rlmEnabled,
    earliestPendingUploadOffset = 100
  )
  leaderState.logStartOffset = 10
  fetcher.mockLeader.setLeaderState(partition, leaderState)
  fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)

  fetcher.doWork()
  assertEquals(Option(ReplicaState.FETCHING), fetcher.fetchState(partition).map(_.state))
  assertEquals(0, replicaState.log.size)
  assertEquals(0, replicaState.logStartOffset)
  assertEquals(100, replicaState.localLogStartOffset)
  assertEquals(100, replicaState.logEndOffset)
  assertEquals(Some(100), fetcher.fetchState(partition).map(_.fetchOffset()))

  // Only 1 record batch is returned after a poll so calling 'n' number of times to get the desired result.
  for (_ <- 1 to 3) fetcher.doWork()
  assertEquals(3, replicaState.log.size)
  assertEquals(10, replicaState.logStartOffset)
  assertEquals(100, replicaState.localLogStartOffset)
  assertEquals(200, replicaState.logEndOffset)
  assertEquals(199, replicaState.highWatermark)
}
+
/**
 * Last tiered offset fetch: fast upload with non-zero LSO.
 *
 * Setup: tiered storage enabled, fetchFromLastTieredOffset = true. The leader's
 * logStartOffset is 10, its local log starts at offset 100 (batches at 100, 150, 199),
 * and earliestPendingUploadOffset = 150 — the batch at 100 has already been uploaded.
 *
 * The follower starts from the earliestPendingUploadOffset (150), so only the two
 * remaining batches need local fetching. Expected: the first doWork() moves the replica
 * to offset 150; two more doWork() calls fetch the batches at 150 and 199, ending with
 * logStartOffset = 10, logEndOffset = 200 and highWatermark = 199.
 */
@Test
def testLastTieredOffsetWithFastUploadNonZeroLSO(): Unit = {
  val rlmEnabled = true
  val partition = new TopicPartition("topic", 0)
  val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
  val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
  val fetcher = new MockFetcherThread(mockLeaderEndPoint, mockTierStateMachine, fetchFromLastTieredOffset = true)

  val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
  val leaderLog = Seq(
    mkBatch(baseOffset = 100, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
    mkBatch(baseOffset = 150, leaderEpoch = 0, new SimpleRecord("d".getBytes)),
    mkBatch(baseOffset = 199, leaderEpoch = 0, new SimpleRecord("e".getBytes))
  )
  val leaderState = PartitionState(
    leaderLog,
    leaderEpoch = 0,
    highWatermark = 199L,
    rlmEnabled = rlmEnabled,
    earliestPendingUploadOffset = 150
  )
  leaderState.logStartOffset = 10
  fetcher.mockLeader.setLeaderState(partition, leaderState)
  fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)

  fetcher.doWork()
  assertEquals(Option(ReplicaState.FETCHING), fetcher.fetchState(partition).map(_.state))
  assertEquals(0, replicaState.log.size)
  assertEquals(0, replicaState.logStartOffset)
  assertEquals(150, replicaState.localLogStartOffset)
  assertEquals(150, replicaState.logEndOffset)
  assertEquals(Some(150), fetcher.fetchState(partition).map(_.fetchOffset()))

  // Only 1 record batch is returned after a poll so calling 'n' number of times to get the desired result.
  for (_ <- 1 to 2) fetcher.doWork()
  assertEquals(2, replicaState.log.size)
  assertEquals(10, replicaState.logStartOffset)
  assertEquals(150, replicaState.localLogStartOffset)
  assertEquals(200, replicaState.logEndOffset)
  assertEquals(199, replicaState.highWatermark)
}
+
/**
 * Last tiered offset fetch: full upload completed with non-zero local log start offset.
 *
 * Setup: tiered storage enabled, fetchFromLastTieredOffset = true. The leader's
 * logStartOffset is 10, its local log starts at offset 100 (batches at 100, 150, 199),
 * and earliestPendingUploadOffset = 200 — everything has been uploaded to remote storage.
 *
 * Since all data is uploaded, no local fetching is needed. Expected: the first doWork()
 * moves the replica straight to offset 200 (localLogStartOffset = logEndOffset =
 * fetchOffset = 200); the second doWork() picks up the leader's logStartOffset (10) and
 * highWatermark (199) while the replica log stays empty.
 */
@Test
def testLastTieredOffsetWithFullUploadNonZeroLSO(): Unit = {
  val rlmEnabled = true
  val partition = new TopicPartition("topic", 0)
  val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
  val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
  val fetcher = new MockFetcherThread(mockLeaderEndPoint, mockTierStateMachine, fetchFromLastTieredOffset = true)

  val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
  val leaderLog = Seq(
    mkBatch(baseOffset = 100, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
    mkBatch(baseOffset = 150, leaderEpoch = 0, new SimpleRecord("d".getBytes)),
    mkBatch(baseOffset = 199, leaderEpoch = 0, new SimpleRecord("e".getBytes))
  )
  val leaderState = PartitionState(
    leaderLog,
    leaderEpoch = 0,
    highWatermark = 199L,
    rlmEnabled = rlmEnabled,
    earliestPendingUploadOffset = 200
  )
  leaderState.logStartOffset = 10
  fetcher.mockLeader.setLeaderState(partition, leaderState)
  fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)

  fetcher.doWork()
  assertEquals(Option(ReplicaState.FETCHING), fetcher.fetchState(partition).map(_.state))
  assertEquals(0, replicaState.log.size)
  assertEquals(0, replicaState.logStartOffset)
  assertEquals(200, replicaState.localLogStartOffset)
  assertEquals(200, replicaState.logEndOffset)
  assertEquals(Some(200), fetcher.fetchState(partition).map(_.fetchOffset()))

  fetcher.doWork()
  assertEquals(0, replicaState.log.size)
  assertEquals(10, replicaState.logStartOffset)
  assertEquals(200, replicaState.localLogStartOffset)
  assertEquals(200, replicaState.logEndOffset)
  assertEquals(199, replicaState.highWatermark)
}
+
/**
 * Last tiered offset fetch: inactive new leader, full upload, non-zero LSO (gap scenario).
 *
 * Setup: tiered storage enabled, fetchFromLastTieredOffset = true. The new leader has an
 * empty local log with logStartOffset = 10, localLogStartOffset = 200 and
 * logEndOffset = 200, but earliestPendingUploadOffset = -1 — it does not know about the
 * previous uploads.
 *
 * Because of the gap (logStartOffset 10 != localLogStartOffset 200) and the unknown upload
 * state, the follower cannot safely determine where to fetch from. Expected: no fetch
 * happens; the partition stays in FETCHING with fetchOffset = 0, an empty lag and a retry
 * delay set; the replica's logEndOffset remains 0.
 */
@Test
def testLastTieredOffsetWithInactiveLeaderFullUploadNonZeroLSOOnNewLeader(): Unit = {
  val rlmEnabled = true
  val partition = new TopicPartition("topic", 0)
  val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
  val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
  val fetcher = new MockFetcherThread(mockLeaderEndPoint, mockTierStateMachine, fetchFromLastTieredOffset = true)

  val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
  // Empty logs
  val leaderLog = Seq()
  val leaderState = PartitionState(
    leaderLog,
    leaderEpoch = 0,
    highWatermark = 199L,
    rlmEnabled = rlmEnabled,
    earliestPendingUploadOffset = -1
  )
  leaderState.logStartOffset = 10
  leaderState.localLogStartOffset = 200
  leaderState.logEndOffset = 200
  fetcher.mockLeader.setLeaderState(partition, leaderState)
  fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)

  fetcher.doWork()
  val fetchStateOpt = fetcher.fetchState(partition)
  assertTrue(fetchStateOpt.nonEmpty)
  assertEquals(ReplicaState.FETCHING, fetchStateOpt.get.state)
  assertEquals(0, fetchStateOpt.get.fetchOffset)
  assertTrue(fetchStateOpt.get.lag.isEmpty)
  assertEquals(0, fetchStateOpt.get.currentLeaderEpoch)
  // Fetch will be retried after some delay
  assertTrue(fetchStateOpt.get.delay.isPresent)
  assertEquals(Optional.of(0), fetchStateOpt.get.lastFetchedEpoch)

  // LogEndOffset is unchanged
  assertEquals(0, replicaState.logEndOffset)
}
+
/**
 * Last tiered offset fetch: inactive leader with a stale upload offset and non-zero LSO.
 *
 * Setup: tiered storage enabled, fetchFromLastTieredOffset = true. The leader has an empty
 * local log with logStartOffset = 10, localLogStartOffset = 200 and logEndOffset = 200,
 * and earliestPendingUploadOffset = 5 — stale, since it precedes logStartOffset.
 *
 * The presence of an upload offset (even a stale one) means the leader knows about tiered
 * storage, so the follower ignores the stale value and uses the leader's
 * localLogStartOffset (200). Since localLogStartOffset equals logEndOffset, all data is
 * uploaded and nothing is fetched. Expected: the first doWork() moves the replica to
 * offset 200 (highWatermark temporarily 200); the second doWork() picks up the leader's
 * logStartOffset (10) and highWatermark (199), with the replica log still empty.
 */
@Test
def testLastTieredOffsetWithInactiveLeaderStaleUploadNonZeroLSO(): Unit = {
  val rlmEnabled = true
  val partition = new TopicPartition("topic", 0)
  val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
  val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
  val fetcher = new MockFetcherThread(mockLeaderEndPoint, mockTierStateMachine, fetchFromLastTieredOffset = true)

  val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
  // Empty logs
  val leaderLog = Seq()
  val leaderState = PartitionState(
    leaderLog,
    leaderEpoch = 0,
    highWatermark = 199L,
    rlmEnabled = rlmEnabled,
    earliestPendingUploadOffset = 5
  )
  leaderState.logStartOffset = 10
  leaderState.localLogStartOffset = 200
  leaderState.logEndOffset = 200
  fetcher.mockLeader.setLeaderState(partition, leaderState)
  fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)

  fetcher.doWork()
  assertEquals(Option(ReplicaState.FETCHING), fetcher.fetchState(partition).map(_.state))
  assertEquals(0, replicaState.log.size)
  assertEquals(0, replicaState.logStartOffset)
  assertEquals(200, replicaState.localLogStartOffset)
  assertEquals(200, replicaState.logEndOffset)
  assertEquals(200, replicaState.highWatermark)

  fetcher.doWork()
  assertEquals(0, replicaState.log.size)
  assertEquals(10, replicaState.logStartOffset)
  assertEquals(200, replicaState.localLogStartOffset)
  assertEquals(200, replicaState.logEndOffset)
  assertEquals(199, replicaState.highWatermark)
}
+
/**
 * Last tiered offset fetch: inactive leader, known full upload, non-zero LSO.
 *
 * Setup: tiered storage enabled, fetchFromLastTieredOffset = true. The leader has an empty
 * local log with logStartOffset = 10, localLogStartOffset = 200 and logEndOffset = 200,
 * and earliestPendingUploadOffset = 200 — the leader knows the upload is complete.
 *
 * Since all data is uploaded, no local fetching is needed. Expected: the first doWork()
 * moves the replica straight to offset 200 (localLogStartOffset = logEndOffset =
 * fetchOffset = 200); the second doWork() picks up the leader's logStartOffset (10) and
 * highWatermark (199) while the replica log stays empty.
 */
@Test
def testLastTieredOffsetWithInactiveLeaderFullUploadNonZeroLSO(): Unit = {
  val rlmEnabled = true
  val partition = new TopicPartition("topic", 0)
  val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
  val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
  val fetcher = new MockFetcherThread(mockLeaderEndPoint, mockTierStateMachine, fetchFromLastTieredOffset = true)

  val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
  // Empty logs
  val leaderLog = Seq()
  val leaderState = PartitionState(
    leaderLog,
    leaderEpoch = 0,
    highWatermark = 199L,
    rlmEnabled = rlmEnabled,
    earliestPendingUploadOffset = 200
  )
  leaderState.logStartOffset = 10
  leaderState.localLogStartOffset = 200
  leaderState.logEndOffset = 200
  fetcher.mockLeader.setLeaderState(partition, leaderState)
  fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)

  fetcher.doWork()
  assertEquals(Option(ReplicaState.FETCHING), fetcher.fetchState(partition).map(_.state))
  assertEquals(0, replicaState.log.size)
  assertEquals(0, replicaState.logStartOffset)
  assertEquals(200, replicaState.localLogStartOffset)
  assertEquals(200, replicaState.logEndOffset)
  assertEquals(Some(200), fetcher.fetchState(partition).map(_.fetchOffset()))

  fetcher.doWork()
  assertEquals(0, replicaState.log.size)
  assertEquals(10, replicaState.logStartOffset)
  assertEquals(200, replicaState.localLogStartOffset)
  assertEquals(200, replicaState.logEndOffset)
  assertEquals(199, replicaState.highWatermark)
}
+
/**
 * Last tiered offset fetch: stale local log offset on a new leader (gap scenario).
 *
 * Setup: tiered storage enabled, fetchFromLastTieredOffset = true. The leader's
 * logStartOffset is 10 but its localLogStartOffset is 5 — stale, since it precedes
 * logStartOffset (batches at 5, 10, 199) — and earliestPendingUploadOffset = -1 because
 * the new leader does not know the upload state.
 *
 * With an inconsistent local log offset and unknown upload state, the follower cannot
 * safely determine where to fetch from. Expected: no fetch happens; the partition stays in
 * FETCHING with fetchOffset = 0, an empty lag and a retry delay set; the replica's
 * logEndOffset remains 0.
 */
@Test
def testLastTieredOffsetStaleLocalLogOffsetNewLeader(): Unit = {
  val rlmEnabled = true
  val partition = new TopicPartition("topic", 0)
  val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
  val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
  val fetcher = new MockFetcherThread(mockLeaderEndPoint, mockTierStateMachine, fetchFromLastTieredOffset = true)

  val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
  val leaderLog = Seq(
    mkBatch(baseOffset = 5, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
    mkBatch(baseOffset = 10, leaderEpoch = 0, new SimpleRecord("d".getBytes)),
    mkBatch(baseOffset = 199, leaderEpoch = 0, new SimpleRecord("e".getBytes))
  )
  val leaderState = PartitionState(
    leaderLog,
    leaderEpoch = 0,
    highWatermark = 199L,
    rlmEnabled = rlmEnabled,
    earliestPendingUploadOffset = -1
  )
  leaderState.logStartOffset = 10
  fetcher.mockLeader.setLeaderState(partition, leaderState)
  fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)

  fetcher.doWork()
  val fetchStateOpt = fetcher.fetchState(partition)
  assertTrue(fetchStateOpt.nonEmpty)
  assertEquals(ReplicaState.FETCHING, fetchStateOpt.get.state)
  assertEquals(0, fetchStateOpt.get.fetchOffset)
  assertTrue(fetchStateOpt.get.lag.isEmpty)
  assertEquals(0, fetchStateOpt.get.currentLeaderEpoch)
  // Fetch will be retried after some delay
  assertTrue(fetchStateOpt.get.delay.isPresent)
  assertEquals(Optional.of(0), fetchStateOpt.get.lastFetchedEpoch)

  // LogEndOffset is unchanged
  assertEquals(0, replicaState.logEndOffset)
}
+
/**
 * Last tiered offset fetch: stale local log offset with a slow upload.
 *
 * Setup: tiered storage enabled, fetchFromLastTieredOffset = true. The leader's
 * logStartOffset is 10 but its localLogStartOffset is 5 (stale; batches at 5, 10, 199),
 * and earliestPendingUploadOffset = 10 — the upload has only just started.
 *
 * Despite the stale local log offset, a known upload offset lets the follower fetch from
 * earliestPendingUploadOffset (10). Expected: the first doWork() moves the replica to
 * offset 10; two more doWork() calls fetch the batches at 10 and 199, ending with
 * logStartOffset = 10, logEndOffset = 200 and highWatermark = 199.
 */
@Test
def testLastTieredOffsetStaleLocalLogOffsetSlowUpload(): Unit = {
  val rlmEnabled = true
  val partition = new TopicPartition("topic", 0)
  val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
  val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
  val fetcher = new MockFetcherThread(mockLeaderEndPoint, mockTierStateMachine, fetchFromLastTieredOffset = true)

  val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
  val leaderLog = Seq(
    mkBatch(baseOffset = 5, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
    mkBatch(baseOffset = 10, leaderEpoch = 0, new SimpleRecord("d".getBytes)),
    mkBatch(baseOffset = 199, leaderEpoch = 0, new SimpleRecord("e".getBytes))
  )
  val leaderState = PartitionState(
    leaderLog,
    leaderEpoch = 0,
    highWatermark = 199L,
    rlmEnabled = rlmEnabled,
    earliestPendingUploadOffset = 10
  )
  leaderState.logStartOffset = 10
  fetcher.mockLeader.setLeaderState(partition, leaderState)
  fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)

  fetcher.doWork()
  assertEquals(Option(ReplicaState.FETCHING), fetcher.fetchState(partition).map(_.state))
  assertEquals(0, replicaState.log.size)
  assertEquals(0, replicaState.logStartOffset)
  assertEquals(10, replicaState.localLogStartOffset)
  assertEquals(10, replicaState.logEndOffset)
  assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))

  // Only one record batch is returned per poll, so call doWork() once per expected batch.
  for (_ <- 1 to 2) fetcher.doWork()
  assertEquals(2, replicaState.log.size)
  assertEquals(10, replicaState.logStartOffset)
  assertEquals(10, replicaState.localLogStartOffset)
  assertEquals(200, replicaState.logEndOffset)
  assertEquals(199, replicaState.highWatermark)
}
+
+ /**
+ * Test: Last Tiered Offset Fetch with Stale Local Log Offset and Fast Upload
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when local log offset is stale with
significant upload progress
+ * - Test scenario: Stale local log offset with fast upload allows fetching
from upload point
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+ * - Leader LocalLogStartOffset: **5** (stale - before logStartOffset)
+ * - earliestPendingUploadOffset: **150** (fast upload - progressed to
offset 150)
+ * - Leader has 4 batches at offsets 5, 10, 150, 199
+ *
+ * Scenario:
+ * - Leader has stale local log offset (5 < 10)
+ * - Upload has progressed to offset 150 (fast upload - batches at 5 and 10
already uploaded)
+ * - Despite stale local log offset, presence of upload offset indicates
leader knows about tiered storage
+ * - Follower uses earliestPendingUploadOffset (150) to determine fetch point
+ * - Only remaining batches (150, 199) need to be fetched locally
+ *
+ * Expected Outcomes:
+ * 1. Follower fetch state transitions to FETCHING
+ * 2. First doWork() call adjusts to upload point:
+ * - localLogStartOffset = 150
+ * - logEndOffset = 150
+ * - fetchOffset = 150
+ * 3. After 2 additional doWork() calls, only remaining batches are fetched:
+ * - log size = 2 (only batches at 150 and 199)
+ * - logStartOffset = 10
+ * - localLogStartOffset = 150
+ * - logEndOffset = 200
+ * - highWatermark = 199
+ */
+ @Test
+ def testLastTieredOffsetStaleLocalLogOffsetFastUpload(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ val leaderLog = Seq(
+ mkBatch(baseOffset = 5, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 10, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("e".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("f".getBytes))
+ )
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = 150
+ )
+ leaderState.logStartOffset = 10
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(150, replicaState.localLogStartOffset)
+ assertEquals(150, replicaState.logEndOffset)
+ assertEquals(Some(150), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ for (_ <- 1 to 2) fetcher.doWork()
+ assertEquals(2, replicaState.log.size)
+ assertEquals(10, replicaState.logStartOffset)
+ assertEquals(150, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with Stale Local Log Offset and Full Upload
+ *
+ * Purpose:
+ * - Validate follower fetch behavior when local log offset is stale with
full upload completed
+ * - Test scenario: Stale local log offset with all data uploaded to remote
storage
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+ * - Leader LocalLogStartOffset: **5** (stale - before logStartOffset)
+ * - earliestPendingUploadOffset: **200** (full upload completed - all data
uploaded to remote)
+ * - Leader has 3 batches at offsets 5, 10, 199
+ *
+ * Scenario:
+ * - Leader has stale local log offset (5 < 10)
+ * - All data has been uploaded to remote storage
(earliestPendingUploadOffset = 200)
+ * - Despite stale local log offset, presence of upload offset indicates
leader knows about tiered storage
+ * - Follower uses earliestPendingUploadOffset (200) to determine fetch point
+ * - Since all data is uploaded (logEndOffset = earliestPendingUploadOffset
= 200), no local fetching needed
+ *
+ * Expected Outcomes:
+ * 1. Follower fetch state transitions to FETCHING
+ * 2. First doWork() call adjusts to upload endpoint (200, not
logStartOffset):
+ * - localLogStartOffset = 200
+ * - logEndOffset = 200
+ * - fetchOffset = 200
+ * 3. Second doWork() call updates logStartOffset:
+ * - logStartOffset = 10 (leader's logStartOffset)
+ * - localLogStartOffset = 200
+ * - logEndOffset = 200
+ * - highWatermark = 199
+ * 4. No batches are fetched (all data already uploaded):
+ * - log size = 0
+ */
+ @Test
+ def testLastTieredOffsetStaleLocalLogOffsetFullUpload(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0);
+ val mockLeaderEndPoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndPoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndPoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ val leaderLog = Seq(
+ mkBatch(baseOffset = 5, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 10, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = 200
+ )
+ leaderState.logStartOffset = 10
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(200, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(Some(200), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ fetcher.doWork()
+ assertEquals(0, replicaState.log.size)
+ assertEquals(10, replicaState.logStartOffset)
+ assertEquals(200, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Last Tiered Offset Fetch with Retriable Remote Storage Exception
+ *
+ * Purpose:
+ * - Validate follower error handling when tier state machine throws
retriable remote storage exception
+ * - Test scenario: Retriable exception during tier state machine start
should trigger retry, not failure
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - fetchFromLastTieredOffset: **true**
+ * - Leader LogStartOffset: **10** (leader log starts at offset 10)
+ * - Leader LocalLogStartOffset: **10** (same as logStartOffset - no
deletions)
+ * - earliestPendingUploadOffset: **150** (fast upload - progressed to
offset 150)
+ * - Leader has 3 batches at offsets 10, 150, 199
+ * - Mock tier state machine throws RetriableRemoteStorageException during
start()
+ *
+ * Scenario:
+ * - Follower attempts to fetch from last tiered offset
+ * - Tier state machine start() throws RetriableRemoteStorageException
(simulating temporary remote storage issue)
+ * - Exception is retriable, so follower should enter delayed retry mode,
NOT mark partition as failed
+ * - Follower will retry the operation after a delay
+ * - This tests proper error handling for transient remote storage failures
+ *
+ * Expected Outcomes:
+ * 1. Partition is NOT marked as failed:
+ * - assertFalse(failedPartitions.contains(partition))
+ * 2. Follower enters delayed retry mode:
+ * - fetchOffset = 0 (unchanged - no progress made)
+ * - lag is empty (not calculated due to error)
+ * - delay is present (will retry after delay)
+ * - state = FETCHING (stays in FETCHING state)
+ * - currentLeaderEpoch = 0
+ * - lastFetchedEpoch = 0
+ * 3. LogEndOffset remains unchanged:
+ * - logEndOffset = 0 (no progress until retry succeeds)
+ */
+ @Test
+ def testLastTieredOffsetRetryableRemoteStorageException(): Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) {
+ override def start(topicPartition: TopicPartition,
+ topicId: Optional[Uuid],
+ currentLeaderEpoch: Int,
+ fetchStartOffsetAndEpoch: OffsetAndEpoch,
+ leaderLogStartOffset: Long): PartitionFetchState = {
+ throw new RetriableRemoteStorageException("Retryable exception")
+ }
+ }
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine, fetchFromLastTieredOffset = true)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+ val leaderLog = Seq(
+ // LogStartOffset = LocalLogStartOffset = 10
+ mkBatch(baseOffset = 10, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ earliestPendingUploadOffset = 150
+ )
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ // Should not be marked as failed
+ assertFalse(failedPartitions.contains(partition))
+
+ val fetchStateOpt = fetcher.fetchState(partition)
+ assertTrue(fetchStateOpt.nonEmpty)
+ assertEquals(ReplicaState.FETCHING, fetchStateOpt.get.state)
+ // Fetch offset remains unchanged
+ assertEquals(0, fetchStateOpt.get.fetchOffset)
+ // Lag remains unchanged
+ assertTrue(fetchStateOpt.get.lag.isEmpty)
+ assertEquals(0, fetchStateOpt.get.currentLeaderEpoch)
+ // Fetch will be retried after some delay
+ assertTrue(fetchStateOpt.get.delay.isPresent)
+ assertEquals(Optional.of(0), fetchStateOpt.get.lastFetchedEpoch)
+
+ // LogEndOffset is unchanged
+ assertEquals(0, replicaState.logEndOffset)
+ }
}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
index 18690de30e4..a222928006c 100644
--- a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
+++ b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
@@ -140,7 +140,9 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new
BrokerEndPoint(1, "l
checkLeaderEpochAndThrow(leaderEpoch, leaderState)
leaderState.earliestPendingUploadOffset match {
case -1L => new OffsetAndEpoch(-1L, -1)
- case _ => new
OffsetAndEpoch(math.max(leaderState.earliestPendingUploadOffset,
leaderState.logStartOffset), leaderState.leaderEpoch)
+ case _ => new OffsetAndEpoch(
+ math.max(leaderState.earliestPendingUploadOffset,
math.max(leaderState.localLogStartOffset, leaderState.logStartOffset)),
+ leaderState.leaderEpoch)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala
b/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala
index ca37d9a3f19..46146c18919 100644
--- a/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala
+++ b/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala
@@ -17,11 +17,11 @@
package kafka.server
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.server.LeaderEndPoint
import org.apache.kafka.server.PartitionFetchState
import org.apache.kafka.server.ReplicaState
+import org.apache.kafka.server.common.OffsetAndEpoch
import java.util.Optional
@@ -30,15 +30,17 @@ class MockTierStateMachine(leader: LeaderEndPoint) extends
TierStateMachine(lead
var fetcher: MockFetcherThread = _
override def start(topicPartition: TopicPartition,
- currentFetchState: PartitionFetchState,
- fetchPartitionData: FetchResponseData.PartitionData):
PartitionFetchState = {
- val leaderEndOffset = leader.fetchLatestOffset(topicPartition,
currentFetchState.currentLeaderEpoch).offset
- val offsetToFetch = leader.fetchEarliestLocalOffset(topicPartition,
currentFetchState.currentLeaderEpoch).offset
+ topicId: Optional[Uuid],
+ currentLeaderEpoch: Int,
+ fetchStartOffsetAndEpoch: OffsetAndEpoch,
+ leaderLogStartOffset: Long): PartitionFetchState = {
+ val leaderEndOffset = leader.fetchLatestOffset(topicPartition,
currentLeaderEpoch).offset
+ val offsetToFetch = fetchStartOffsetAndEpoch.offset
val initialLag = leaderEndOffset - offsetToFetch
fetcher.truncateFullyAndStartAt(topicPartition, offsetToFetch)
- new PartitionFetchState(currentFetchState.topicId, offsetToFetch,
Optional.of(initialLag),
- currentFetchState.currentLeaderEpoch, Optional.empty(),
ReplicaState.FETCHING,
- Optional.of(currentFetchState.currentLeaderEpoch)
+ new PartitionFetchState(topicId, offsetToFetch, Optional.of(initialLag),
+ currentLeaderEpoch, Optional.empty(), ReplicaState.FETCHING,
+ Optional.of(currentLeaderEpoch)
)
}
diff --git a/core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala
b/core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala
index b20113d8b78..7f9263e8b27 100644
--- a/core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala
@@ -18,16 +18,17 @@
package kafka.server
import org.apache.kafka.common.errors.FencedLeaderEpochException
-import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.internal._
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.server.{PartitionFetchState, ReplicaState}
import org.junit.jupiter.api.Assertions._
import kafka.server.FetcherThreadTestUtils.{initialFetchState, mkBatch}
+import org.apache.kafka.server.common.OffsetAndEpoch
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
+import java.util.Optional
import scala.collection.Map
class TierStateMachineTest {
@@ -164,10 +165,12 @@ class TierStateMachineTest {
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch =
truncateOnFetch, version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) {
override def start(topicPartition: TopicPartition,
- currentFetchState: PartitionFetchState,
- fetchPartitionData: FetchResponseData.PartitionData):
PartitionFetchState = {
+ topicId: Optional[Uuid],
+ currentLeaderEpoch: Int,
+ fetchStartOffsetAndEpoch: OffsetAndEpoch,
+ leaderLogStartOffset: Long): PartitionFetchState = {
isErrorHandled = true
- throw new FencedLeaderEpochException(s"Epoch
${currentFetchState.currentLeaderEpoch} is fenced")
+ throw new FencedLeaderEpochException(s"Epoch ${currentLeaderEpoch} is
fenced")
}
}
val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine, failedPartitions = failedPartitions)