This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0d9ae4928be KAFKA-17302: APIs for Follower Fetch from Last Tiered
Offset (#21153)
0d9ae4928be is described below
commit 0d9ae4928becf6ff0b93c52b9453da4dc0e689e1
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Wed Dec 24 22:37:20 2025 +0530
KAFKA-17302: APIs for Follower Fetch from Last Tiered Offset (#21153)
In this PR, we are adding the methods required for supporting follower
fetch from Last Tiered Offset.
- Added new method `shouldFetchFromLastTieredOffset` in
AbstractFetcherThread to determine if the follower should fetch from
the last tiered offset for the remote storage enabled topic
- Added new API `fetchEarliestPendingUploadOffset` to LeaderEndPoint for
retrieving the offset from where it should start replicating data from
the leader.
Reviewers: Kamal Chandraprakash <[email protected]>
---
.../scala/kafka/server/AbstractFetcherThread.scala | 2 +
.../scala/kafka/server/LocalLeaderEndPoint.scala | 28 +++++++++
.../scala/kafka/server/RemoteLeaderEndPoint.scala | 4 ++
.../kafka/server/ReplicaAlterLogDirsThread.scala | 2 +
.../scala/kafka/server/ReplicaFetcherThread.scala | 11 ++++
.../kafka/server/LocalLeaderEndPointTest.scala | 56 ++++++++++++++++-
.../kafka/server/AbstractFetcherManagerTest.scala | 6 +-
.../unit/kafka/server/MockFetcherThread.scala | 5 +-
.../unit/kafka/server/MockLeaderEndPoint.scala | 16 ++++-
.../kafka/server/ReplicaFetcherThreadTest.scala | 71 +++++++++++++++++++++-
.../org/apache/kafka/server/LeaderEndPoint.java | 11 ++++
11 files changed, 201 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 1e8841df0ca..62bc9363872 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -101,6 +101,8 @@ abstract class AbstractFetcherThread(name: String,
protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int):
Optional[OffsetAndEpoch]
+ protected def shouldFetchFromLastTieredOffset(topicPartition:
TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean
+
override def shutdown(): Unit = {
initiateShutdown()
inLock(partitionMapLock) {
diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
index f32d9f8037a..852cd85420d 100644
--- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -135,6 +135,34 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
new OffsetAndEpoch(localLogStartOffset, epoch.orElse(0))
}
+ override def fetchEarliestPendingUploadOffset(topicPartition:
TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
+ val partition = replicaManager.getPartitionOrException(topicPartition)
+ val log = partition.localLogOrException
+
+ if (!log.remoteLogEnabled()) {
+ new OffsetAndEpoch(-1L, -1)
+ } else {
+ val highestRemoteOffset = log.highestOffsetInRemoteStorage()
+ val logStartOffset = fetchEarliestOffset(topicPartition,
currentLeaderEpoch)
+
+ highestRemoteOffset match {
+ case -1L =>
+ val localLogStartOffset = fetchEarliestLocalOffset(topicPartition,
currentLeaderEpoch)
+ if (localLogStartOffset.offset() == logStartOffset.offset()) {
+ // No segments have been uploaded yet
+ logStartOffset
+ } else {
+ // Leader currently does not know about the already uploaded
segments
+ new OffsetAndEpoch(-1L, -1)
+ }
+ case _ =>
+ val earliestPendingUploadOffset = Math.max(highestRemoteOffset + 1,
logStartOffset.offset())
+ val epoch =
log.leaderEpochCache.epochForOffset(earliestPendingUploadOffset)
+ new OffsetAndEpoch(earliestPendingUploadOffset, epoch.orElse(0))
+ }
+ }
+ }
+
override def fetchEpochEndOffsets(partitions: util.Map[TopicPartition,
OffsetForLeaderEpochRequestData.OffsetForLeaderPartition]):
util.Map[TopicPartition, EpochEndOffset] = {
partitions.asScala.map { case (tp, epochData) =>
try {
diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
index 80d41e3b0cf..ba4b1b51739 100644
--- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
@@ -105,6 +105,10 @@ class RemoteLeaderEndPoint(logPrefix: String,
fetchOffset(topicPartition, currentLeaderEpoch,
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
}
+ override def fetchEarliestPendingUploadOffset(topicPartition:
TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
+ fetchOffset(topicPartition, currentLeaderEpoch,
ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+ }
+
private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch:
Int, timestamp: Long): OffsetAndEpoch = {
val topic = new ListOffsetsTopic()
.setName(topicPartition.topic)
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 81bb41100f7..7d2e0288456 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -68,6 +68,8 @@ class ReplicaAlterLogDirsThread(name: String,
replicaMgr.futureLocalLogOrException(topicPartition).endOffsetForEpoch(epoch)
}
+ override protected def shouldFetchFromLastTieredOffset(topicPartition:
TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean = false
+
// process fetched data
override def processPartitionData(
topicPartition: TopicPartition,
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index fa2f6bb7f35..8444704fb10 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -63,6 +63,17 @@ class ReplicaFetcherThread(name: String,
replicaMgr.localLogOrException(topicPartition).endOffsetForEpoch(epoch)
}
+ override protected[server] def
shouldFetchFromLastTieredOffset(topicPartition: TopicPartition,
leaderEndOffset: Long, replicaEndOffset: Long): Boolean = {
+ val isCompactTopic =
replicaMgr.localLog(topicPartition).exists(_.config.compact)
+ val remoteStorageEnabled =
replicaMgr.localLog(topicPartition).exists(_.remoteLogEnabled())
+
+ brokerConfig.followerFetchLastTieredOffsetEnable &&
+ remoteStorageEnabled &&
+ !isCompactTopic &&
+ replicaEndOffset == 0 &&
+ leaderEndOffset != 0
+ }
+
override def initiateShutdown(): Boolean = {
val justShutdown = super.initiateShutdown()
if (justShutdown) {
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 1fcfde90fbe..9eeed9b86f5 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -20,6 +20,7 @@ package kafka.server
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.apache.kafka.common.compress.Compression
+import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.{TopicIdPartition, Uuid}
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
@@ -34,13 +35,13 @@ import org.apache.kafka.server.common.{KRaftVersion,
MetadataVersion, OffsetAndE
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.LeaderEndPoint
import org.apache.kafka.server.util.{MockScheduler, MockTime}
-import org.apache.kafka.storage.internals.log.{AppendOrigin,
LogDirFailureChannel}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
LogDirFailureChannel}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
import org.mockito.Mockito.mock
import java.io.File
-import java.util.{Map => JMap}
+import java.util.{Properties, Map => JMap}
import scala.collection.Map
import scala.jdk.CollectionConverters._
@@ -62,7 +63,16 @@ class LocalLeaderEndPointTest extends Logging {
def setUp(): Unit = {
val props = TestUtils.createBrokerConfig(sourceBroker.id, port =
sourceBroker.port)
val config = KafkaConfig.fromProps(props)
- val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new
File(_)))
+
+ val logProps = new Properties()
+ logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+ // Keep cleanup.policy=delete (default), not compact, so remote storage is
allowed
+ val defaultLogConfig = LogConfig.fromProps(Map.empty[String,
Object].asJava, logProps)
+ val mockLogMgr = TestUtils.createLogManager(
+ config.logDirs.asScala.map(new File(_)),
+ defaultConfig = defaultLogConfig,
+ remoteStorageSystemEnable = true
+ )
val alterPartitionManager = mock(classOf[AlterPartitionManager])
val metrics = new Metrics
quotaManager = QuotaFactory.instantiate(config, metrics, time, "", "")
@@ -233,6 +243,46 @@ class LocalLeaderEndPointTest extends Logging {
assertEquals(expected, result)
}
+ @Test
+ def testEarliestPendingUploadOffsetWhenNoSegmentsUploaded(): Unit = {
+ // Append some records; no remote upload happened yet
+ appendRecords(replicaManager, topicIdPartition, records)
+ .onFire(response => assertEquals(Errors.NONE, response.error))
+
+ val expected = endPoint.fetchEarliestOffset(topicPartition, 0)
+ val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
+ assertEquals(expected, result)
+ }
+
+ @Test
+ def testEarliestPendingUploadOffsetWhenLocalStartGreaterThanStart(): Unit = {
+ appendRecords(replicaManager, topicIdPartition, records)
+ .onFire(response => assertEquals(Errors.NONE, response.error))
+
+ // Bump epoch and advance local log start offset without changing log
start offset
+ bumpLeaderEpoch()
+
replicaManager.logManager.getLog(topicPartition).foreach(_.updateLocalLogStartOffset(3))
+
+ val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 1)
+ assertEquals(new OffsetAndEpoch(-1L, -1), result)
+ }
+
+ @Test
+ def testEarliestPendingUploadOffsetWhenHighestRemoteOffsetKnown(): Unit = {
+ appendRecords(replicaManager, topicIdPartition, records)
+ .onFire(response => assertEquals(Errors.NONE, response.error))
+
+ // Highest remote is 1 => earliest pending should be max(1+1, logStart)
+ val log =
replicaManager.getPartitionOrException(topicPartition).localLogOrException
+ log.updateHighestOffsetInRemoteStorage(1)
+
+ val expectedOffset = Math.max(2L, log.logStartOffset())
+ val epoch = log.leaderEpochCache().epochForOffset(expectedOffset).orElse(0)
+
+ val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
+ assertEquals(new OffsetAndEpoch(expectedOffset, epoch), result)
+ }
+
private class CallbackResult[T] {
private var value: Option[T] = None
private var fun: Option[T => Unit] = None
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index 5577dc9bd38..97e2d5b7181 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -18,8 +18,8 @@ package kafka.server
import com.yammer.metrics.core.Gauge
import kafka.utils.TestUtils
-import org.apache.kafka.common.message.{FetchResponseData,
OffsetForLeaderEpochRequestData}
import org.apache.kafka.common.message.FetchResponseData.PartitionData
+import org.apache.kafka.common.message.{FetchResponseData,
OffsetForLeaderEpochRequestData}
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.requests.FetchRequest
@@ -318,6 +318,8 @@ class AbstractFetcherManagerTest {
override val isTruncationOnFetchSupported: Boolean = false
override def fetchEarliestLocalOffset(topicPartition: TopicPartition,
currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0)
+
+ override def fetchEarliestPendingUploadOffset(topicPartition:
TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new
OffsetAndEpoch(-1L, -1)
}
private class MockResizeFetcherTierStateMachine extends
TierStateMachine(null, null, false) {
@@ -354,6 +356,8 @@ class AbstractFetcherManagerTest {
override protected def logEndOffset(topicPartition: TopicPartition): Long
= 1
override protected def endOffsetForEpoch(topicPartition: TopicPartition,
epoch: Int): Optional[OffsetAndEpoch] = Optional.of(new OffsetAndEpoch(1, 0))
+
+ override protected def shouldFetchFromLastTieredOffset(topicPartition:
TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean = false
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
index 5165debe66c..74cd8440fc6 100644
--- a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
+++ b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
@@ -38,7 +38,8 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
val replicaId: Int = 0,
val leaderId: Int = 1,
fetchBackOffMs: Int = 0,
- failedPartitions: FailedPartitions = new
FailedPartitions)
+ failedPartitions: FailedPartitions = new
FailedPartitions,
+ fetchFromLastTieredOffset: Boolean = false)
extends AbstractFetcherThread("mock-fetcher",
clientId = "mock-fetcher",
leader = mockLeader,
@@ -183,4 +184,6 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
assertEquals(expectedEpoch,
fetchState(partition).map(_.lastFetchedEpoch.get()))
}
}
+
+ override def shouldFetchFromLastTieredOffset(topicPartition: TopicPartition,
leaderEndOffset: Long, replicaEndOffset: Long): Boolean =
fetchFromLastTieredOffset
}
diff --git a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
index 96e43955d9e..a6a9862aeff 100644
--- a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
+++ b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
@@ -135,6 +135,15 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint =
new BrokerEndPoint(1, "l
new OffsetAndEpoch(leaderState.localLogStartOffset,
leaderState.leaderEpoch)
}
+ override def fetchEarliestPendingUploadOffset(topicPartition:
TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
+ val leaderState = leaderPartitionState(topicPartition)
+ checkLeaderEpochAndThrow(leaderEpoch, leaderState)
+ leaderState.earliestPendingUploadOffset match {
+ case -1L => new OffsetAndEpoch(-1L, -1)
+ case _ => new
OffsetAndEpoch(math.max(leaderState.earliestPendingUploadOffset,
leaderState.logStartOffset), leaderState.leaderEpoch)
+ }
+ }
+
override def fetchEpochEndOffsets(partitions: java.util.Map[TopicPartition,
OffsetForLeaderEpochRequestData.OffsetForLeaderPartition]):
java.util.Map[TopicPartition, EpochEndOffset] = {
val endOffsets = new java.util.HashMap[TopicPartition, EpochEndOffset]()
partitions.forEach { (partition, epochData) =>
@@ -262,13 +271,14 @@ class PartitionState(var log: mutable.Buffer[RecordBatch],
var logEndOffset: Long,
var highWatermark: Long,
var rlmEnabled: Boolean = false,
- var localLogStartOffset: Long)
+ var localLogStartOffset: Long,
+ var earliestPendingUploadOffset: Long)
object PartitionState {
- def apply(log: Seq[RecordBatch], leaderEpoch: Int, highWatermark: Long,
rlmEnabled: Boolean = false): PartitionState = {
+ def apply(log: Seq[RecordBatch], leaderEpoch: Int, highWatermark: Long,
rlmEnabled: Boolean = false, earliestPendingUploadOffset: Long = -1L):
PartitionState = {
val logStartOffset = log.headOption.map(_.baseOffset).getOrElse(0L)
val logEndOffset = log.lastOption.map(_.nextOffset).getOrElse(0L)
- new PartitionState(log.toBuffer, leaderEpoch, logStartOffset,
logEndOffset, highWatermark, rlmEnabled, logStartOffset)
+ new PartitionState(log.toBuffer, leaderEpoch, logStartOffset,
logEndOffset, highWatermark, rlmEnabled, logStartOffset,
earliestPendingUploadOffset)
}
def apply(leaderEpoch: Int): PartitionState = {
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index f60de95672c..cab7db47f6c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -24,6 +24,7 @@ import kafka.server.epoch.util.MockBlockingSender
import kafka.utils.TestUtils
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.compress.Compression
+import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.message.FetchResponseData
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
@@ -38,12 +39,13 @@ import org.apache.kafka.server.common.{KRaftVersion,
MetadataVersion, OffsetAndE
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.ReplicaState
import org.apache.kafka.server.PartitionFetchState
-import org.apache.kafka.storage.internals.log.{LogAppendInfo, UnifiedLog}
+import org.apache.kafka.server.config.ReplicationConfigs
+import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogConfig,
UnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.{any, anyBoolean, anyLong}
import org.mockito.Mockito.{mock, times, verify, when}
@@ -51,7 +53,7 @@ import org.mockito.Mockito.{mock, times, verify, when}
import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
import java.util
-import java.util.{Collections, Optional}
+import java.util.{Collections, Optional, Properties}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
@@ -818,4 +820,67 @@ class ReplicaFetcherThreadTest {
when(replicaManager.localLogOrException(t2p1)).thenReturn(log)
when(replicaManager.getPartitionOrException(t2p1)).thenReturn(partition)
}
+
+ @ParameterizedTest
+ @CsvSource(Array(
+ "false, false, compact, 0, 0, false",
+ "false, false, compact, 5, 0, false",
+ "false, false, compact, 5, 1, false",
+ "false, false, delete, 0, 0, false",
+ "false, false, delete, 5, 0, false",
+ "false, false, delete, 5, 1, false",
+ "false, true, compact, 0, 0, false",
+ "false, true, compact, 5, 0, false",
+ "false, true, compact, 5, 1, false",
+ "false, true, delete, 0, 0, false",
+ "false, true, delete, 5, 0, false",
+ "false, true, delete, 5, 1, false",
+ "true, false, compact, 0, 0, false",
+ "true, false, compact, 5, 0, false",
+ "true, false, compact, 5, 1, false",
+ "true, false, delete, 0, 0, false",
+ "true, false, delete, 5, 0, false",
+ "true, false, delete, 5, 1, false",
+ "true, true, compact, 0, 0, false",
+ "true, true, compact, 5, 0, false",
+ "true, true, compact, 5, 1, false",
+ "true, true, delete, 0, 0, false",
+ "true, true, delete, 5, 0, true",
+ "true, true, delete, 5, 1, false"))
+ def testShouldFetchFromLastTieredOffset(enableLastTieredOffsetFetch: Boolean,
+ remoteStorageEnabled: Boolean,
+ cleanUpPolicy: String,
+ leaderEndOffset: Long,
+ replicaEndOffset: Long,
+ expected: Boolean): Unit = {
+ val tp = new TopicPartition("t", 0)
+
+ val props = TestUtils.createBrokerConfig(1)
+
props.put(ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG,
String.valueOf(enableLastTieredOffsetFetch))
+ val config = KafkaConfig.fromProps(props)
+
+ val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend])
+ when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint)
+
+ val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+ when(replicaManager.brokerTopicStats).thenReturn(new BrokerTopicStats)
+
+ val lcOverrides = new Properties()
+ lcOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanUpPolicy)
+ val logConfig = LogConfig.fromProps(config.extractLogConfigMap,
lcOverrides)
+
+ val log: UnifiedLog = mock(classOf[UnifiedLog])
+ when(log.config).thenReturn(logConfig)
+ when(log.remoteLogEnabled()).thenReturn(remoteStorageEnabled)
+ when(replicaManager.localLog(tp)).thenReturn(Some(log))
+
+ val logContext = new LogContext(s"[ReplicaFetcher
replicaId=${config.brokerId}, leaderId=${mockBlockingSend.brokerEndPoint().id},
fetcherId=0] ")
+ val fetchSessionHandler = new FetchSessionHandler(logContext,
mockBlockingSend.brokerEndPoint().id)
+ val leader = new RemoteLeaderEndPoint(logContext.logPrefix,
mockBlockingSend, fetchSessionHandler,
+ config, replicaManager, UNBOUNDED_QUOTA, () =>
MetadataVersion.MINIMUM_VERSION, () => 1)
+
+ val thread = new ReplicaFetcherThread("test-fetcher", leader, config,
failedPartitions, replicaManager, UNBOUNDED_QUOTA, logContext.logPrefix)
+
+ assertEquals(expected, thread.shouldFetchFromLastTieredOffset(tp,
leaderEndOffset, replicaEndOffset))
+ }
}
diff --git a/server/src/main/java/org/apache/kafka/server/LeaderEndPoint.java
b/server/src/main/java/org/apache/kafka/server/LeaderEndPoint.java
index 7a0a3a0703b..83d626fa23c 100644
--- a/server/src/main/java/org/apache/kafka/server/LeaderEndPoint.java
+++ b/server/src/main/java/org/apache/kafka/server/LeaderEndPoint.java
@@ -103,6 +103,17 @@ public interface LeaderEndPoint {
*/
OffsetAndEpoch fetchEarliestLocalOffset(TopicPartition topicPartition, int
currentLeaderEpoch);
+
+ /**
+ * Fetches the earliest offset and epoch that is pending upload for the
given topic partition from the leader.
+ *
+ * @param topicPartition The topic partition for which the earliest
pending upload offset is to be fetched.
+ * @param currentLeaderEpoch The current leader epoch of the requesting
replica.
+ * @return An OffsetAndEpoch object representing the earliest pending
upload offset and its associated epoch
+ * in the leader's topic partition.
+ */
+ OffsetAndEpoch fetchEarliestPendingUploadOffset(TopicPartition
topicPartition, int currentLeaderEpoch);
+
/**
* Builds a fetch request, given a partition map.
*