This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0d9ae4928be KAFKA-17302: APIs for Follower Fetch from Last Tiered 
Offset (#21153)
0d9ae4928be is described below

commit 0d9ae4928becf6ff0b93c52b9453da4dc0e689e1
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Wed Dec 24 22:37:20 2025 +0530

    KAFKA-17302: APIs for Follower Fetch from Last Tiered Offset (#21153)
    
    In this PR, we are adding the methods required for supporting follower
    fetch from Last Tiered Offset.
    
    - Added new method `shouldFetchFromLastTieredOffset` in
    AbstractFetcherThread to determine if the follower should fetch from
    the last tiered offset for the remote storage enabled topic
    - Added new API `fetchEarliestPendingUploadOffset` to LeaderEndPoint for
    retrieving the offset at which the follower should start replicating
    data from the leader.
    
    Reviewers: Kamal Chandraprakash <[email protected]>
---
 .../scala/kafka/server/AbstractFetcherThread.scala |  2 +
 .../scala/kafka/server/LocalLeaderEndPoint.scala   | 28 +++++++++
 .../scala/kafka/server/RemoteLeaderEndPoint.scala  |  4 ++
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |  2 +
 .../scala/kafka/server/ReplicaFetcherThread.scala  | 11 ++++
 .../kafka/server/LocalLeaderEndPointTest.scala     | 56 ++++++++++++++++-
 .../kafka/server/AbstractFetcherManagerTest.scala  |  6 +-
 .../unit/kafka/server/MockFetcherThread.scala      |  5 +-
 .../unit/kafka/server/MockLeaderEndPoint.scala     | 16 ++++-
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 71 +++++++++++++++++++++-
 .../org/apache/kafka/server/LeaderEndPoint.java    | 11 ++++
 11 files changed, 201 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 1e8841df0ca..62bc9363872 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -101,6 +101,8 @@ abstract class AbstractFetcherThread(name: String,
 
   protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): 
Optional[OffsetAndEpoch]
 
+  protected def shouldFetchFromLastTieredOffset(topicPartition: 
TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean
+
   override def shutdown(): Unit = {
     initiateShutdown()
     inLock(partitionMapLock) {
diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala 
b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
index f32d9f8037a..852cd85420d 100644
--- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -135,6 +135,34 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
     new OffsetAndEpoch(localLogStartOffset, epoch.orElse(0))
   }
 
+  override def fetchEarliestPendingUploadOffset(topicPartition: 
TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
+    val partition = replicaManager.getPartitionOrException(topicPartition)
+    val log = partition.localLogOrException
+
+    if (!log.remoteLogEnabled()) {
+      new OffsetAndEpoch(-1L, -1)
+    } else {
+      val highestRemoteOffset = log.highestOffsetInRemoteStorage()
+      val logStartOffset = fetchEarliestOffset(topicPartition, 
currentLeaderEpoch)
+
+      highestRemoteOffset match {
+        case -1L =>
+          val localLogStartOffset = fetchEarliestLocalOffset(topicPartition, 
currentLeaderEpoch)
+          if (localLogStartOffset.offset() == logStartOffset.offset()) {
+            // No segments have been uploaded yet
+            logStartOffset
+          } else {
+            // Leader currently does not know about the already uploaded 
segments
+            new OffsetAndEpoch(-1L, -1)
+          }
+        case _ =>
+          val earliestPendingUploadOffset = Math.max(highestRemoteOffset + 1, 
logStartOffset.offset())
+          val epoch = 
log.leaderEpochCache.epochForOffset(earliestPendingUploadOffset)
+          new OffsetAndEpoch(earliestPendingUploadOffset, epoch.orElse(0))
+      }
+    }
+  }
+
   override def fetchEpochEndOffsets(partitions: util.Map[TopicPartition, 
OffsetForLeaderEpochRequestData.OffsetForLeaderPartition]): 
util.Map[TopicPartition, EpochEndOffset] = {
     partitions.asScala.map { case (tp, epochData) =>
       try {
diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala 
b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
index 80d41e3b0cf..ba4b1b51739 100644
--- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
@@ -105,6 +105,10 @@ class RemoteLeaderEndPoint(logPrefix: String,
     fetchOffset(topicPartition, currentLeaderEpoch, 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
   }
 
+  override def fetchEarliestPendingUploadOffset(topicPartition: 
TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
+    fetchOffset(topicPartition, currentLeaderEpoch, 
ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+  }
+
   private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: 
Int, timestamp: Long): OffsetAndEpoch = {
     val topic = new ListOffsetsTopic()
       .setName(topicPartition.topic)
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala 
b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 81bb41100f7..7d2e0288456 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -68,6 +68,8 @@ class ReplicaAlterLogDirsThread(name: String,
     
replicaMgr.futureLocalLogOrException(topicPartition).endOffsetForEpoch(epoch)
   }
 
+  override protected def shouldFetchFromLastTieredOffset(topicPartition: 
TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean = false
+
   // process fetched data
   override def processPartitionData(
     topicPartition: TopicPartition,
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index fa2f6bb7f35..8444704fb10 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -63,6 +63,17 @@ class ReplicaFetcherThread(name: String,
     replicaMgr.localLogOrException(topicPartition).endOffsetForEpoch(epoch)
   }
 
+  override protected[server] def 
shouldFetchFromLastTieredOffset(topicPartition: TopicPartition, 
leaderEndOffset: Long, replicaEndOffset: Long): Boolean = {
+    val isCompactTopic = 
replicaMgr.localLog(topicPartition).exists(_.config.compact)
+    val remoteStorageEnabled = 
replicaMgr.localLog(topicPartition).exists(_.remoteLogEnabled())
+
+    brokerConfig.followerFetchLastTieredOffsetEnable &&
+      remoteStorageEnabled &&
+      !isCompactTopic &&
+      replicaEndOffset == 0 &&
+      leaderEndOffset != 0
+  }
+
   override def initiateShutdown(): Boolean = {
     val justShutdown = super.initiateShutdown()
     if (justShutdown) {
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala 
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 1fcfde90fbe..9eeed9b86f5 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -20,6 +20,7 @@ package kafka.server
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.{CoreUtils, Logging, TestUtils}
 import org.apache.kafka.common.compress.Compression
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.{TopicIdPartition, Uuid}
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
@@ -34,13 +35,13 @@ import org.apache.kafka.server.common.{KRaftVersion, 
MetadataVersion, OffsetAndE
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.LeaderEndPoint
 import org.apache.kafka.server.util.{MockScheduler, MockTime}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, 
LogDirFailureChannel}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
LogDirFailureChannel}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.Assertions._
 import org.mockito.Mockito.mock
 
 import java.io.File
-import java.util.{Map => JMap}
+import java.util.{Properties, Map => JMap}
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
 
@@ -62,7 +63,16 @@ class LocalLeaderEndPointTest extends Logging {
   def setUp(): Unit = {
     val props = TestUtils.createBrokerConfig(sourceBroker.id, port = 
sourceBroker.port)
     val config = KafkaConfig.fromProps(props)
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new 
File(_)))
+
+    val logProps = new Properties()
+    logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+    // Keep cleanup.policy=delete (default), not compact, so remote storage is 
allowed
+    val defaultLogConfig = LogConfig.fromProps(Map.empty[String, 
Object].asJava, logProps)
+    val mockLogMgr = TestUtils.createLogManager(
+      config.logDirs.asScala.map(new File(_)),
+      defaultConfig = defaultLogConfig,
+      remoteStorageSystemEnable = true
+    )
     val alterPartitionManager = mock(classOf[AlterPartitionManager])
     val metrics = new Metrics
     quotaManager = QuotaFactory.instantiate(config, metrics, time, "", "")
@@ -233,6 +243,46 @@ class LocalLeaderEndPointTest extends Logging {
     assertEquals(expected, result)
   }
 
+  @Test
+  def testEarliestPendingUploadOffsetWhenNoSegmentsUploaded(): Unit = {
+    // Append some records; no remote upload happened yet
+    appendRecords(replicaManager, topicIdPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+
+    val expected = endPoint.fetchEarliestOffset(topicPartition, 0)
+    val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
+    assertEquals(expected, result)
+  }
+
+  @Test
+  def testEarliestPendingUploadOffsetWhenLocalStartGreaterThanStart(): Unit = {
+    appendRecords(replicaManager, topicIdPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+
+    // Bump epoch and advance local log start offset without changing log 
start offset
+    bumpLeaderEpoch()
+    
replicaManager.logManager.getLog(topicPartition).foreach(_.updateLocalLogStartOffset(3))
+
+    val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 1)
+    assertEquals(new OffsetAndEpoch(-1L, -1), result)
+  }
+
+  @Test
+  def testEarliestPendingUploadOffsetWhenHighestRemoteOffsetKnown(): Unit = {
+    appendRecords(replicaManager, topicIdPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+
+    // Highest remote is 1 => earliest pending should be max(1+1, logStart)
+    val log = 
replicaManager.getPartitionOrException(topicPartition).localLogOrException
+    log.updateHighestOffsetInRemoteStorage(1)
+
+    val expectedOffset = Math.max(2L, log.logStartOffset())
+    val epoch = log.leaderEpochCache().epochForOffset(expectedOffset).orElse(0)
+
+    val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
+    assertEquals(new OffsetAndEpoch(expectedOffset, epoch), result)
+  }
+
   private class CallbackResult[T] {
     private var value: Option[T] = None
     private var fun: Option[T => Unit] = None
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index 5577dc9bd38..97e2d5b7181 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -18,8 +18,8 @@ package kafka.server
 
 import com.yammer.metrics.core.Gauge
 import kafka.utils.TestUtils
-import org.apache.kafka.common.message.{FetchResponseData, 
OffsetForLeaderEpochRequestData}
 import org.apache.kafka.common.message.FetchResponseData.PartitionData
+import org.apache.kafka.common.message.{FetchResponseData, 
OffsetForLeaderEpochRequestData}
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.requests.FetchRequest
@@ -318,6 +318,8 @@ class AbstractFetcherManagerTest {
     override val isTruncationOnFetchSupported: Boolean = false
 
     override def fetchEarliestLocalOffset(topicPartition: TopicPartition, 
currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0)
+
+    override def fetchEarliestPendingUploadOffset(topicPartition: 
TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new 
OffsetAndEpoch(-1L, -1)
   }
 
   private class MockResizeFetcherTierStateMachine extends 
TierStateMachine(null, null, false) {
@@ -354,6 +356,8 @@ class AbstractFetcherManagerTest {
     override protected def logEndOffset(topicPartition: TopicPartition): Long 
= 1
 
     override protected def endOffsetForEpoch(topicPartition: TopicPartition, 
epoch: Int): Optional[OffsetAndEpoch] = Optional.of(new OffsetAndEpoch(1, 0))
+
+    override protected def shouldFetchFromLastTieredOffset(topicPartition: 
TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean = false
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala 
b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
index 5165debe66c..74cd8440fc6 100644
--- a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
+++ b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
@@ -38,7 +38,8 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
                         val replicaId: Int = 0,
                         val leaderId: Int = 1,
                         fetchBackOffMs: Int = 0,
-                        failedPartitions: FailedPartitions = new 
FailedPartitions)
+                        failedPartitions: FailedPartitions = new 
FailedPartitions,
+                        fetchFromLastTieredOffset: Boolean = false)
   extends AbstractFetcherThread("mock-fetcher",
     clientId = "mock-fetcher",
     leader = mockLeader,
@@ -183,4 +184,6 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
       assertEquals(expectedEpoch, 
fetchState(partition).map(_.lastFetchedEpoch.get()))
     }
   }
+
+  override def shouldFetchFromLastTieredOffset(topicPartition: TopicPartition, 
leaderEndOffset: Long, replicaEndOffset: Long): Boolean = 
fetchFromLastTieredOffset
 }
diff --git a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala 
b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
index 96e43955d9e..a6a9862aeff 100644
--- a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
+++ b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
@@ -135,6 +135,15 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = 
new BrokerEndPoint(1, "l
     new OffsetAndEpoch(leaderState.localLogStartOffset, 
leaderState.leaderEpoch)
   }
 
+  override def fetchEarliestPendingUploadOffset(topicPartition: 
TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
+    val leaderState = leaderPartitionState(topicPartition)
+    checkLeaderEpochAndThrow(leaderEpoch, leaderState)
+    leaderState.earliestPendingUploadOffset match {
+      case -1L => new OffsetAndEpoch(-1L, -1)
+      case _ => new 
OffsetAndEpoch(math.max(leaderState.earliestPendingUploadOffset, 
leaderState.logStartOffset), leaderState.leaderEpoch)
+    }
+  }
+
   override def fetchEpochEndOffsets(partitions: java.util.Map[TopicPartition, 
OffsetForLeaderEpochRequestData.OffsetForLeaderPartition]): 
java.util.Map[TopicPartition, EpochEndOffset] = {
     val endOffsets = new java.util.HashMap[TopicPartition, EpochEndOffset]()
     partitions.forEach { (partition, epochData) =>
@@ -262,13 +271,14 @@ class PartitionState(var log: mutable.Buffer[RecordBatch],
                      var logEndOffset: Long,
                      var highWatermark: Long,
                      var rlmEnabled: Boolean = false,
-                     var localLogStartOffset: Long)
+                     var localLogStartOffset: Long,
+                     var earliestPendingUploadOffset: Long)
 
 object PartitionState {
-  def apply(log: Seq[RecordBatch], leaderEpoch: Int, highWatermark: Long, 
rlmEnabled: Boolean = false): PartitionState = {
+  def apply(log: Seq[RecordBatch], leaderEpoch: Int, highWatermark: Long, 
rlmEnabled: Boolean = false, earliestPendingUploadOffset: Long = -1L): 
PartitionState = {
     val logStartOffset = log.headOption.map(_.baseOffset).getOrElse(0L)
     val logEndOffset = log.lastOption.map(_.nextOffset).getOrElse(0L)
-    new PartitionState(log.toBuffer, leaderEpoch, logStartOffset, 
logEndOffset, highWatermark, rlmEnabled, logStartOffset)
+    new PartitionState(log.toBuffer, leaderEpoch, logStartOffset, 
logEndOffset, highWatermark, rlmEnabled, logStartOffset, 
earliestPendingUploadOffset)
   }
 
   def apply(leaderEpoch: Int): PartitionState = {
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index f60de95672c..cab7db47f6c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -24,6 +24,7 @@ import kafka.server.epoch.util.MockBlockingSender
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.compress.Compression
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.message.FetchResponseData
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
@@ -38,12 +39,13 @@ import org.apache.kafka.server.common.{KRaftVersion, 
MetadataVersion, OffsetAndE
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.ReplicaState
 import org.apache.kafka.server.PartitionFetchState
-import org.apache.kafka.storage.internals.log.{LogAppendInfo, UnifiedLog}
+import org.apache.kafka.server.config.ReplicationConfigs
+import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogConfig, 
UnifiedLog}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 import org.mockito.ArgumentCaptor
 import org.mockito.ArgumentMatchers.{any, anyBoolean, anyLong}
 import org.mockito.Mockito.{mock, times, verify, when}
@@ -51,7 +53,7 @@ import org.mockito.Mockito.{mock, times, verify, when}
 import java.lang.{Long => JLong}
 import java.nio.charset.StandardCharsets
 import java.util
-import java.util.{Collections, Optional}
+import java.util.{Collections, Optional, Properties}
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters._
@@ -818,4 +820,67 @@ class ReplicaFetcherThreadTest {
     when(replicaManager.localLogOrException(t2p1)).thenReturn(log)
     when(replicaManager.getPartitionOrException(t2p1)).thenReturn(partition)
   }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "false, false, compact, 0, 0, false",
+    "false, false, compact, 5, 0, false",
+    "false, false, compact, 5, 1, false",
+    "false, false, delete, 0, 0, false",
+    "false, false, delete, 5, 0, false",
+    "false, false, delete, 5, 1, false",
+    "false, true, compact, 0, 0, false",
+    "false, true, compact, 5, 0, false",
+    "false, true, compact, 5, 1, false",
+    "false, true, delete, 0, 0, false",
+    "false, true, delete, 5, 0, false",
+    "false, true, delete, 5, 1, false",
+    "true, false, compact, 0, 0, false",
+    "true, false, compact, 5, 0, false",
+    "true, false, compact, 5, 1, false",
+    "true, false, delete, 0, 0, false",
+    "true, false, delete, 5, 0, false",
+    "true, false, delete, 5, 1, false",
+    "true, true, compact, 0, 0, false",
+    "true, true, compact, 5, 0, false",
+    "true, true, compact, 5, 1, false",
+    "true, true, delete, 0, 0, false",
+    "true, true, delete, 5, 0, true",
+    "true, true, delete, 5, 1, false"))
+  def testShouldFetchFromLastTieredOffset(enableLastTieredOffsetFetch: Boolean,
+                                          remoteStorageEnabled: Boolean,
+                                          cleanUpPolicy: String,
+                                          leaderEndOffset: Long,
+                                          replicaEndOffset: Long,
+                                          expected: Boolean): Unit = {
+    val tp = new TopicPartition("t", 0)
+
+    val props = TestUtils.createBrokerConfig(1)
+    
props.put(ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG, 
String.valueOf(enableLastTieredOffsetFetch))
+    val config = KafkaConfig.fromProps(props)
+
+    val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend])
+    when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint)
+
+    val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+    when(replicaManager.brokerTopicStats).thenReturn(new BrokerTopicStats)
+
+    val lcOverrides = new Properties()
+    lcOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanUpPolicy)
+    val logConfig = LogConfig.fromProps(config.extractLogConfigMap, 
lcOverrides)
+
+    val log: UnifiedLog = mock(classOf[UnifiedLog])
+    when(log.config).thenReturn(logConfig)
+    when(log.remoteLogEnabled()).thenReturn(remoteStorageEnabled)
+    when(replicaManager.localLog(tp)).thenReturn(Some(log))
+
+    val logContext = new LogContext(s"[ReplicaFetcher 
replicaId=${config.brokerId}, leaderId=${mockBlockingSend.brokerEndPoint().id}, 
fetcherId=0] ")
+    val fetchSessionHandler = new FetchSessionHandler(logContext, 
mockBlockingSend.brokerEndPoint().id)
+    val leader = new RemoteLeaderEndPoint(logContext.logPrefix, 
mockBlockingSend, fetchSessionHandler,
+      config, replicaManager, UNBOUNDED_QUOTA, () => 
MetadataVersion.MINIMUM_VERSION, () => 1)
+
+    val thread = new ReplicaFetcherThread("test-fetcher", leader, config, 
failedPartitions, replicaManager, UNBOUNDED_QUOTA, logContext.logPrefix)
+
+    assertEquals(expected, thread.shouldFetchFromLastTieredOffset(tp, 
leaderEndOffset, replicaEndOffset))
+  }
 }
diff --git a/server/src/main/java/org/apache/kafka/server/LeaderEndPoint.java 
b/server/src/main/java/org/apache/kafka/server/LeaderEndPoint.java
index 7a0a3a0703b..83d626fa23c 100644
--- a/server/src/main/java/org/apache/kafka/server/LeaderEndPoint.java
+++ b/server/src/main/java/org/apache/kafka/server/LeaderEndPoint.java
@@ -103,6 +103,17 @@ public interface LeaderEndPoint {
      */
     OffsetAndEpoch fetchEarliestLocalOffset(TopicPartition topicPartition, int 
currentLeaderEpoch);
 
+
+    /**
+     * Fetches the earliest offset and epoch that is pending upload for the 
given topic partition from the leader.
+     *
+     * @param topicPartition     The topic partition for which the earliest 
pending upload offset is to be fetched.
+     * @param currentLeaderEpoch The current leader epoch of the requesting 
replica.
+     * @return An OffsetAndEpoch object representing the earliest pending 
upload offset and its associated epoch
+     * in the leader's topic partition.
+     */
+    OffsetAndEpoch fetchEarliestPendingUploadOffset(TopicPartition 
topicPartition, int currentLeaderEpoch);
+
     /**
      * Builds a fetch request, given a partition map.
      *

Reply via email to