This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 76a1c839cf1 KAFKA-19340 Move DelayedRemoteFetch to the storage module (#19876)
76a1c839cf1 is described below

commit 76a1c839cf1f6e4ac2bc3317a73e65955428a1f6
Author: Lan Ding <[email protected]>
AuthorDate: Sat Oct 25 00:01:24 2025 +0800

    KAFKA-19340 Move DelayedRemoteFetch to the storage module (#19876)
    
    Move DelayedRemoteFetch to the storage module and rewrite it in Java.
    
    Reviewers: Mickael Maison <[email protected]>, Kamal Chandraprakash <[email protected]>, Chia-Ping Tsai <[email protected]>
---
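
Note for reviewers: the core of the rewrite is an error-handling change.
LogReadResult moves to org.apache.kafka.storage.internals.log and now carries
an Errors code instead of an Optional[Throwable], replacing
ReplicaManager.createLogReadResult. A minimal sketch of the new error path,
inferred from the call sites in this diff (the variable `e` is a hypothetical
Throwable; imports omitted):

    // Old (Scala, core module):
    //   ReplicaManager.createLogReadResult(e).toFetchPartitionData(false)
    // New (Java, storage module): wrap the failure as an Errors code.
    LogReadResult result = new LogReadResult(Errors.forException(e));
    FetchPartitionData data = result.toFetchPartitionData(false);
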
 checkstyle/import-control-storage.xml              |   5 +-
 .../java/kafka/server/share/DelayedShareFetch.java |   5 +-
 .../kafka/server/share/PendingRemoteFetches.java   |   2 +-
 .../src/main/scala/kafka/server/DelayedFetch.scala |  12 +-
 .../scala/kafka/server/DelayedRemoteFetch.scala    | 146 ------
 .../main/scala/kafka/server/ReplicaManager.scala   |  56 ++-
 .../kafka/server/share/DelayedShareFetchTest.java  |  10 +-
 .../server/share/SharePartitionManagerTest.java    |   4 +-
 .../kafka/server/DelayedFetchTest.scala            |  29 +-
 .../kafka/server/DelayedRemoteFetchTest.scala      | 501 ---------------------
 .../AbstractCoordinatorConcurrencyTest.scala       |   2 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala    |   6 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  10 +-
 .../kafka/server/purgatory/DelayedRemoteFetch.java | 192 ++++++++
 .../internals/log/FetchPartitionStatus.java        |  29 ++
 .../storage/internals/log}/LogReadResult.java      |  68 +--
 .../server/purgatory/DelayedRemoteFetchTest.java   | 480 ++++++++++++++++++++
 17 files changed, 781 insertions(+), 776 deletions(-)
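
Similarly, the FetchPartitionStatus case class removed from DelayedFetch.scala
reappears above as storage/internals/log/FetchPartitionStatus.java. A rough
sketch of the shape implied by its call sites in this diff (accessor names
inferred from the old Scala usage; the committed 29-line file may differ):

    // Hypothetical sketch only; imports of LogOffsetMetadata and
    // FetchRequest.PartitionData omitted (see the diff below).
    public class FetchPartitionStatus {
        private final LogOffsetMetadata startOffsetMetadata;
        private final FetchRequest.PartitionData fetchInfo;

        public FetchPartitionStatus(LogOffsetMetadata startOffsetMetadata,
                                    FetchRequest.PartitionData fetchInfo) {
            this.startOffsetMetadata = startOffsetMetadata;
            this.fetchInfo = fetchInfo;
        }

        public LogOffsetMetadata startOffsetMetadata() {
            return startOffsetMetadata;
        }

        public FetchRequest.PartitionData fetchInfo() {
            return fetchInfo;
        }
    }
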

diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml
index 2a0f7412685..e4194b20e2c 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -84,6 +84,10 @@
                 </subpackage>
             </subpackage>
         </subpackage>
+
+        <subpackage name="purgatory">
+            <allow pkg="org.apache.kafka.server.storage.log" />
+        </subpackage>
     </subpackage>
 
     <subpackage name="storage.internals">
@@ -164,7 +168,6 @@
         <allow pkg="org.apache.kafka.server.log.remote.storage" />
         <allow pkg="scala.jdk.javaapi" />
         <allow pkg="org.apache.kafka.test" />
-
     </subpackage>
     
 </import-control>
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 969029a6ea5..7b061a28bd5 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.raft.errors.NotLeaderException;
-import org.apache.kafka.server.LogReadResult;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import org.apache.kafka.server.purgatory.DelayedOperation;
 import org.apache.kafka.server.share.SharePartitionKey;
@@ -46,6 +45,7 @@ import org.apache.kafka.server.util.timer.TimerTask;
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
 import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
+import org.apache.kafka.storage.internals.log.LogReadResult;
 import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
 import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
 
@@ -852,13 +852,12 @@ public class DelayedShareFetch extends DelayedOperation {
                 if (remoteFetch.remoteFetchResult().isDone()) {
                     RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
                     if (remoteLogReadResult.error().isPresent()) {
-                        Throwable error = remoteLogReadResult.error().get();
                         // If there is any error for the remote fetch topic partition, we populate the error accordingly.
                         shareFetchPartitionData.add(
                             new ShareFetchPartitionData(
                                 remoteFetch.topicIdPartition(),
                                 partitionsAcquired.get(remoteFetch.topicIdPartition()),
-                                ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
+                                new LogReadResult(Errors.forException(remoteLogReadResult.error().get())).toFetchPartitionData(false)
                             )
                         );
                     } else {
diff --git a/core/src/main/java/kafka/server/share/PendingRemoteFetches.java b/core/src/main/java/kafka/server/share/PendingRemoteFetches.java
index 575a32ef466..c25cc706775 100644
--- a/core/src/main/java/kafka/server/share/PendingRemoteFetches.java
+++ b/core/src/main/java/kafka/server/share/PendingRemoteFetches.java
@@ -17,8 +17,8 @@
 package kafka.server.share;
 
 import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.server.LogReadResult;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.LogReadResult;
 import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
 import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
 
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 91480bb420e..ebdc0000440 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -24,25 +24,15 @@ import java.util.concurrent.TimeUnit
 import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.purgatory.DelayedOperation
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
-import org.apache.kafka.storage.internals.log.LogOffsetMetadata
+import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, LogOffsetMetadata}
 
 import scala.collection._
 import scala.jdk.CollectionConverters._
 
-case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionData) {
-
-  override def toString: String = {
-    "[startOffsetMetadata: " + startOffsetMetadata +
-      ", fetchInfo: " + fetchInfo +
-      "]"
-  }
-}
-
 /**
  * A delayed fetch operation that can be created by the replica manager and watched
  * in the fetch operation purgatory
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
deleted file mode 100644
index fc2926988c0..00000000000
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import com.yammer.metrics.core.Meter
-import kafka.utils.Logging
-import org.apache.kafka.common.TopicIdPartition
-import org.apache.kafka.common.errors._
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.server.LogReadResult
-import org.apache.kafka.server.metrics.KafkaMetricsGroup
-import org.apache.kafka.server.purgatory.DelayedOperation
-import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
-import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, RemoteLogReadResult, RemoteStorageFetchInfo}
-
-import java.util
-import java.util.concurrent.{CompletableFuture, Future, TimeUnit}
-import java.util.{Optional, OptionalInt, OptionalLong}
-import scala.collection._
-
-/**
- * A remote fetch operation that can be created by the replica manager and watched
- * in the remote fetch operation purgatory
- */
-class DelayedRemoteFetch(remoteFetchTasks: util.Map[TopicIdPartition, Future[Void]],
-                         remoteFetchResults: util.Map[TopicIdPartition, CompletableFuture[RemoteLogReadResult]],
-                         remoteFetchInfos: util.Map[TopicIdPartition, RemoteStorageFetchInfo],
-                         remoteFetchMaxWaitMs: Long,
-                         fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
-                         fetchParams: FetchParams,
-                         localReadResults: Seq[(TopicIdPartition, LogReadResult)],
-                         replicaManager: ReplicaManager,
-                         responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
-  extends DelayedOperation(remoteFetchMaxWaitMs) with Logging {
-
-  if (fetchParams.isFromFollower) {
-    throw new IllegalStateException(s"The follower should not invoke remote fetch. Fetch params are: $fetchParams")
-  }
-
-  /**
-   * The operation can be completed if:
-   *
-   * Case a: This broker is no longer the leader of the partition it tries to fetch
-   * Case b: This broker does not know the partition it tries to fetch
-   * Case c: All the remote storage read request completed (succeeded or failed)
-   * Case d: The partition is in an offline log directory on this broker
-   *
-   * Upon completion, should return whatever data is available for each valid partition
-   */
-  override def tryComplete(): Boolean = {
-    fetchPartitionStatus.foreach {
-      case (topicPartition, fetchStatus) =>
-        val fetchOffset = fetchStatus.startOffsetMetadata
-        try {
-          if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
-            replicaManager.getPartitionOrException(topicPartition.topicPartition())
-          }
-        } catch {
-          case _: KafkaStorageException => // Case d
-            debug(s"Partition $topicPartition is in an offline log directory, 
satisfy $fetchParams immediately")
-            return forceComplete()
-          case _: UnknownTopicOrPartitionException => // Case b
-            debug(s"Broker no longer knows of partition $topicPartition, 
satisfy $fetchParams immediately")
-            return forceComplete()
-          case _: NotLeaderOrFollowerException =>  // Case a
-            debug("Broker is no longer the leader or follower of %s, satisfy 
%s immediately".format(topicPartition, fetchParams))
-            return forceComplete()
-        }
-    }
-    // Case c
-    if (remoteFetchResults.values().stream().allMatch(taskResult => taskResult.isDone))
-      forceComplete()
-    else
-      false
-  }
-
-  override def onExpiration(): Unit = {
-    // cancel the remote storage read task, if it has not been executed yet and
-    // avoid interrupting the task if it is already running as it may force closing opened/cached resources as transaction index.
-    remoteFetchTasks.forEach { (topicIdPartition, task) =>
-      if (task != null && !task.isDone) {
-         if (!task.cancel(false)) {
-           debug(s"Remote fetch task for remoteFetchInfo: 
${remoteFetchInfos.get(topicIdPartition)} could not be cancelled.")
-         }
-      }
-    }
-
-    DelayedRemoteFetchMetrics.expiredRequestMeter.mark()
-  }
-
-  /**
-   * Upon completion, read whatever data is available and pass to the complete callback
-   */
-  override def onComplete(): Unit = {
-    val fetchPartitionData = localReadResults.map { case (tp, result) =>
-      val remoteFetchResult = remoteFetchResults.get(tp)
-      if (remoteFetchResults.containsKey(tp)
-        && remoteFetchResult.isDone
-        && result.error == Errors.NONE
-        && result.info.delayedRemoteStorageFetch.isPresent) {
-        if (remoteFetchResult.get.error.isPresent) {
-          tp -> ReplicaManager.createLogReadResult(remoteFetchResult.get.error.get).toFetchPartitionData(false)
-        } else {
-          val info = remoteFetchResult.get.fetchDataInfo.get
-          tp -> new FetchPartitionData(
-            result.error,
-            result.highWatermark,
-            result.leaderLogStartOffset,
-            info.records,
-            Optional.empty(),
-            if (result.lastStableOffset.isPresent) OptionalLong.of(result.lastStableOffset.getAsLong) else OptionalLong.empty(),
-            info.abortedTransactions,
-            if (result.preferredReadReplica.isPresent) OptionalInt.of(result.preferredReadReplica.getAsInt) else OptionalInt.empty(),
-            false)
-        }
-      } else {
-        tp -> result.toFetchPartitionData(false)
-      }
-    }
-
-    responseCallback(fetchPartitionData)
-  }
-}
-
-object DelayedRemoteFetchMetrics {
-  // Changing the package or class name may cause incompatibility with existing code and metrics configuration
-  private val metricsPackage = "kafka.server"
-  private val metricsClassName = "DelayedRemoteFetchMetrics"
-  private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
-  val expiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
-}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index a4c5416cb34..1d9d076a317 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -56,16 +56,16 @@ import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.log.remote.storage.RemoteLogManager
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
 import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, LogReadResult, common}
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
 import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import java.io.File
@@ -188,19 +188,7 @@ object ReplicaManager {
       -1L,
       -1L,
       OptionalLong.empty(),
-      Optional.of(e))
-  }
-
-  def createLogReadResult(e: Throwable): LogReadResult = {
-    new LogReadResult(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
-      Optional.empty(),
-      UnifiedLog.UNKNOWN_OFFSET,
-      UnifiedLog.UNKNOWN_OFFSET,
-      UnifiedLog.UNKNOWN_OFFSET,
-      UnifiedLog.UNKNOWN_OFFSET,
-      -1L,
-      OptionalLong.empty(),
-      Optional.of(e))
+      Errors.forException(e));
   }
 
   private[server] def isListOffsetsTimestampUnsupported(timestamp: JLong, version: Short): Boolean = {
@@ -1639,7 +1627,7 @@ class ReplicaManager(val config: KafkaConfig,
   private def processRemoteFetches(remoteFetchInfos: util.LinkedHashMap[TopicIdPartition, RemoteStorageFetchInfo],
                                    params: FetchParams,
                                    responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
-                                   logReadResults: Seq[(TopicIdPartition, LogReadResult)],
+                                   logReadResults: util.LinkedHashMap[TopicIdPartition, LogReadResult],
                                    fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
     val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
     val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]
@@ -1651,8 +1639,15 @@ class ReplicaManager(val config: KafkaConfig,
     }
 
     val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
-    val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
-      fetchPartitionStatus, params, logReadResults, this, responseCallback)
+    val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks,
+                                             remoteFetchResults,
+                                             remoteFetchInfos,
+                                             remoteFetchMaxWaitMs,
+                                             fetchPartitionStatus.toMap.asJava,
+                                             params,
+                                             logReadResults,
+                                             tp => getPartitionOrException(tp),
+                                             response => responseCallback(response.asScala.toSeq))
 
     // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
     val delayedFetchKeys = remoteFetchTasks.asScala.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
@@ -1681,7 +1676,7 @@ class ReplicaManager(val config: KafkaConfig,
 
     var hasDivergingEpoch = false
     var hasPreferredReadReplica = false
-    val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
+    val logReadResultMap = new util.LinkedHashMap[TopicIdPartition, LogReadResult]
 
     logReadResults.foreach { case (topicIdPartition, logReadResult) =>
       brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()
@@ -1717,14 +1712,15 @@ class ReplicaManager(val config: KafkaConfig,
       // construct the fetch results from the read results
       val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionStatus)]
       fetchInfos.foreach { case (topicIdPartition, partitionData) =>
-        logReadResultMap.get(topicIdPartition).foreach(logReadResult => {
+        val logReadResult = logReadResultMap.get(topicIdPartition)
+        if (logReadResult != null) {
           val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
-          fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
-        })
+          fetchPartitionStatus += (topicIdPartition -> new FetchPartitionStatus(logOffsetMetadata, partitionData))
+        }
       }
 
       if (!remoteFetchInfos.isEmpty) {
-        processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResults, fetchPartitionStatus.toSeq)
+        processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResultMap, fetchPartitionStatus.toSeq)
       } else {
         // If there is not enough data to respond and there is no remote data, we will let the fetch request
         // wait for new data.
@@ -1812,7 +1808,7 @@ class ReplicaManager(val config: KafkaConfig,
             -1L,
             OptionalLong.of(offsetSnapshot.lastStableOffset.messageOffset),
             if (preferredReadReplica.isDefined) OptionalInt.of(preferredReadReplica.get) else OptionalInt.empty(),
-            Optional.empty())
+            Errors.NONE)
         } else {
           log = partition.localLogWithEpochOrThrow(fetchInfo.currentLeaderEpoch, params.fetchOnlyLeader())
 
@@ -1836,7 +1832,7 @@ class ReplicaManager(val config: KafkaConfig,
             fetchTimeMs,
             OptionalLong.of(readInfo.lastStableOffset),
             if (preferredReadReplica.isDefined) OptionalInt.of(preferredReadReplica.get) else OptionalInt.empty(),
-            Optional.empty()
+            Errors.NONE
           )
         }
       } catch {
@@ -1849,7 +1845,7 @@ class ReplicaManager(val config: KafkaConfig,
                  _: ReplicaNotAvailableException |
                  _: KafkaStorageException |
                  _: InconsistentTopicIdException) =>
-          createLogReadResult(e)
+          new LogReadResult(Errors.forException(e))
         case e: OffsetOutOfRangeException =>
           handleOffsetOutOfRangeError(tp, params, fetchInfo, adjustedMaxBytes, minOneMessage, log, fetchTimeMs, e)
         case e: Throwable =>
@@ -1868,7 +1864,7 @@ class ReplicaManager(val config: KafkaConfig,
             UnifiedLog.UNKNOWN_OFFSET,
             -1L,
             OptionalLong.empty(),
-            Optional.of(e)
+            Errors.forException(e)
           )
       }
     }
@@ -1949,10 +1945,10 @@ class ReplicaManager(val config: KafkaConfig,
           fetchInfo.logStartOffset,
           fetchTimeMs,
           OptionalLong.of(log.lastStableOffset),
-          Optional.empty[Throwable]())
+          Errors.NONE)
       }
     } else {
-      createLogReadResult(exception)
+      new LogReadResult(Errors.forException(exception))
     }
   }
 
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index ffa9f8b1145..8aab8eb5495 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.server.LogReadResult;
 import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
 import org.apache.kafka.server.purgatory.DelayedOperationKey;
 import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
@@ -52,6 +51,7 @@ import org.apache.kafka.server.util.timer.TimerTask;
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
 import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
+import org.apache.kafka.storage.internals.log.LogReadResult;
 import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
 import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
@@ -2023,7 +2023,7 @@ public class DelayedShareFetchTest {
             -1L,
             OptionalLong.empty(),
             OptionalInt.empty(),
-            Optional.empty()
+            Errors.NONE
         ));
         when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch));
         when(pendingRemoteFetches.isDone()).thenReturn(false);
@@ -2104,7 +2104,7 @@ public class DelayedShareFetchTest {
             -1L,
             OptionalLong.empty(),
             OptionalInt.empty(),
-            Optional.empty()
+            Errors.NONE
         ));
         when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch));
         when(pendingRemoteFetches.isDone()).thenReturn(false);
@@ -2179,7 +2179,7 @@ public class DelayedShareFetchTest {
             -1L,
             OptionalLong.empty(),
             OptionalInt.empty(),
-            Optional.empty()
+            Errors.NONE
         ))));
         remoteReadTopicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
             REMOTE_FETCH_INFO,
@@ -2191,7 +2191,7 @@ public class DelayedShareFetchTest {
             -1L,
             OptionalLong.empty(),
             OptionalInt.empty(),
-            Optional.empty()
+            Errors.NONE
         ))));
         return CollectionConverters.asScala(logReadResults).toSeq();
     }
diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 24a84bab64a..541719aea06 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -51,7 +51,6 @@ import org.apache.kafka.common.requests.ShareRequestMetadata;
 import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
-import org.apache.kafka.server.LogReadResult;
 import org.apache.kafka.server.common.ShareVersion;
 import org.apache.kafka.server.purgatory.DelayedOperationKey;
 import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
@@ -84,6 +83,7 @@ import org.apache.kafka.server.util.timer.Timer;
 import org.apache.kafka.server.util.timer.TimerTask;
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.LogReadResult;
 import org.apache.kafka.storage.internals.log.OffsetResultHolder;
 import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
@@ -3226,7 +3226,7 @@ public class SharePartitionManagerTest {
             -1L,
             OptionalLong.empty(),
             OptionalInt.empty(),
-            Optional.empty()
+            Errors.NONE
         ))));
         return CollectionConverters.asScala(logReadResults).toSeq();
     }
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index f10beb0086f..fa3b8465d65 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -25,9 +25,8 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.FetchRequest
-import org.apache.kafka.server.LogReadResult
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
-import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogOffsetMetadata, LogOffsetSnapshot}
+import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchPartitionStatus, LogOffsetMetadata, LogOffsetSnapshot, LogReadResult}
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
@@ -48,9 +47,9 @@ class DelayedFetchTest {
     val currentLeaderEpoch = Optional.of[Integer](10)
     val replicaId = 1
 
-    val fetchStatus = FetchPartitionStatus(
-      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
-      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchStatus = new FetchPartitionStatus(
+      new LogOffsetMetadata(fetchOffset),
+      new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
     val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
 
     var fetchResultOpt: Option[FetchPartitionData] = None
@@ -94,9 +93,9 @@ class DelayedFetchTest {
     val currentLeaderEpoch = Optional.of[Integer](10)
     val replicaId = 1
 
-    val fetchStatus = FetchPartitionStatus(
-      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
-      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchStatus = new FetchPartitionStatus(
+      new LogOffsetMetadata(fetchOffset),
+      new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
     val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
 
     var fetchResultOpt: Option[FetchPartitionData] = None
@@ -134,9 +133,9 @@ class DelayedFetchTest {
     val lastFetchedEpoch = Optional.of[Integer](9)
     val replicaId = 1
 
-    val fetchStatus = FetchPartitionStatus(
-      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
-      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch))
+    val fetchStatus = new FetchPartitionStatus(
+      new LogOffsetMetadata(fetchOffset),
+      new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch))
     val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
 
     var fetchResultOpt: Option[FetchPartitionData] = None
@@ -185,9 +184,9 @@ class DelayedFetchTest {
     val currentLeaderEpoch = Optional.of[Integer](10)
     val replicaId = 1
 
-    val fetchStatus = FetchPartitionStatus(
-      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
-      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchStatus = new FetchPartitionStatus(
+      new LogOffsetMetadata(fetchOffset),
+      new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
     val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
 
     var fetchResultOpt: Option[FetchPartitionData] = None
@@ -265,7 +264,7 @@ class DelayedFetchTest {
       -1L,
       -1L,
       OptionalLong.empty(),
-      if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]())
+      error)
   }
 
 }
diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
deleted file mode 100644
index 23b4b32b0d7..00000000000
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
+++ /dev/null
@@ -1,501 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import com.yammer.metrics.core.Meter
-import kafka.cluster.Partition
-import org.apache.kafka.common.errors.NotLeaderOrFollowerException
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.requests.FetchRequest
-import org.apache.kafka.common.{TopicIdPartition, Uuid}
-import org.apache.kafka.server.LogReadResult
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
-import org.apache.kafka.storage.internals.log._
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-import org.mockito.ArgumentMatchers.anyBoolean
-import org.mockito.Mockito.{mock, never, verify, when}
-
-import java.util.{Collections, Optional, OptionalLong}
-import java.util.concurrent.{CompletableFuture, Future}
-import scala.collection._
-import scala.jdk.CollectionConverters._
-
-class DelayedRemoteFetchTest {
-  private val maxBytes = 1024
-  private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
-  private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
-  private val topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic2")
-  private val fetchOffset = 500L
-  private val logStartOffset = 0L
-  private val currentLeaderEpoch = Optional.of[Integer](10)
-  private val remoteFetchMaxWaitMs = 500
-
-  private val fetchStatus = FetchPartitionStatus(
-    startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
-    fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
-  private val fetchParams = buildFetchParams(replicaId = -1, maxWaitMs = 500)
-
-  @Test
-  def testFetch(): Unit = {
-    var actualTopicPartition: Option[TopicIdPartition] = None
-    var fetchResultOpt: Option[FetchPartitionData] = None
-
-    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
-      assertEquals(1, responses.size)
-      actualTopicPartition = Some(responses.head._1)
-      fetchResultOpt = Some(responses.head._2)
-    }
-
-    val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
-    future.complete(buildRemoteReadResult(Errors.NONE))
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
-    val highWatermark = 100
-    val leaderLogStartOffset = 10
-    val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
-
-    val delayedRemoteFetch = new DelayedRemoteFetch(
-      java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
-      java.util.Collections.singletonMap(topicIdPartition, future),
-      java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
-      remoteFetchMaxWaitMs,
-      Seq(topicIdPartition -> fetchStatus),
-      fetchParams,
-      Seq(topicIdPartition -> logReadInfo),
-      replicaManager,
-      callback)
-
-    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
-      .thenReturn(mock(classOf[Partition]))
-
-    assertTrue(delayedRemoteFetch.tryComplete())
-    assertTrue(delayedRemoteFetch.isCompleted)
-    assertTrue(actualTopicPartition.isDefined)
-    assertEquals(topicIdPartition, actualTopicPartition.get)
-    assertTrue(fetchResultOpt.isDefined)
-
-    val fetchResult = fetchResultOpt.get
-    assertEquals(Errors.NONE, fetchResult.error)
-    assertEquals(highWatermark, fetchResult.highWatermark)
-    assertEquals(leaderLogStartOffset, fetchResult.logStartOffset)
-  }
-
-  @Test
-  def testFollowerFetch(): Unit = {
-    var actualTopicPartition: Option[TopicIdPartition] = None
-    var fetchResultOpt: Option[FetchPartitionData] = None
-
-    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
-      assertEquals(1, responses.size)
-      actualTopicPartition = Some(responses.head._1)
-      fetchResultOpt = Some(responses.head._2)
-    }
-
-    val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
-    future.complete(buildRemoteReadResult(Errors.NONE))
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
-    val highWatermark = 100
-    val leaderLogStartOffset = 10
-    val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
-    val fetchParams = buildFetchParams(replicaId = 1, maxWaitMs = 500)
-
-    assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(
-      java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
-      java.util.Collections.singletonMap(topicIdPartition, future),
-      java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
-      remoteFetchMaxWaitMs,
-      Seq(topicIdPartition -> fetchStatus),
-      fetchParams,
-      Seq(topicIdPartition -> logReadInfo),
-      replicaManager,
-      callback))
-  }
-
-  @Test
-  def testNotLeaderOrFollower(): Unit = {
-    var actualTopicPartition: Option[TopicIdPartition] = None
-    var fetchResultOpt: Option[FetchPartitionData] = None
-
-    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
-      assertEquals(1, responses.size)
-      actualTopicPartition = Some(responses.head._1)
-      fetchResultOpt = Some(responses.head._2)
-    }
-
-    // throw exception while getPartition
-    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
-      .thenThrow(new NotLeaderOrFollowerException(s"Replica for $topicIdPartition not available"))
-
-    val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
-
-    val logReadInfo = buildReadResult(Errors.NONE)
-
-    val delayedRemoteFetch =  new DelayedRemoteFetch(
-      java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
-      java.util.Collections.singletonMap(topicIdPartition, future),
-      java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
-      remoteFetchMaxWaitMs,
-      Seq(topicIdPartition -> fetchStatus),
-      fetchParams,
-      Seq(topicIdPartition -> logReadInfo),
-      replicaManager,
-      callback)
-
-    // delayed remote fetch should still be able to complete
-    assertTrue(delayedRemoteFetch.tryComplete())
-    assertTrue(delayedRemoteFetch.isCompleted)
-    assertEquals(topicIdPartition, actualTopicPartition.get)
-    assertTrue(fetchResultOpt.isDefined)
-  }
-
-  @Test
-  def testErrorLogReadInfo(): Unit = {
-    var actualTopicPartition: Option[TopicIdPartition] = None
-    var fetchResultOpt: Option[FetchPartitionData] = None
-
-    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
-      assertEquals(1, responses.size)
-      actualTopicPartition = Some(responses.head._1)
-      fetchResultOpt = Some(responses.head._2)
-    }
-
-    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
-      .thenReturn(mock(classOf[Partition]))
-
-    val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
-    future.complete(buildRemoteReadResult(Errors.NONE))
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
-
-    // build a read result with error
-    val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)
-
-    val delayedRemoteFetch = new DelayedRemoteFetch(
-      java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
-      java.util.Collections.singletonMap(topicIdPartition, future),
-      java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
-      remoteFetchMaxWaitMs,
-      Seq(topicIdPartition -> fetchStatus),
-      fetchParams,
-      Seq(topicIdPartition -> logReadInfo),
-      replicaManager,
-      callback)
-
-    assertTrue(delayedRemoteFetch.tryComplete())
-    assertTrue(delayedRemoteFetch.isCompleted)
-    assertEquals(topicIdPartition, actualTopicPartition.get)
-    assertTrue(fetchResultOpt.isDefined)
-    assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResultOpt.get.error)
-  }
-
-  @Test
-  def testRequestExpiry(): Unit = {
-    val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
-
-    def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
-      responseSeq.foreach { case (tp, data) =>
-        responses.put(tp, data)
-      }
-    }
-
-    def expiresPerSecValue(): Double = {
-      val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
-      val metric = allMetrics.find { case (n, _) => n.getMBeanName.endsWith("kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec") }
-
-      if (metric.isEmpty)
-        0
-      else
-        metric.get._2.asInstanceOf[Meter].count
-    }
-
-    val remoteFetchTaskExpired = mock(classOf[Future[Void]])
-    val remoteFetchTask2 = mock(classOf[Future[Void]])
-    // complete the 2nd task, and keep the 1st one expired
-    when(remoteFetchTask2.isDone).thenReturn(true)
-
-    // Create futures - one completed, one not
-    val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
-    val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
-    // Only complete one remote fetch
-    future2.complete(buildRemoteReadResult(Errors.NONE))
-
-    val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
-    val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
-
-    val highWatermark = 100
-    val leaderLogStartOffset = 10
-
-    val logReadInfo1 = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
-    val logReadInfo2 = buildReadResult(Errors.NONE)
-
-    val fetchStatus1 = FetchPartitionStatus(
-      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
-      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
-    val fetchStatus2 = FetchPartitionStatus(
-      startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
-      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
-
-    // Set up maps for multiple partitions
-    val remoteFetchTasks = new java.util.HashMap[TopicIdPartition, Future[Void]]()
-    val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
-    val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
-
-    remoteFetchTasks.put(topicIdPartition, remoteFetchTaskExpired)
-    remoteFetchTasks.put(topicIdPartition2, remoteFetchTask2)
-    remoteFetchResults.put(topicIdPartition, future1)
-    remoteFetchResults.put(topicIdPartition2, future2)
-    remoteFetchInfos.put(topicIdPartition, fetchInfo1)
-    remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
-
-    val delayedRemoteFetch = new DelayedRemoteFetch(
-      remoteFetchTasks,
-      remoteFetchResults,
-      remoteFetchInfos,
-      remoteFetchMaxWaitMs,
-      Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
-      fetchParams,
-      Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
-      replicaManager,
-      callback)
-
-    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
-      .thenReturn(mock(classOf[Partition]))
-    when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
-      .thenReturn(mock(classOf[Partition]))
-
-    // Verify that the ExpiresPerSec metric is zero before fetching
-    val existingMetricVal = expiresPerSecValue()
-    // Verify the delayedRemoteFetch is not completed yet
-    assertFalse(delayedRemoteFetch.isCompleted)
-
-    // Force the delayed remote fetch to expire
-    delayedRemoteFetch.run()
-
-    // Check that the expired task was cancelled and force-completed
-    verify(remoteFetchTaskExpired).cancel(anyBoolean())
-    verify(remoteFetchTask2, never()).cancel(anyBoolean())
-    assertTrue(delayedRemoteFetch.isCompleted)
-
-    // Check that the ExpiresPerSec metric was incremented
-    assertTrue(expiresPerSecValue() > existingMetricVal)
-
-    // Fetch results should include 2 results and the expired one should return local read results
-    assertEquals(2, responses.size)
-    assertTrue(responses.contains(topicIdPartition))
-    assertTrue(responses.contains(topicIdPartition2))
-
-    assertEquals(Errors.NONE, responses(topicIdPartition).error)
-    assertEquals(highWatermark, responses(topicIdPartition).highWatermark)
-    assertEquals(leaderLogStartOffset, responses(topicIdPartition).logStartOffset)
-
-    assertEquals(Errors.NONE, responses(topicIdPartition2).error)
-  }
-
-  @Test
-  def testMultiplePartitions(): Unit = {
-    val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
-
-    def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
-      responseSeq.foreach { case (tp, data) =>
-        responses.put(tp, data)
-      }
-    }
-
-    // Create futures - one completed, one not
-    val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
-    val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
-    // Only complete one remote fetch
-    future1.complete(buildRemoteReadResult(Errors.NONE))
-
-    val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
-    val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
-
-    val highWatermark1 = 100
-    val leaderLogStartOffset1 = 10
-    val highWatermark2 = 200
-    val leaderLogStartOffset2 = 20
-
-    val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10)
-    val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20)
-
-    val fetchStatus1 = FetchPartitionStatus(
-      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
-      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
-    val fetchStatus2 = FetchPartitionStatus(
-      startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
-      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
-
-    // Set up maps for multiple partitions
-    val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
-    val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
-
-    remoteFetchResults.put(topicIdPartition, future1)
-    remoteFetchResults.put(topicIdPartition2, future2)
-    remoteFetchInfos.put(topicIdPartition, fetchInfo1)
-    remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
-
-    val delayedRemoteFetch = new DelayedRemoteFetch(
-      Collections.emptyMap[TopicIdPartition, Future[Void]](),
-      remoteFetchResults,
-      remoteFetchInfos,
-      remoteFetchMaxWaitMs,
-      Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
-      fetchParams,
-      Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
-      replicaManager,
-      callback)
-
-    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
-      .thenReturn(mock(classOf[Partition]))
-    when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
-      .thenReturn(mock(classOf[Partition]))
-
-    // Should not complete since future2 is not done
-    assertFalse(delayedRemoteFetch.tryComplete())
-    assertFalse(delayedRemoteFetch.isCompleted)
-
-    // Complete future2
-    future2.complete(buildRemoteReadResult(Errors.NONE))
-
-    // Now it should complete
-    assertTrue(delayedRemoteFetch.tryComplete())
-    assertTrue(delayedRemoteFetch.isCompleted)
-
-    // Verify both partitions were processed without error
-    assertEquals(2, responses.size)
-    assertTrue(responses.contains(topicIdPartition))
-    assertTrue(responses.contains(topicIdPartition2))
-
-    assertEquals(Errors.NONE, responses(topicIdPartition).error)
-    assertEquals(highWatermark1, responses(topicIdPartition).highWatermark)
-    assertEquals(leaderLogStartOffset1, responses(topicIdPartition).logStartOffset)
-
-    assertEquals(Errors.NONE, responses(topicIdPartition2).error)
-    assertEquals(highWatermark2, responses(topicIdPartition2).highWatermark)
-    assertEquals(leaderLogStartOffset2, responses(topicIdPartition2).logStartOffset)
-  }
-
-  @Test
-  def testMultiplePartitionsWithFailedResults(): Unit = {
-    val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
-
-    def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
-      responseSeq.foreach { case (tp, data) =>
-        responses.put(tp, data)
-      }
-    }
-
-    // Create futures - one successful, one with error
-    val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
-    val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
-
-    // Created 1 successful result and 1 failed result
-    future1.complete(buildRemoteReadResult(Errors.NONE))
-    future2.complete(buildRemoteReadResult(Errors.UNKNOWN_SERVER_ERROR))
-
-    val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
-    val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
-
-    val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10)
-    val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20)
-
-    val fetchStatus1 = FetchPartitionStatus(
-      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
-      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
-    val fetchStatus2 = FetchPartitionStatus(
-      startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
-      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
-
-    // Set up maps for multiple partitions
-    val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
-    val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
-
-    remoteFetchResults.put(topicIdPartition, future1)
-    remoteFetchResults.put(topicIdPartition2, future2)
-    remoteFetchInfos.put(topicIdPartition, fetchInfo1)
-    remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
-
-    val delayedRemoteFetch = new DelayedRemoteFetch(
-      Collections.emptyMap[TopicIdPartition, Future[Void]](),
-      remoteFetchResults,
-      remoteFetchInfos,
-      remoteFetchMaxWaitMs,
-      Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
-      fetchParams,
-      Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
-      replicaManager,
-      callback)
-
-    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
-      .thenReturn(mock(classOf[Partition]))
-    when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
-      .thenReturn(mock(classOf[Partition]))
-
-    assertTrue(delayedRemoteFetch.tryComplete())
-    assertTrue(delayedRemoteFetch.isCompleted)
-
-    // Verify both partitions were processed
-    assertEquals(2, responses.size)
-    assertTrue(responses.contains(topicIdPartition))
-    assertTrue(responses.contains(topicIdPartition2))
-
-    // First partition should be successful
-    val fetchResult1 = responses(topicIdPartition)
-    assertEquals(Errors.NONE, fetchResult1.error)
-
-    // Second partition should have an error due to remote fetch failure
-    val fetchResult2 = responses(topicIdPartition2)
-    assertEquals(Errors.UNKNOWN_SERVER_ERROR, fetchResult2.error)
-  }
-
-  private def buildFetchParams(replicaId: Int,
-                               maxWaitMs: Int): FetchParams = {
-    new FetchParams(
-      replicaId,
-      1,
-      maxWaitMs,
-      1,
-      maxBytes,
-      FetchIsolation.LOG_END,
-      Optional.empty()
-    )
-  }
-
-  private def buildReadResult(error: Errors,
-                              highWatermark: Int = 0,
-                              leaderLogStartOffset: Int = 0): LogReadResult = {
-    new LogReadResult(
-      new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY, false, Optional.empty(),
-        Optional.of(mock(classOf[RemoteStorageFetchInfo]))),
-      Optional.empty(),
-      highWatermark,
-      leaderLogStartOffset,
-      -1L,
-      -1L,
-      -1L,
-      OptionalLong.empty(),
-      if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]())
-  }
-
-  private def buildRemoteReadResult(error: Errors): RemoteLogReadResult = {
-    new RemoteLogReadResult(
-      Optional.of(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY)),
-      if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]())
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 8f10811091d..366645344ae 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.RequestLocal
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets, TopicPartitionOperationKey}
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
 import org.apache.kafka.server.util.timer.{MockTimer, Timer}
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index a7948ae901f..307afad4f5f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.KRaftVersion
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
-import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogOffsetSnapshot, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchPartitionStatus, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogOffsetSnapshot, UnifiedLog}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong}
@@ -171,7 +171,7 @@ class ReplicaManagerQuotasTest {
       when(partition.getReplica(1)).thenReturn(None)
 
       val tp = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("t1", 0))
-      val fetchPartitionStatus = FetchPartitionStatus(
+      val fetchPartitionStatus = new FetchPartitionStatus(
         new LogOffsetMetadata(50L, 0L, 250),
         new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
       val fetchParams = new FetchParams(
@@ -222,7 +222,7 @@ class ReplicaManagerQuotasTest {
         .thenReturn(partition)
 
       val tidp = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("t1", 0))
-      val fetchPartitionStatus = FetchPartitionStatus(
+      val fetchPartitionStatus = new FetchPartitionStatus(
         new LogOffsetMetadata(50L, 0L, 250),
         new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
       val fetchParams = new FetchParams(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index fa43b8bc1a4..d0ebe4b2025 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -63,8 +63,8 @@ import org.apache.kafka.server.log.remote.TopicPartitionLog
 import org.apache.kafka.server.log.remote.storage._
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.{LogReadResult, PartitionFetchState}
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedRemoteListOffsets}
+import org.apache.kafka.server.PartitionFetchState
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets}
 import org.apache.kafka.server.share.SharePartitionKey
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, 
DelayedShareFetchKey, ShareFetch}
 import org.apache.kafka.server.share.metrics.ShareGroupMetrics
@@ -76,7 +76,7 @@ import org.apache.kafka.server.util.timer.{MockTimer, 
SystemTimer}
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, 
FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, 
LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, 
LogSegments, ProducerStateManager, ProducerStateManagerConfig, 
RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, 
FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, 
LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, 
LogReadResult, LogSegments, ProducerStateManager, ProducerStateManagerConfig, 
RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@@ -3575,13 +3575,13 @@ class ReplicaManagerTest {
         mock(classOf[FetchDataInfo])
       }).when(spyRLM).read(any())
 
-      val curExpiresPerSec = 
DelayedRemoteFetchMetrics.expiredRequestMeter.count()
+      val curExpiresPerSec = DelayedRemoteFetch.expiredRequestCount()
       replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 100000, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UNBOUNDED_QUOTA, fetchCallback)
       // advancing the clock to expire the delayed remote fetch
       timer.advanceClock(2000L)
 
       // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is 
called since the delayed remote fetch is expired
-      TestUtils.waitUntilTrue(() => (curExpiresPerSec + 1) == 
DelayedRemoteFetchMetrics.expiredRequestMeter.count(), 
"DelayedRemoteFetchMetrics.expiredRequestMeter.count() should be 1, but got: " 
+ DelayedRemoteFetchMetrics.expiredRequestMeter.count(), 10000L)
+      TestUtils.waitUntilTrue(() => (curExpiresPerSec + 1) == DelayedRemoteFetch.expiredRequestCount(), "DelayedRemoteFetch.expiredRequestCount() should increase by 1, but got: " + DelayedRemoteFetch.expiredRequestCount(), 10000L)
       latch.countDown()
     } finally {
       Utils.tryAll(util.Arrays.asList[Callable[Void]](
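
Note on the metrics change in the hunk above: the Scala DelayedRemoteFetchMetrics
object is gone; the ExpiresPerSec meter now lives as a private static on the Java
class (defined below) and keeps the kafka.server:type=DelayedRemoteFetchMetrics
MBean name for compatibility. Tests read it through a static accessor;
illustratively:

    // Before (Scala): DelayedRemoteFetchMetrics.expiredRequestMeter.count()
    // After (Java), via the accessor marked "Visible for testing":
    long expired = DelayedRemoteFetch.expiredRequestCount();
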
diff --git 
a/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteFetch.java
 
b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteFetch.java
new file mode 100644
index 00000000000..8c872811eb6
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteFetch.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.storage.log.FetchParams;
+import org.apache.kafka.server.storage.log.FetchPartitionData;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.FetchPartitionStatus;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.LogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+
+import com.yammer.metrics.core.Meter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * A remote fetch operation that can be created by the replica manager and 
watched
+ * in the remote fetch operation purgatory
+ */
+public class DelayedRemoteFetch extends DelayedOperation {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DelayedRemoteFetch.class);
+
+    // For compatibility, metrics remain under the `kafka.server.DelayedRemoteFetchMetrics` class name
+    private static final KafkaMetricsGroup METRICS_GROUP = new 
KafkaMetricsGroup("kafka.server", "DelayedRemoteFetchMetrics");
+
+    private static final Meter EXPIRED_REQUEST_METER = 
METRICS_GROUP.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS);
+
+    private final Map<TopicIdPartition, Future<Void>> remoteFetchTasks;
+    private final Map<TopicIdPartition, 
CompletableFuture<RemoteLogReadResult>> remoteFetchResults;
+    private final Map<TopicIdPartition, RemoteStorageFetchInfo> 
remoteFetchInfos;
+    private final Map<TopicIdPartition, FetchPartitionStatus> 
fetchPartitionStatus;
+    private final FetchParams fetchParams;
+    private final Map<TopicIdPartition, LogReadResult> localReadResults;
+    private final Consumer<TopicPartition> partitionOrException;
+    private final Consumer<Map<TopicIdPartition, FetchPartitionData>> 
responseCallback;
+
+    public DelayedRemoteFetch(Map<TopicIdPartition, Future<Void>> 
remoteFetchTasks,
+                              Map<TopicIdPartition, 
CompletableFuture<RemoteLogReadResult>> remoteFetchResults,
+                              Map<TopicIdPartition, RemoteStorageFetchInfo> 
remoteFetchInfos,
+                              long remoteFetchMaxWaitMs,
+                              Map<TopicIdPartition, FetchPartitionStatus> 
fetchPartitionStatus,
+                              FetchParams fetchParams,
+                              Map<TopicIdPartition, LogReadResult> 
localReadResults,
+                              Consumer<TopicPartition> partitionOrException,
+                              Consumer<Map<TopicIdPartition, 
FetchPartitionData>> responseCallback) {
+        super(remoteFetchMaxWaitMs);
+        this.remoteFetchTasks = remoteFetchTasks;
+        this.remoteFetchResults = remoteFetchResults;
+        this.remoteFetchInfos = remoteFetchInfos;
+        this.fetchPartitionStatus = fetchPartitionStatus;
+        this.fetchParams = fetchParams;
+        this.localReadResults = localReadResults;
+        this.partitionOrException = partitionOrException;
+        this.responseCallback = responseCallback;
+
+        if (fetchParams.isFromFollower()) {
+            throw new IllegalStateException("The follower should not invoke 
remote fetch. Fetch params are: " + fetchParams);
+        }
+    }
+
+    /**
+     * The operation can be completed if:
+     * <p>
+     * Case a: This broker is no longer the leader of the partition it tries 
to fetch
+     * <p>
+     * Case b: This broker does not know the partition it tries to fetch
+     * <p>
+     * Case c: All the remote storage read requests completed (succeeded or 
failed)
+     * <p>
+     * Case d: The partition is in an offline log directory on this broker
+     *
+     * Upon completion, should return whatever data is available for each 
valid partition
+     */
+    @Override
+    public boolean tryComplete() {
+        for (Map.Entry<TopicIdPartition, FetchPartitionStatus> entry : 
fetchPartitionStatus.entrySet()) {
+            TopicIdPartition topicPartition = entry.getKey();
+            FetchPartitionStatus fetchStatus = entry.getValue();
+            LogOffsetMetadata fetchOffset = fetchStatus.startOffsetMetadata();
+            try {
+                if 
(!fetchOffset.equals(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)) {
+                    
partitionOrException.accept(topicPartition.topicPartition());
+                }
+            } catch (KafkaStorageException e) { // Case d
+                LOG.debug("Partition {} is in an offline log directory, 
satisfy {} immediately.", topicPartition, fetchParams);
+                return forceComplete();
+            } catch (UnknownTopicOrPartitionException e) { // Case b
+                LOG.debug("Broker no longer knows of partition {}, satisfy {} 
immediately", topicPartition, fetchParams);
+                return forceComplete();
+            } catch (NotLeaderOrFollowerException e) { // Case a
+                LOG.debug("Broker is no longer the leader or follower of {}, 
satisfy {} immediately", topicPartition, fetchParams);
+                return forceComplete();
+            }
+        }
+
+        // Case c
+        if 
(remoteFetchResults.values().stream().allMatch(CompletableFuture::isDone)) {
+            return forceComplete();
+        }
+        return false;
+    }
+
+    @Override
+    public void onExpiration() {
+        // Cancel the remote storage read task if it has not started yet, but avoid
+        // interrupting a task that is already running, since interruption may force
+        // closing opened/cached resources such as the transaction index.
+        remoteFetchTasks.forEach((topicIdPartition, task) -> {
+            if (task != null && !task.isDone() && !task.cancel(false)) {
+                LOG.debug("Remote fetch task for remoteFetchInfo: {} could not 
be cancelled.", remoteFetchInfos.get(topicIdPartition));
+            }
+        });
+
+        EXPIRED_REQUEST_METER.mark();
+    }
+
+    /**
+     * Upon completion, read whatever data is available and pass it to the response callback
+     */
+    @Override
+    public void onComplete() {
+        Map<TopicIdPartition, FetchPartitionData> fetchPartitionData = new 
LinkedHashMap<>();
+        localReadResults.forEach((tpId, result) -> {
+            CompletableFuture<RemoteLogReadResult> remoteFetchResult = 
remoteFetchResults.get(tpId);
+            if (remoteFetchResults.containsKey(tpId)
+                && remoteFetchResult.isDone()
+                && result.error() == Errors.NONE
+                && result.info().delayedRemoteStorageFetch.isPresent()) {
+
+                if (remoteFetchResult.join().error().isPresent()) {
+                    fetchPartitionData.put(tpId,
+                        new 
LogReadResult(Errors.forException(remoteFetchResult.join().error().get())).toFetchPartitionData(false));
+                } else {
+                    FetchDataInfo info = 
remoteFetchResult.join().fetchDataInfo().get();
+                    fetchPartitionData.put(tpId,
+                        new FetchPartitionData(
+                            result.error(),
+                            result.highWatermark(),
+                            result.leaderLogStartOffset(),
+                            info.records,
+                            Optional.empty(),
+                            result.lastStableOffset(),
+                            info.abortedTransactions,
+                            result.preferredReadReplica(),
+                            false));
+                }
+            } else {
+                fetchPartitionData.put(tpId, 
result.toFetchPartitionData(false));
+            }
+        });
+
+        responseCallback.accept(fetchPartitionData);
+    }
+
+    // Visible for testing
+    public static long expiredRequestCount() {
+        return EXPIRED_REQUEST_METER.count();
+    }
+}
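
A minimal sketch of how a caller such as ReplicaManager can watch the relocated
operation. This is illustrative only, not the actual ReplicaManager change from
this commit: the method shape and the leaderCheck name are placeholders, and it
assumes the existing Java purgatory API (DelayedOperationPurgatory.tryCompleteElseWatch
plus the TopicPartition-based TopicPartitionOperationKey constructor).

    import org.apache.kafka.common.TopicIdPartition;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.server.purgatory.DelayedOperationKey;
    import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
    import org.apache.kafka.server.purgatory.DelayedRemoteFetch;
    import org.apache.kafka.server.purgatory.TopicPartitionOperationKey;
    import org.apache.kafka.server.storage.log.FetchParams;
    import org.apache.kafka.server.storage.log.FetchPartitionData;
    import org.apache.kafka.storage.internals.log.FetchPartitionStatus;
    import org.apache.kafka.storage.internals.log.LogReadResult;
    import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
    import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;
    import java.util.function.Consumer;

    class RemoteFetchWiringSketch {
        // leaderCheck stands in for the partition lookup that throws
        // KafkaStorageException, UnknownTopicOrPartitionException or
        // NotLeaderOrFollowerException (cases a, b and d of tryComplete above).
        static void watchRemoteFetch(
                DelayedOperationPurgatory<DelayedRemoteFetch> purgatory,
                Map<TopicIdPartition, Future<Void>> remoteFetchTasks,
                Map<TopicIdPartition, CompletableFuture<RemoteLogReadResult>> remoteFetchResults,
                Map<TopicIdPartition, RemoteStorageFetchInfo> remoteFetchInfos,
                long remoteFetchMaxWaitMs,
                Map<TopicIdPartition, FetchPartitionStatus> fetchPartitionStatus,
                FetchParams fetchParams,
                Map<TopicIdPartition, LogReadResult> localReadResults,
                Consumer<TopicPartition> leaderCheck,
                Consumer<Map<TopicIdPartition, FetchPartitionData>> responseCallback) {
            DelayedRemoteFetch remoteFetch = new DelayedRemoteFetch(
                remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
                fetchPartitionStatus, fetchParams, localReadResults, leaderCheck, responseCallback);
            // One watch key per fetched partition, so a leadership change or a completed
            // remote read can trigger tryComplete() before the timeout fires.
            List<DelayedOperationKey> keys = fetchPartitionStatus.keySet().stream()
                .map(tidp -> (DelayedOperationKey) new TopicPartitionOperationKey(tidp.topicPartition()))
                .toList();
            purgatory.tryCompleteElseWatch(remoteFetch, keys);
        }
    }
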
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchPartitionStatus.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchPartitionStatus.java
new file mode 100644
index 00000000000..5a060013b5e
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchPartitionStatus.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+
+/**
+ * A class containing log offset metadata and fetch info for a topic partition.
+ */
+public record FetchPartitionStatus(
+    LogOffsetMetadata startOffsetMetadata,
+    PartitionData fetchInfo
+) {
+}
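
FetchPartitionStatus is now a plain Java record replacing the Scala case class,
which is why the call sites above gain an explicit `new`. A minimal construction
example with placeholder offsets and byte limits:

    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.requests.FetchRequest;
    import org.apache.kafka.storage.internals.log.FetchPartitionStatus;
    import org.apache.kafka.storage.internals.log.LogOffsetMetadata;

    import java.util.Optional;

    class FetchPartitionStatusSketch {
        static FetchPartitionStatus example() {
            // Placeholder values: fetch from offset 50, log start 0, 1-byte max,
            // no topic id and no leader epoch.
            return new FetchPartitionStatus(
                new LogOffsetMetadata(50L, 0L, 250),
                new FetchRequest.PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()));
        }
    }

The accessors (startOffsetMetadata(), fetchInfo()) are generated from the record
components and match the fields of the old case class.
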
diff --git a/server/src/main/java/org/apache/kafka/server/LogReadResult.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadResult.java
similarity index 59%
rename from server/src/main/java/org/apache/kafka/server/LogReadResult.java
rename to 
storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadResult.java
index 203654391af..057a66a25c1 100644
--- a/server/src/main/java/org/apache/kafka/server/LogReadResult.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadResult.java
@@ -14,12 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.server;
+package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.server.storage.log.FetchPartitionData;
-import org.apache.kafka.storage.internals.log.FetchDataInfo;
 
 import java.util.Optional;
 import java.util.OptionalInt;
@@ -38,7 +38,7 @@ import java.util.OptionalLong;
  * @param fetchTimeMs The time the fetch was received
  * @param lastStableOffset Current LSO or None if the result has an exception
  * @param preferredReadReplica the preferred read replica to be used for 
future fetches
- * @param exception Exception if error encountered while reading from the log
+ * @param error The error encountered while reading from the log, or Errors.NONE if none
  */
 public record LogReadResult(
     FetchDataInfo info,
@@ -50,21 +50,8 @@ public record LogReadResult(
     long fetchTimeMs,
     OptionalLong lastStableOffset,
     OptionalInt preferredReadReplica,
-    Optional<Throwable> exception
+    Errors error
 ) {
-    public LogReadResult(
-            FetchDataInfo info,
-            Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
-            long highWatermark,
-            long leaderLogStartOffset,
-            long leaderLogEndOffset,
-            long followerLogStartOffset,
-            long fetchTimeMs,
-            OptionalLong lastStableOffset) {
-        this(info, divergingEpoch, highWatermark, leaderLogStartOffset, 
leaderLogEndOffset, followerLogStartOffset,
-            fetchTimeMs, lastStableOffset, OptionalInt.empty(), 
Optional.empty());
-    }
-
     public LogReadResult(
         FetchDataInfo info,
         Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
@@ -74,44 +61,21 @@ public record LogReadResult(
         long followerLogStartOffset,
         long fetchTimeMs,
         OptionalLong lastStableOffset,
-        Optional<Throwable> exception) {
-        this(info, divergingEpoch, highWatermark, leaderLogStartOffset, 
leaderLogEndOffset, followerLogStartOffset,
-            fetchTimeMs, lastStableOffset, OptionalInt.empty(), exception);
-    }
-
-    public LogReadResult(
-            FetchDataInfo info,
-            Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
-            long highWatermark,
-            long leaderLogStartOffset,
-            long leaderLogEndOffset,
-            long followerLogStartOffset,
-            long fetchTimeMs,
-            OptionalLong lastStableOffset,
-            OptionalInt preferredReadReplica) {
+        Errors error) {
         this(info, divergingEpoch, highWatermark, leaderLogStartOffset, 
leaderLogEndOffset, followerLogStartOffset,
-            fetchTimeMs, lastStableOffset, preferredReadReplica, 
Optional.empty());
-    }
-
-    public Errors error() {
-        if (exception.isPresent()) {
-            return Errors.forException(exception.get());
-        }
-        return Errors.NONE;
+            fetchTimeMs, lastStableOffset, OptionalInt.empty(), error);
     }
 
-    @Override
-    public String toString() {
-        return "LogReadResult(info=" + info +
-               ", divergingEpoch=" + divergingEpoch +
-               ", highWatermark=" + highWatermark +
-               ", leaderLogStartOffset" + leaderLogStartOffset +
-               ", leaderLogEndOffset" + leaderLogEndOffset +
-               ", followerLogStartOffset" + followerLogStartOffset +
-               ", fetchTimeMs=" + fetchTimeMs +
-               ", preferredReadReplica=" + preferredReadReplica +
-               ", lastStableOffset=" + lastStableOffset +
-               ", error=" + error() + ")";
+    public LogReadResult(Errors error) {
+        this(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, 
MemoryRecords.EMPTY),
+            Optional.empty(),
+            UnifiedLog.UNKNOWN_OFFSET,
+            UnifiedLog.UNKNOWN_OFFSET,
+            UnifiedLog.UNKNOWN_OFFSET,
+            UnifiedLog.UNKNOWN_OFFSET,
+            -1L,
+            OptionalLong.empty(),
+            error);
     }
 
     public FetchPartitionData toFetchPartitionData(boolean 
isReassignmentFetch) {
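
With the Optional<Throwable> exception field replaced by an Errors code,
error-only results are now built through the new single-argument constructor
instead of wrapping an exception. A short sketch (the error code chosen is
arbitrary):

    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.server.storage.log.FetchPartitionData;
    import org.apache.kafka.storage.internals.log.LogReadResult;

    class LogReadResultSketch {
        static FetchPartitionData failedFetch() {
            // The Errors-only constructor fills in an empty FetchDataInfo,
            // UNKNOWN_OFFSET watermark/offsets and an empty last stable offset.
            LogReadResult failed = new LogReadResult(Errors.NOT_LEADER_OR_FOLLOWER);
            // false: this response is not for a reassigning follower fetch.
            return failed.toFetchPartitionData(false);
        }
    }
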
diff --git 
a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteFetchTest.java
 
b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteFetchTest.java
new file mode 100644
index 00000000000..d4fc0c3ef27
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteFetchTest.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.storage.log.FetchParams;
+import org.apache.kafka.server.storage.log.FetchPartitionData;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.FetchPartitionStatus;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.LogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricName;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class DelayedRemoteFetchTest {
+    private final int maxBytes = 1024;
+    private final Consumer<TopicPartition> partitionOrException = 
mock(Consumer.class);
+    private final TopicIdPartition topicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), 0, "topic");
+    private final TopicIdPartition topicIdPartition2 = new 
TopicIdPartition(Uuid.randomUuid(), 0, "topic2");
+    private final long fetchOffset = 500L;
+    private final long logStartOffset = 0L;
+    private final Optional<Integer> currentLeaderEpoch = Optional.of(10);
+    private final int remoteFetchMaxWaitMs = 500;
+
+    private final FetchPartitionStatus fetchStatus = new FetchPartitionStatus(
+        new LogOffsetMetadata(fetchOffset),
+        new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, 
logStartOffset, maxBytes, currentLeaderEpoch)
+    );
+    private final FetchParams fetchParams = buildFetchParams(-1, 500);
+
+    @Test
+    public void testFetch() {
+        AtomicReference<TopicIdPartition> actualTopicPartition = new 
AtomicReference<>();
+        AtomicReference<FetchPartitionData> fetchResultOpt = new 
AtomicReference<>();
+
+        Consumer<Map<TopicIdPartition, FetchPartitionData>> callback = 
responses -> {
+            assertEquals(1, responses.size());
+            Map.Entry<TopicIdPartition, FetchPartitionData> entry = 
responses.entrySet().iterator().next();
+            actualTopicPartition.set(entry.getKey());
+            fetchResultOpt.set(entry.getValue());
+        };
+
+        CompletableFuture<RemoteLogReadResult> future = new 
CompletableFuture<>();
+        future.complete(buildRemoteReadResult(Errors.NONE));
+
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition, null, null);
+        long highWatermark = 100L;
+        long leaderLogStartOffset = 10L;
+        LogReadResult logReadInfo = buildReadResult(Errors.NONE, 
highWatermark, leaderLogStartOffset);
+
+        DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+            Map.of(),
+            Map.of(topicIdPartition, future),
+            Map.of(topicIdPartition, fetchInfo),
+            remoteFetchMaxWaitMs,
+            Map.of(topicIdPartition, fetchStatus),
+            fetchParams,
+            Map.of(topicIdPartition, logReadInfo),
+            partitionOrException,
+            callback
+        );
+
+        assertTrue(delayedRemoteFetch.tryComplete());
+        assertTrue(delayedRemoteFetch.isCompleted());
+        assertNotNull(actualTopicPartition.get());
+        assertEquals(topicIdPartition, actualTopicPartition.get());
+        assertNotNull(fetchResultOpt.get());
+
+        FetchPartitionData fetchResult = fetchResultOpt.get();
+        assertEquals(Errors.NONE, fetchResult.error);
+        assertEquals(highWatermark, fetchResult.highWatermark);
+        assertEquals(leaderLogStartOffset, fetchResult.logStartOffset);
+    }
+
+    @Test
+    public void testFollowerFetch() {
+        Consumer<Map<TopicIdPartition, FetchPartitionData>> callback = 
responses -> {
+            assertEquals(1, responses.size());
+        };
+
+        CompletableFuture<RemoteLogReadResult> future = new 
CompletableFuture<>();
+        future.complete(buildRemoteReadResult(Errors.NONE));
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition, null, null);
+        LogReadResult logReadInfo = buildReadResult(Errors.NONE, 100L, 10L);
+
+        assertThrows(IllegalStateException.class, () ->
+            new DelayedRemoteFetch(
+                Map.of(),
+                Map.of(topicIdPartition, future),
+                Map.of(topicIdPartition, fetchInfo),
+                remoteFetchMaxWaitMs,
+                Map.of(topicIdPartition, fetchStatus),
+                buildFetchParams(1, 500),
+                Map.of(topicIdPartition, logReadInfo),
+                partitionOrException,
+                callback
+            ));
+    }
+
+    @Test
+    public void testNotLeaderOrFollower() {
+        AtomicReference<TopicIdPartition> actualTopicPartition = new 
AtomicReference<>();
+        AtomicReference<FetchPartitionData> fetchResultOpt = new 
AtomicReference<>();
+
+        Consumer<Map<TopicIdPartition, FetchPartitionData>> callback = 
responses -> {
+            assertEquals(1, responses.size());
+            Map.Entry<TopicIdPartition, FetchPartitionData> entry = 
responses.entrySet().iterator().next();
+            actualTopicPartition.set(entry.getKey());
+            fetchResultOpt.set(entry.getValue());
+        };
+
+        // throw an exception from the partition leadership check
+        doThrow(new NotLeaderOrFollowerException(String.format("Replica for %s 
not available", topicIdPartition)))
+            
.when(partitionOrException).accept(topicIdPartition.topicPartition());
+
+        CompletableFuture<RemoteLogReadResult> future = new 
CompletableFuture<>();
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition, null, null);
+
+        LogReadResult logReadInfo = buildReadResult(Errors.NONE);
+
+        DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+            Map.of(),
+            Map.of(topicIdPartition, future),
+            Map.of(topicIdPartition, fetchInfo),
+            remoteFetchMaxWaitMs,
+            Map.of(topicIdPartition, fetchStatus),
+            fetchParams,
+            Map.of(topicIdPartition, logReadInfo),
+            partitionOrException,
+            callback
+        );
+
+        // delayed remote fetch should still be able to complete
+        assertTrue(delayedRemoteFetch.tryComplete());
+        assertTrue(delayedRemoteFetch.isCompleted());
+        assertEquals(topicIdPartition, actualTopicPartition.get());
+        assertNotNull(fetchResultOpt.get());
+    }
+
+    @Test
+    public void testErrorLogReadInfo() {
+        AtomicReference<TopicIdPartition> actualTopicPartition = new 
AtomicReference<>();
+        AtomicReference<FetchPartitionData> fetchResultOpt = new 
AtomicReference<>();
+
+        Consumer<Map<TopicIdPartition, FetchPartitionData>> callback = 
responses -> {
+            assertEquals(1, responses.size());
+            Map.Entry<TopicIdPartition, FetchPartitionData> entry = 
responses.entrySet().iterator().next();
+            actualTopicPartition.set(entry.getKey());
+            fetchResultOpt.set(entry.getValue());
+        };
+
+        CompletableFuture<RemoteLogReadResult> future = new 
CompletableFuture<>();
+        future.complete(buildRemoteReadResult(Errors.NONE));
+
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition, null, null);
+
+        // build a read result with error
+        LogReadResult logReadInfo = 
buildReadResult(Errors.FENCED_LEADER_EPOCH);
+
+        DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+            Map.of(),
+            Map.of(topicIdPartition, future),
+            Map.of(topicIdPartition, fetchInfo),
+            remoteFetchMaxWaitMs,
+            Map.of(topicIdPartition, fetchStatus),
+            fetchParams,
+            Map.of(topicIdPartition, logReadInfo),
+            partitionOrException,
+            callback
+        );
+
+        assertTrue(delayedRemoteFetch.tryComplete());
+        assertTrue(delayedRemoteFetch.isCompleted());
+        assertEquals(topicIdPartition, actualTopicPartition.get());
+        assertNotNull(fetchResultOpt.get());
+        assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResultOpt.get().error);
+    }
+
+    private long expiresPerSecValue() {
+        Map<MetricName, Metric> allMetrics = 
KafkaYammerMetrics.defaultRegistry().allMetrics();
+        return allMetrics.entrySet()
+            .stream()
+            .filter(e -> 
e.getKey().getMBeanName().endsWith("kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
+            .findFirst()
+            .map(Map.Entry::getValue)
+            .filter(Meter.class::isInstance)
+            .map(Meter.class::cast)
+            .map(Meter::count)
+            .orElse(0L);
+    }
+
+    @Test
+    public void testRequestExpiry() {
+        Map<TopicIdPartition, FetchPartitionData> responses = new HashMap<>();
+
+        Consumer<Map<TopicIdPartition, FetchPartitionData>> callback = 
responses::putAll;
+
+        Future<Void> remoteFetchTaskExpired = mock(Future.class);
+        Future<Void> remoteFetchTask2 = mock(Future.class);
+        // mark the 2nd task as done; the 1st stays incomplete and will expire
+        when(remoteFetchTask2.isDone()).thenReturn(true);
+
+        // Create futures - one completed, one not
+        CompletableFuture<RemoteLogReadResult> future1 = new 
CompletableFuture<>();
+        CompletableFuture<RemoteLogReadResult> future2 = new 
CompletableFuture<>();
+        // Only complete one remote fetch
+        future2.complete(buildRemoteReadResult(Errors.NONE));
+
+        RemoteStorageFetchInfo fetchInfo1 = new RemoteStorageFetchInfo(0, 
false, topicIdPartition, null, null);
+        RemoteStorageFetchInfo fetchInfo2 = new RemoteStorageFetchInfo(0, 
false, topicIdPartition2, null, null);
+
+        long highWatermark = 100L;
+        long leaderLogStartOffset = 10L;
+
+        LogReadResult logReadInfo1 = buildReadResult(Errors.NONE, 
highWatermark, leaderLogStartOffset);
+        LogReadResult logReadInfo2 = buildReadResult(Errors.NONE);
+
+        FetchPartitionStatus fetchStatus1 = new FetchPartitionStatus(
+            new LogOffsetMetadata(fetchOffset),
+            new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, 
logStartOffset, maxBytes, currentLeaderEpoch));
+
+        FetchPartitionStatus fetchStatus2 = new FetchPartitionStatus(
+            new LogOffsetMetadata(fetchOffset + 100L),
+            new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100L, 
logStartOffset, maxBytes, currentLeaderEpoch));
+
+        // Set up maps for multiple partitions
+        Map<TopicIdPartition, Future<Void>> remoteFetchTasks = 
Map.of(topicIdPartition, remoteFetchTaskExpired, topicIdPartition2, 
remoteFetchTask2);
+        Map<TopicIdPartition, CompletableFuture<RemoteLogReadResult>> 
remoteFetchResults = Map.of(topicIdPartition, future1, topicIdPartition2, 
future2);
+        Map<TopicIdPartition, RemoteStorageFetchInfo> remoteFetchInfos = 
Map.of(topicIdPartition, fetchInfo1, topicIdPartition2, fetchInfo2);
+
+        DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+            remoteFetchTasks,
+            remoteFetchResults,
+            remoteFetchInfos,
+            remoteFetchMaxWaitMs,
+            Map.of(topicIdPartition, fetchStatus1, topicIdPartition2, 
fetchStatus2),
+            fetchParams,
+            Map.of(topicIdPartition, logReadInfo1, topicIdPartition2, 
logReadInfo2),
+            partitionOrException,
+            callback
+        );
+
+        // Capture the current ExpiresPerSec metric value before expiring the fetch
+        long existingMetricVal = expiresPerSecValue();
+        // Verify the delayedRemoteFetch is not completed yet
+        assertFalse(delayedRemoteFetch.isCompleted());
+
+        // Force the delayed remote fetch to expire
+        delayedRemoteFetch.run();
+
+        // Check that the expired task was cancelled and force-completed
+        verify(remoteFetchTaskExpired).cancel(anyBoolean());
+        verify(remoteFetchTask2, never()).cancel(anyBoolean());
+        assertTrue(delayedRemoteFetch.isCompleted());
+
+        // Check that the ExpiresPerSec metric was incremented
+        assertTrue(expiresPerSecValue() > existingMetricVal);
+
+        // The response should include both partitions; the expired one falls back
+        // to its local read result
+        assertEquals(2, responses.size());
+        assertTrue(responses.containsKey(topicIdPartition));
+        assertTrue(responses.containsKey(topicIdPartition2));
+
+        assertEquals(Errors.NONE, responses.get(topicIdPartition).error);
+        assertEquals(highWatermark, 
responses.get(topicIdPartition).highWatermark);
+        assertEquals(leaderLogStartOffset, 
responses.get(topicIdPartition).logStartOffset);
+
+        assertEquals(Errors.NONE, responses.get(topicIdPartition2).error);
+    }
+
+    @Test
+    public void testMultiplePartitions() {
+        Map<TopicIdPartition, FetchPartitionData> responses = new HashMap<>();
+
+        Consumer<Map<TopicIdPartition, FetchPartitionData>> callback = 
responses::putAll;
+
+        // Create futures - one completed, one not
+        CompletableFuture<RemoteLogReadResult> future1 = new 
CompletableFuture<>();
+        CompletableFuture<RemoteLogReadResult> future2 = new 
CompletableFuture<>();
+        // Only complete one remote fetch
+        future1.complete(buildRemoteReadResult(Errors.NONE));
+
+        RemoteStorageFetchInfo fetchInfo1 = new RemoteStorageFetchInfo(0, 
false, topicIdPartition, null, null);
+        RemoteStorageFetchInfo fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null);
+
+        long highWatermark1 = 100L;
+        long leaderLogStartOffset1 = 10L;
+        long highWatermark2 = 200L;
+        long leaderLogStartOffset2 = 20L;
+
+        LogReadResult logReadInfo1 = buildReadResult(Errors.NONE, 
highWatermark1, leaderLogStartOffset1);
+        LogReadResult logReadInfo2 = buildReadResult(Errors.NONE, 
highWatermark2, leaderLogStartOffset2);
+
+        FetchPartitionStatus fetchStatus1 = new FetchPartitionStatus(
+            new LogOffsetMetadata(fetchOffset),
+            new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, 
logStartOffset, maxBytes, currentLeaderEpoch));
+
+        FetchPartitionStatus fetchStatus2 = new FetchPartitionStatus(
+            new LogOffsetMetadata(fetchOffset + 100L),
+            new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100L, 
logStartOffset, maxBytes, currentLeaderEpoch));
+
+        DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+            Map.of(),
+            Map.of(topicIdPartition, future1, topicIdPartition2, future2),
+            Map.of(topicIdPartition, fetchInfo1, topicIdPartition2, 
fetchInfo2),
+            remoteFetchMaxWaitMs,
+            Map.of(topicIdPartition, fetchStatus1, topicIdPartition2, 
fetchStatus2),
+            fetchParams,
+            Map.of(topicIdPartition, logReadInfo1, topicIdPartition2, 
logReadInfo2),
+            partitionOrException,
+            callback
+        );
+
+        // Should not complete since future2 is not done
+        assertFalse(delayedRemoteFetch.tryComplete());
+        assertFalse(delayedRemoteFetch.isCompleted());
+
+        // Complete future2
+        future2.complete(buildRemoteReadResult(Errors.NONE));
+
+        // Now it should complete
+        assertTrue(delayedRemoteFetch.tryComplete());
+        assertTrue(delayedRemoteFetch.isCompleted());
+
+        // Verify both partitions were processed without error
+        assertEquals(2, responses.size());
+        assertTrue(responses.containsKey(topicIdPartition));
+        assertTrue(responses.containsKey(topicIdPartition2));
+
+        assertEquals(Errors.NONE, responses.get(topicIdPartition).error);
+        assertEquals(highWatermark1, 
responses.get(topicIdPartition).highWatermark);
+        assertEquals(leaderLogStartOffset1, 
responses.get(topicIdPartition).logStartOffset);
+
+        assertEquals(Errors.NONE, responses.get(topicIdPartition2).error);
+        assertEquals(highWatermark2, 
responses.get(topicIdPartition2).highWatermark);
+        assertEquals(leaderLogStartOffset2, 
responses.get(topicIdPartition2).logStartOffset);
+    }
+
+    @Test
+    public void testMultiplePartitionsWithFailedResults() {
+        Map<TopicIdPartition, FetchPartitionData> responses = new HashMap<>();
+
+        Consumer<Map<TopicIdPartition, FetchPartitionData>> callback = 
responses::putAll;
+
+        // Create futures - one successful, one with error
+        CompletableFuture<RemoteLogReadResult> future1 = new 
CompletableFuture<>();
+        CompletableFuture<RemoteLogReadResult> future2 = new 
CompletableFuture<>();
+
+        // Create one successful result and one failed result
+        future1.complete(buildRemoteReadResult(Errors.NONE));
+        future2.complete(buildRemoteReadResult(Errors.UNKNOWN_SERVER_ERROR));
+
+        RemoteStorageFetchInfo fetchInfo1 = new RemoteStorageFetchInfo(0, 
false, topicIdPartition, null, null);
+        RemoteStorageFetchInfo fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null);
+
+        LogReadResult logReadInfo1 = buildReadResult(Errors.NONE, 100, 10);
+        LogReadResult logReadInfo2 = buildReadResult(Errors.NONE, 100, 10);
+
+        FetchPartitionStatus fetchStatus1 = new FetchPartitionStatus(
+            new LogOffsetMetadata(fetchOffset),
+            new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, 
logStartOffset, maxBytes, currentLeaderEpoch));
+
+        FetchPartitionStatus fetchStatus2 = new FetchPartitionStatus(
+            new LogOffsetMetadata(fetchOffset + 100),
+            new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, 
logStartOffset, maxBytes, currentLeaderEpoch));
+
+        DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+            Map.of(),
+            Map.of(topicIdPartition, future1, topicIdPartition2, future2),
+            Map.of(topicIdPartition, fetchInfo1, topicIdPartition2, 
fetchInfo2),
+            remoteFetchMaxWaitMs,
+            Map.of(topicIdPartition, fetchStatus1, topicIdPartition2, 
fetchStatus2),
+            fetchParams,
+            Map.of(topicIdPartition, logReadInfo1, topicIdPartition2, 
logReadInfo2),
+            partitionOrException,
+            callback
+        );
+
+        assertTrue(delayedRemoteFetch.tryComplete());
+        assertTrue(delayedRemoteFetch.isCompleted());
+
+        // Verify both partitions were processed
+        assertEquals(2, responses.size());
+        assertTrue(responses.containsKey(topicIdPartition));
+        assertTrue(responses.containsKey(topicIdPartition2));
+
+        // First partition should be successful
+        FetchPartitionData fetchResult1 = responses.get(topicIdPartition);
+        assertEquals(Errors.NONE, fetchResult1.error);
+
+        // Second partition should have an error due to remote fetch failure
+        FetchPartitionData fetchResult2 = responses.get(topicIdPartition2);
+        assertEquals(Errors.UNKNOWN_SERVER_ERROR, fetchResult2.error);
+    }
+
+    private FetchParams buildFetchParams(int replicaId, int maxWaitMs) {
+        return new FetchParams(
+            replicaId,
+            1,
+            maxWaitMs,
+            1,
+            maxBytes,
+            FetchIsolation.LOG_END,
+            Optional.empty()
+        );
+    }
+
+    private LogReadResult buildReadResult(Errors error) {
+        return buildReadResult(error, 0, 0);
+    }
+
+    private LogReadResult buildReadResult(Errors error, long highWatermark, 
long leaderLogStartOffset) {
+        return new LogReadResult(
+            new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, 
MemoryRecords.EMPTY, false, Optional.empty(),
+            Optional.of(mock(RemoteStorageFetchInfo.class))),
+            Optional.empty(),
+            highWatermark,
+            leaderLogStartOffset,
+            -1L,
+            -1L,
+            -1L,
+            OptionalLong.empty(),
+            error);
+    }
+
+    private RemoteLogReadResult buildRemoteReadResult(Errors error) {
+        return new RemoteLogReadResult(
+            Optional.of(new 
FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY)),
+            error != Errors.NONE ? Optional.of(error.exception()) : 
Optional.empty());
+    }
+}
