This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 08a93fe12ab KAFKA-14523: Move DelayedRemoteListOffsets to the storage module (#19285)
08a93fe12ab is described below

commit 08a93fe12ab1f29010341d47e421170b980f904c
Author: Mickael Maison <[email protected]>
AuthorDate: Sat Apr 5 13:51:13 2025 +0200

    KAFKA-14523: Move DelayedRemoteListOffsets to the storage module (#19285)
    
    Decouple RemoteLogManager and ReplicaManager.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
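    
    A sketch of the new seam, for orientation (not part of the patch): the
    storage-module DelayedRemoteListOffsets no longer holds a ReplicaManager;
    it takes a Consumer<TopicPartition> that throws an ApiException when the
    partition is no longer hosted, keeping the purgatory code free of core
    types. Caller-side wiring looks roughly like this (Java-flavored; in the
    diff ReplicaManager passes the Scala lambda `tp => getPartitionOrException(tp)`):
    
        // Illustrative only: `replicaManager` stands in for the core-side owner.
        Consumer<TopicPartition> partitionOrException =
            tp -> replicaManager.getPartitionOrException(tp);
        new DelayedRemoteListOffsets(delayMs, version, statusByPartition,
                partitionOrException, responseCallback);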
---
 checkstyle/import-control-storage.xml              |   4 +
 .../java/kafka/log/remote/RemoteLogManager.java    |   2 +-
 .../kafka/server/DelayedRemoteListOffsets.scala    | 168 -------------
 core/src/main/scala/kafka/server/KafkaApis.scala   |  12 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  17 +-
 .../server/DelayedRemoteListOffsetsTest.scala      | 258 --------------------
 .../AbstractCoordinatorConcurrencyTest.scala       |   2 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala |   4 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  21 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |   2 +-
 .../server/purgatory/DelayedRemoteListOffsets.java | 194 +++++++++++++++
 .../purgatory}/ListOffsetsPartitionStatus.java     |   2 +-
 .../purgatory/DelayedRemoteListOffsetsTest.java    | 261 +++++++++++++++++++++
 13 files changed, 492 insertions(+), 455 deletions(-)

diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml
index ce52e81ed0c..d96311d2897 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -49,7 +49,11 @@
 
 
     <subpackage name="server">
+        <allow pkg="com.yammer.metrics.core" />
         <allow pkg="org.apache.kafka.common" />
+        <allow pkg="org.apache.kafka.server.metrics" />
+        <allow pkg="org.apache.kafka.server.util.timer" />
+        <allow pkg="org.apache.kafka.storage.internals.log" />
 
         <subpackage name="log">
             <allow pkg="com.fasterxml.jackson" />
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index cb220baa2cd..c0da34b4d2e 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -17,7 +17,6 @@
 package kafka.log.remote;
 
 import kafka.cluster.Partition;
-import kafka.server.DelayedRemoteListOffsets;
 
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.KafkaException;
@@ -66,6 +65,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
+import org.apache.kafka.server.purgatory.DelayedRemoteListOffsets;
 import org.apache.kafka.server.purgatory.TopicPartitionOperationKey;
 import org.apache.kafka.server.quota.QuotaType;
 import org.apache.kafka.server.storage.log.FetchIsolation;
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
deleted file mode 100644
index a84b78ff25c..00000000000
--- a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import com.yammer.metrics.core.Meter
-import kafka.utils.{Logging, Pool}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.ApiException
-import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.ListOffsetsResponse
-import org.apache.kafka.server.ListOffsetsPartitionStatus
-import org.apache.kafka.server.metrics.KafkaMetricsGroup
-import org.apache.kafka.server.purgatory.DelayedOperation
-import org.apache.kafka.storage.internals.log.OffsetResultHolder.FileRecordsOrError
-
-import java.util.Optional
-import java.util.concurrent.TimeUnit
-import scala.collection.{Map, mutable}
-import scala.jdk.CollectionConverters._
-
-class DelayedRemoteListOffsets(delayMs: Long,
-                               version: Int,
-                               statusByPartition: mutable.Map[TopicPartition, ListOffsetsPartitionStatus],
-                               replicaManager: ReplicaManager,
-                               responseCallback: List[ListOffsetsTopicResponse] => Unit)
-  extends DelayedOperation(delayMs) with Logging {
-  // Mark the status as completed, if there is no async task to track.
-  // If there is a task to track, then build the response as REQUEST_TIMED_OUT by default.
-  statusByPartition.foreachEntry { (topicPartition, status) =>
-    status.completed(status.futureHolderOpt.isEmpty)
-    if (status.futureHolderOpt.isPresent) {
-      status.responseOpt(Optional.of(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition())))
-    }
-    trace(s"Initial partition status for $topicPartition is $status")
-  }
-
-  /**
-   * Call-back to execute when a delayed operation gets expired and hence forced to complete.
-   */
-  override def onExpiration(): Unit = {
-    statusByPartition.foreachEntry { (topicPartition, status) =>
-      if (!status.completed) {
-        debug(s"Expiring list offset request for partition $topicPartition with status $status")
-        status.futureHolderOpt.ifPresent(futureHolder => futureHolder.jobFuture.cancel(true))
-        DelayedRemoteListOffsetsMetrics.recordExpiration(topicPartition)
-      }
-    }
-  }
-
-  /**
-   * Process for completing an operation; This function needs to be defined
-   * in subclasses and will be called exactly once in forceComplete()
-   */
-  override def onComplete(): Unit = {
-    val responseTopics = statusByPartition.groupBy(e => e._1.topic()).map {
-      case (topic, status) =>
-        new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => Some(s.responseOpt.get())).toList.asJava)
-    }.toList
-    responseCallback(responseTopics)
-  }
-
-  /**
-   * Try to complete the delayed operation by first checking if the operation
-   * can be completed by now. If yes execute the completion logic by calling
-   * forceComplete() and return true iff forceComplete returns true; otherwise return false
-   *
-   * This function needs to be defined in subclasses
-   */
-  override def tryComplete(): Boolean = {
-    var completable = true
-    statusByPartition.foreachEntry { (partition, status) =>
-      if (!status.completed) {
-        try {
-          replicaManager.getPartitionOrException(partition)
-        } catch {
-          case e: ApiException =>
-            status.futureHolderOpt.ifPresent { futureHolder =>
-              futureHolder.jobFuture.cancel(false)
-              futureHolder.taskFuture.complete(new FileRecordsOrError(Optional.of(e), Optional.empty()))
-            }
-        }
-
-        status.futureHolderOpt.ifPresent { futureHolder =>
-          if (futureHolder.taskFuture.isDone) {
-            val taskFuture = futureHolder.taskFuture.get()
-            val response = {
-              if (taskFuture.hasException) {
-                buildErrorResponse(Errors.forException(taskFuture.exception().get()), partition.partition())
-              } else if (!taskFuture.hasTimestampAndOffset) {
-                val error = status.maybeOffsetsError
-                  .map(e => if (version >= 5) Errors.forException(e) else Errors.LEADER_NOT_AVAILABLE)
-                  .orElse(Errors.NONE)
-                buildErrorResponse(error, partition.partition())
-              } else {
-                var partitionResponse = buildErrorResponse(Errors.NONE, partition.partition())
-                val found = taskFuture.timestampAndOffset().get()
-                if (status.lastFetchableOffset.isPresent && found.offset >= status.lastFetchableOffset.get) {
-                  if (status.maybeOffsetsError.isPresent) {
-                    val error = if (version >= 5) Errors.forException(status.maybeOffsetsError.get) else Errors.LEADER_NOT_AVAILABLE
-                    partitionResponse.setErrorCode(error.code())
-                  }
-                } else {
-                  partitionResponse = new ListOffsetsPartitionResponse()
-                    .setPartitionIndex(partition.partition())
-                    .setErrorCode(Errors.NONE.code())
-                    .setTimestamp(found.timestamp)
-                    .setOffset(found.offset)
-
-                  if (found.leaderEpoch.isPresent && version >= 4) {
-                    partitionResponse.setLeaderEpoch(found.leaderEpoch.get)
-                  }
-                }
-                partitionResponse
-              }
-            }
-            status.responseOpt(Optional.of(response))
-            status.completed(true)
-          }
-          completable = completable && futureHolder.taskFuture.isDone
-        }
-      }
-    }
-    if (completable) {
-      forceComplete()
-    } else {
-      false
-    }
-  }
-
-  private def buildErrorResponse(e: Errors, partitionIndex: Int): ListOffsetsPartitionResponse = {
-    new ListOffsetsPartitionResponse()
-      .setPartitionIndex(partitionIndex)
-      .setErrorCode(e.code)
-      .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
-      .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
-  }
-}
-
-object DelayedRemoteListOffsetsMetrics {
-  private val metricsGroup = new KafkaMetricsGroup(DelayedRemoteListOffsetsMetrics.getClass)
-  private[server] val aggregateExpirationMeter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
-  private val partitionExpirationMeterFactory = (key: TopicPartition) =>
-    metricsGroup.newMeter("ExpiresPerSec",
-      "requests",
-      TimeUnit.SECONDS,
-      Map("topic" -> key.topic, "partition" -> key.partition.toString).asJava)
-  private[server] val partitionExpirationMeters = new Pool[TopicPartition, Meter](valueFactory = Some(partitionExpirationMeterFactory))
-
-  def recordExpiration(partition: TopicPartition): Unit = {
-    aggregateExpirationMeter.mark()
-    partitionExpirationMeters.getAndMaybePut(partition).mark()
-  }
-}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 966607f11e1..8356bc4e732 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -769,18 +769,20 @@ class KafkaApis(val requestChannel: RequestChannel,
         .setName(topic.name)
         .setPartitions(topic.partitions.asScala.map(partition =>
          buildErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, partition)).asJava)
-    )
+    ).asJava
 
-    def sendResponseCallback(response: Seq[ListOffsetsTopicResponse]): Unit = {
-      val mergedResponses = response ++ unauthorizedResponseStatus
+    def sendResponseCallback(response: util.Collection[ListOffsetsTopicResponse]): Void = {
+      val mergedResponses = new util.ArrayList(response)
+      mergedResponses.addAll(unauthorizedResponseStatus)
       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
         new ListOffsetsResponse(new ListOffsetsResponseData()
           .setThrottleTimeMs(requestThrottleMs)
-          .setTopics(mergedResponses.asJava)))
+          .setTopics(mergedResponses)))
+      null
     }
 
     if (authorizedRequestInfo.isEmpty) {
-      sendResponseCallback(Seq.empty)
+      sendResponseCallback(util.List.of)
     } else {
      replicaManager.fetchOffset(authorizedRequestInfo, offsetRequest.duplicatePartitions().asScala,
        offsetRequest.isolationLevel(), offsetRequest.replicaId(), clientId, correlationId, version,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6ff88869d93..9b5c01a85c9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -53,13 +53,13 @@ import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DeleteRecordsPartitionStatus, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, ListOffsetsPartitionStatus, common}
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
 import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
-import org.apache.kafka.storage.internals.log._
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import java.io.File
@@ -70,6 +70,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.Lock
 import java.util.concurrent.{CompletableFuture, Future, RejectedExecutionException, TimeUnit}
 import java.util.{Collections, Optional, OptionalInt, OptionalLong}
+import java.util.function.Consumer
 import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.{RichOption, RichOptional}
@@ -841,7 +842,7 @@ class ReplicaManager(val config: KafkaConfig,
     )
 
    val retryTimeoutMs = Math.min(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMaxMs(), config.requestTimeoutMs)
-    val addPartitionsRetryBackoffMs = config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs
+    val addPartitionsRetryBackoffMs = config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs()
    val startVerificationTimeMs = time.milliseconds
    def maybeRetryOnConcurrentTransactions(results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = {
       if (time.milliseconds() - startVerificationTimeMs >= retryTimeoutMs) {
@@ -1470,7 +1471,7 @@ class ReplicaManager(val config: KafkaConfig,
                   correlationId: Int,
                   version: Short,
                  buildErrorResponse: (Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse,
-                  responseCallback: List[ListOffsetsTopicResponse] => Unit,
+                  responseCallback: Consumer[util.Collection[ListOffsetsTopicResponse]],
                  timeoutMs: Int = 0): Unit = {
    val statusByPartition = mutable.Map[TopicPartition, ListOffsetsPartitionStatus]()
     topics.foreach { topic =>
@@ -1569,7 +1570,7 @@ class ReplicaManager(val config: KafkaConfig,
     if (delayedRemoteListOffsetsRequired(statusByPartition)) {
      val delayMs: Long = if (timeoutMs > 0) timeoutMs else config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs()
      // create delayed remote list offsets operation
-      val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version, statusByPartition, this, responseCallback)
+      val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version, statusByPartition.asJava, tp => getPartitionOrException(tp), responseCallback)
      // create a list of (topic, partition) pairs to use as keys for this delayed remote list offsets operation
      val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList
      // try to complete the request immediately, otherwise put it into the purgatory
@@ -1580,7 +1581,7 @@ class ReplicaManager(val config: KafkaConfig,
         case (topic, status) =>
          new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => Some(s.responseOpt.get())).toList.asJava)
       }.toList
-      responseCallback(responseTopics)
+      responseCallback.accept(responseTopics.asJava)
     }
   }
 
@@ -1899,7 +1900,7 @@ class ReplicaManager(val config: KafkaConfig,
        createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset,
          new OffsetMovedToTieredStorageException("Given offset" + offset + " is moved to tiered storage"))
       } else {
-        val throttleTimeMs = remoteLogManager.get.getFetchThrottleTimeMs()
+        val throttleTimeMs = remoteLogManager.get.getFetchThrottleTimeMs
         val fetchDataInfo = if (throttleTimeMs > 0) {
          // Record the throttle time for the remote log fetches
          remoteLogManager.get.fetchThrottleTimeSensor().record(throttleTimeMs, time.milliseconds())
diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
deleted file mode 100644
index f40338d3264..00000000000
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.NotLeaderOrFollowerException
-import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
-import org.apache.kafka.common.requests.ListOffsetsResponse
-import org.apache.kafka.server.ListOffsetsPartitionStatus
-import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
-import org.apache.kafka.server.util.timer.MockTimer
-import org.apache.kafka.storage.internals.log.{AsyncOffsetReadFutureHolder, OffsetResultHolder}
-import org.junit.jupiter.api.{AfterEach, Test}
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.mockito.ArgumentMatchers.anyBoolean
-import org.mockito.Mockito.{mock, when}
-
-import java.util.Optional
-import java.util.concurrent.CompletableFuture
-import scala.collection.mutable
-import scala.concurrent.TimeoutException
-import scala.jdk.CollectionConverters._
-
-class DelayedRemoteListOffsetsTest {
-
-  val delayMs = 10
-  val timer = new MockTimer()
-  val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
-  type T = OffsetResultHolder.FileRecordsOrError
-  val purgatory =
-    new DelayedOperationPurgatory[DelayedRemoteListOffsets]("test-purgatory", timer, 0, 10, true, true)
-
-  @AfterEach
-  def afterEach(): Unit = {
-    purgatory.shutdown()
-  }
-
-  @Test
-  def testResponseOnRequestExpiration(): Unit = {
-    var numResponse = 0
-    val responseCallback = (response: List[ListOffsetsTopicResponse]) => {
-      response.foreach { topic =>
-        topic.partitions().forEach { partition =>
-          assertEquals(Errors.REQUEST_TIMED_OUT.code(), partition.errorCode())
-          assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp())
-          assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset())
-          assertEquals(-1, partition.leaderEpoch())
-          numResponse += 1
-        }
-      }
-    }
-
-    var cancelledCount = 0
-    val jobFuture = mock(classOf[CompletableFuture[Void]])
-    val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
-    when(holder.taskFuture).thenAnswer(_ => new CompletableFuture[T]())
-    when(holder.jobFuture).thenReturn(jobFuture)
-    when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => {
-      cancelledCount += 1
-      true
-    })
-
-    val statusByPartition = mutable.Map(
-      new TopicPartition("test", 0) -> 
ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
 
-      new TopicPartition("test", 1) -> 
ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-      new TopicPartition("test1", 0) -> 
ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-    )
-
-    val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
-    val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList.asJava
-    assertEquals(0, DelayedRemoteListOffsetsMetrics.aggregateExpirationMeter.count())
-    assertEquals(0, DelayedRemoteListOffsetsMetrics.partitionExpirationMeters.size)
-    purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
-
-    Thread.sleep(100)
-    assertEquals(3, listOffsetsRequestKeys.size)
-    assertEquals(listOffsetsRequestKeys.size, cancelledCount)
-    assertEquals(listOffsetsRequestKeys.size, numResponse)
-    assertEquals(listOffsetsRequestKeys.size, DelayedRemoteListOffsetsMetrics.aggregateExpirationMeter.count())
-    listOffsetsRequestKeys.forEach(key => {
-      val tp = new TopicPartition(key.topic, key.partition)
-      assertEquals(1, DelayedRemoteListOffsetsMetrics.partitionExpirationMeters.get(tp).count())
-    })
-  }
-
-  @Test
-  def testResponseOnSuccess(): Unit = {
-    var numResponse = 0
-    val responseCallback = (response: List[ListOffsetsTopicResponse]) => {
-      response.foreach { topic =>
-        topic.partitions().forEach { partition =>
-          assertEquals(Errors.NONE.code(), partition.errorCode())
-          assertEquals(100L, partition.timestamp())
-          assertEquals(100L, partition.offset())
-          assertEquals(50, partition.leaderEpoch())
-          numResponse += 1
-        }
-      }
-    }
-
-    val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50))
-    val taskFuture = new CompletableFuture[T]()
-    taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset)))
-
-    var cancelledCount = 0
-    val jobFuture = mock(classOf[CompletableFuture[Void]])
-    val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
-    when(holder.taskFuture).thenAnswer(_ => taskFuture)
-    when(holder.jobFuture).thenReturn(jobFuture)
-    when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => {
-      cancelledCount += 1
-      true
-    })
-
-    val statusByPartition = mutable.Map(
-      new TopicPartition("test", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-      new TopicPartition("test", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-      new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
-    )
-
-    val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
-    val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList.asJava
-    purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
-
-    assertEquals(0, cancelledCount)
-    assertEquals(listOffsetsRequestKeys.size, numResponse)
-  }
-
-  @Test
-  def testResponseOnPartialError(): Unit = {
-    var numResponse = 0
-    val responseCallback = (response: List[ListOffsetsTopicResponse]) => {
-      response.foreach { topic =>
-        topic.partitions().forEach { partition =>
-          if (topic.name().equals("test1")) {
-            assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), partition.errorCode())
-            assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp())
-            assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset())
-            assertEquals(-1, partition.leaderEpoch())
-          } else {
-            assertEquals(Errors.NONE.code(), partition.errorCode())
-            assertEquals(100L, partition.timestamp())
-            assertEquals(100L, partition.offset())
-            assertEquals(50, partition.leaderEpoch())
-          }
-          numResponse += 1
-        }
-      }
-    }
-
-    val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50))
-    val taskFuture = new CompletableFuture[T]()
-    taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset)))
-
-    var cancelledCount = 0
-    val jobFuture = mock(classOf[CompletableFuture[Void]])
-    val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
-    when(holder.taskFuture).thenAnswer(_ => taskFuture)
-    when(holder.jobFuture).thenReturn(jobFuture)
-    when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => {
-      cancelledCount += 1
-      true
-    })
-
-    val errorFutureHolder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
-    val errorTaskFuture = new CompletableFuture[T]()
-    errorTaskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.of(new TimeoutException("Timed out!")), Optional.empty()))
-    when(errorFutureHolder.taskFuture).thenAnswer(_ => errorTaskFuture)
-    when(errorFutureHolder.jobFuture).thenReturn(jobFuture)
-
-    val statusByPartition = mutable.Map(
-      new TopicPartition("test", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-      new TopicPartition("test", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-      new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build()
-    )
-
-    val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
-    val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList.asJava
-    purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
-
-    assertEquals(0, cancelledCount)
-    assertEquals(listOffsetsRequestKeys.size, numResponse)
-  }
-
-  @Test
-  def testPartialResponseWhenNotLeaderOrFollowerExceptionOnOnePartition(): Unit = {
-    var numResponse = 0
-    val responseCallback = (response: List[ListOffsetsTopicResponse]) => {
-      response.foreach { topic =>
-        topic.partitions().forEach { partition =>
-          if (topic.name().equals("test1") && partition.partitionIndex() == 0) {
-            assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code(), partition.errorCode())
-            assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp())
-            assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset())
-            assertEquals(-1, partition.leaderEpoch())
-          } else {
-            assertEquals(Errors.NONE.code(), partition.errorCode())
-            assertEquals(100L, partition.timestamp())
-            assertEquals(100L, partition.offset())
-            assertEquals(50, partition.leaderEpoch())
-          }
-          numResponse += 1
-        }
-      }
-    }
-
-    val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50))
-    val taskFuture = new CompletableFuture[T]()
-    taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset)))
-
-    var cancelledCount = 0
-    val jobFuture = mock(classOf[CompletableFuture[Void]])
-    val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
-    when(holder.taskFuture).thenAnswer(_ => taskFuture)
-    when(holder.jobFuture).thenReturn(jobFuture)
-    when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => {
-      cancelledCount += 1
-      true
-    })
-
-    when(replicaManager.getPartitionOrException(new TopicPartition("test1", 0)))
-      .thenThrow(new NotLeaderOrFollowerException("Not leader or follower!"))
-    val errorFutureHolder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
-    val errorTaskFuture = new CompletableFuture[T]()
-    when(errorFutureHolder.taskFuture).thenAnswer(_ => errorTaskFuture)
-    when(errorFutureHolder.jobFuture).thenReturn(jobFuture)
-
-    val statusByPartition = mutable.Map(
-      new TopicPartition("test", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-      new TopicPartition("test", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
-      new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build(),
-      new TopicPartition("test1", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
-    )
-
-    val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
-    val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList.asJava
-    purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
-
-    assertEquals(1, cancelledCount)
-    assertEquals(listOffsetsRequestKeys.size, numResponse)
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index a3f9354fd8b..9149fec2eda 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordValidat
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.server.common.RequestLocal
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, TopicPartitionOperationKey}
 import org.apache.kafka.server.util.timer.{MockTimer, Timer}
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, UnifiedLog, VerificationGuard}
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index a94de9a14d8..257f777885b 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -18,7 +18,7 @@
 package kafka.log
 
 import kafka.log.remote.RemoteLogManager
-import kafka.server.{DelayedRemoteListOffsets, KafkaConfig}
+import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
@@ -38,7 +38,7 @@ import org.apache.kafka.server.config.KRaftConfigs
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.purgatory.DelayedOperationPurgatory
+import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, DelayedRemoteListOffsets}
 import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffsetException}
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index cf400517c25..e490be540e2 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -114,6 +114,7 @@ import java.time.Duration
 import java.util
 import java.util.Arrays.asList
 import java.util.concurrent.{CompletableFuture, TimeUnit}
+import java.util.function.Consumer
 import java.util.{Collections, Comparator, Optional, OptionalInt, OptionalLong, Properties}
 import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
@@ -3396,16 +3397,16 @@ class KafkaApisTest extends Logging {
       ArgumentMatchers.anyInt(), // correlationId
       ArgumentMatchers.anyShort(), // version
      ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](),
-      ArgumentMatchers.any[List[ListOffsetsTopicResponse] => Unit](),
+      ArgumentMatchers.any[Consumer[util.Collection[ListOffsetsTopicResponse]]],
      ArgumentMatchers.anyInt() // timeoutMs
    )).thenAnswer(ans => {
-      val callback = ans.getArgument[List[ListOffsetsTopicResponse] => Unit](8)
+      val callback = ans.getArgument[Consumer[util.List[ListOffsetsTopicResponse]]](8)
       val partitionResponse = new ListOffsetsPartitionResponse()
         .setErrorCode(error.code())
         .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
         .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
         .setPartitionIndex(tp.partition())
-      callback(List(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
+      callback.accept(util.List.of(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
     })
 
     val targetTimes = List(new ListOffsetsTopic()
@@ -3503,7 +3504,7 @@ class KafkaApisTest extends Logging {
 
     // 2 topics returned for authorization in during handle
    val topicsReturnedFromMetadataCacheForAuthorization = util.Set.of("remaining-topic", "later-deleted-topic")
-    when(metadataCache.getAllTopics()).thenReturn(topicsReturnedFromMetadataCacheForAuthorization)
+    when(metadataCache.getAllTopics).thenReturn(topicsReturnedFromMetadataCacheForAuthorization)
    // 1 topic is deleted from metadata right at the time between authorization and the next getTopicMetadata() call
     when(metadataCache.getTopicMetadata(
       ArgumentMatchers.eq(topicsReturnedFromMetadataCacheForAuthorization),
@@ -8857,11 +8858,11 @@ class KafkaApisTest extends Logging {
       ArgumentMatchers.anyInt(), // correlationId
       ArgumentMatchers.anyShort(), // version
      ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](),
-      ArgumentMatchers.any[List[ListOffsetsTopicResponse] => Unit](),
+      ArgumentMatchers.any[Consumer[util.Collection[ListOffsetsTopicResponse]]],
       ArgumentMatchers.anyInt() // timeoutMs
     )).thenAnswer(ans => {
       val version = ans.getArgument[Short](6)
-      val callback = ans.getArgument[List[ListOffsetsTopicResponse] => Unit](8)
+      val callback = ans.getArgument[Consumer[util.List[ListOffsetsTopicResponse]]](8)
      val errorCode = if (ReplicaManager.isListOffsetsTimestampUnsupported(timestamp, version))
         Errors.UNSUPPORTED_VERSION.code()
       else
@@ -8871,7 +8872,7 @@ class KafkaApisTest extends Logging {
         .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
         .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
         .setPartitionIndex(tp.partition())
-      callback(List(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
+      callback.accept(util.List.of(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
     })
 
      val data = new ListOffsetsRequestData().setTopics(targetTimes).setReplicaId(ListOffsetsRequest.CONSUMER_REPLICA_ID)
@@ -8909,16 +8910,16 @@ class KafkaApisTest extends Logging {
       ArgumentMatchers.anyInt(), // correlationId
       ArgumentMatchers.anyShort(), // version
      ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](),
-      ArgumentMatchers.any[List[ListOffsetsTopicResponse] => Unit](),
+      ArgumentMatchers.any[Consumer[util.Collection[ListOffsetsTopicResponse]]],
       ArgumentMatchers.anyInt() // timeoutMs
     )).thenAnswer(ans => {
-      val callback = ans.getArgument[List[ListOffsetsTopicResponse] => Unit](8)
+      val callback = ans.getArgument[Consumer[util.List[ListOffsetsTopicResponse]]](8)
       val partitionResponse = new ListOffsetsPartitionResponse()
         .setErrorCode(Errors.NONE.code())
         .setOffset(latestOffset)
         .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
         .setPartitionIndex(tp.partition())
-      callback(List(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
+      callback.accept(util.List.of(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
     })
 
    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index e014f255d40..5ae1262df40 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -63,7 +63,7 @@ import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerL
 import org.apache.kafka.server.log.remote.storage._
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets}
 import org.apache.kafka.server.share.SharePartitionKey
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, DelayedShareFetchKey, ShareFetch}
 import org.apache.kafka.server.share.metrics.ShareGroupMetrics
diff --git a/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
new file mode 100644
index 00000000000..200fb2262ac
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.log.OffsetResultHolder;
+
+import com.yammer.metrics.core.Meter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class DelayedRemoteListOffsets extends DelayedOperation {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DelayedRemoteListOffsets.class);
+
+    // For compatibility, metrics are defined to be under `kafka.server.DelayedRemoteListOffsetsMetrics` class
+    private static final KafkaMetricsGroup METRICS_GROUP = new KafkaMetricsGroup("kafka.server", "DelayedRemoteListOffsetsMetrics");
+    static final Meter AGGREGATE_EXPIRATION_METER = METRICS_GROUP.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS);
+    static final Map<TopicPartition, Meter> PARTITION_EXPIRATION_METERS = new ConcurrentHashMap<>();
+
+    private final int version;
+    private final Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition;
+    private final Consumer<TopicPartition> partitionOrException;
+    private final Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback;
+
+    public DelayedRemoteListOffsets(long delayMs,
+                                    int version,
+                                    Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition,
+                                    Consumer<TopicPartition> partitionOrException,
+                                    Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback) {
+        super(delayMs);
+        this.version = version;
+        this.statusByPartition = statusByPartition;
+        this.partitionOrException = partitionOrException;
+        this.responseCallback = responseCallback;
+        // Mark the status as completed, if there is no async task to track.
+        // If there is a task to track, then build the response as REQUEST_TIMED_OUT by default.
+        statusByPartition.forEach((topicPartition, status) -> {
+            status.completed(status.futureHolderOpt().isEmpty());
+            if (status.futureHolderOpt().isPresent()) {
+                status.responseOpt(Optional.of(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition())));
+            }
+            LOG.trace("Initial partition status for {} is {}", topicPartition, status);
+        });
+    }
+
+    /**
+     * Call-back to execute when a delayed operation gets expired and hence forced to complete.
+     */
+    @Override
+    public void onExpiration() {
+        statusByPartition.forEach((topicPartition, status) -> {
+            if (!status.completed()) {
+                LOG.debug("Expiring list offset request for partition {} with status {}", topicPartition, status);
+                status.futureHolderOpt().ifPresent(futureHolder -> futureHolder.jobFuture().cancel(true));
+                recordExpiration(topicPartition);
+            }
+        });
+    }
+
+    /**
+     * Process for completing an operation; This function needs to be defined
+     * in subclasses and will be called exactly once in forceComplete()
+     */
+    @Override
+    public void onComplete() {
+        Map<String, ListOffsetsResponseData.ListOffsetsTopicResponse> groupedByTopic = new HashMap<>();
+        statusByPartition.forEach((tp, status) -> {
+            ListOffsetsResponseData.ListOffsetsTopicResponse response = groupedByTopic.computeIfAbsent(tp.topic(), k ->
+                    new ListOffsetsResponseData.ListOffsetsTopicResponse().setName(tp.topic()));
+            status.responseOpt().ifPresent(res -> response.partitions().add(res));
+        });
+        responseCallback.accept(groupedByTopic.values());
+    }
+
+    /**
+     * Try to complete the delayed operation by first checking if the operation
+     * can be completed by now. If yes execute the completion logic by calling
+     * forceComplete() and return true iff forceComplete returns true; otherwise return false
+     */
+    @Override
+    public boolean tryComplete() {
+        AtomicBoolean completable = new AtomicBoolean(true);
+        statusByPartition.forEach((partition, status) -> {
+            if (!status.completed()) {
+                try {
+                    partitionOrException.accept(partition);
+                } catch (ApiException e) {
+                    status.futureHolderOpt().ifPresent(futureHolder -> {
+                        futureHolder.jobFuture().cancel(false);
+                        futureHolder.taskFuture().complete(new OffsetResultHolder.FileRecordsOrError(Optional.of(e), Optional.empty()));
+                    });
+                }
+
+                status.futureHolderOpt().ifPresent(futureHolder -> {
+                    if (futureHolder.taskFuture().isDone()) {
+                        ListOffsetsResponseData.ListOffsetsPartitionResponse response;
+                        try {
+                            OffsetResultHolder.FileRecordsOrError taskFuture = futureHolder.taskFuture().get();
+                            if (taskFuture.hasException()) {
+                                response = buildErrorResponse(Errors.forException(taskFuture.exception().get()), partition.partition());
+                            } else if (!taskFuture.hasTimestampAndOffset()) {
+                                Errors error = status.maybeOffsetsError()
+                                        .map(e -> version >= 5 ? Errors.forException(e) : Errors.LEADER_NOT_AVAILABLE)
+                                        .orElse(Errors.NONE);
+                                response = buildErrorResponse(error, partition.partition());
+                            } else {
+                                ListOffsetsResponseData.ListOffsetsPartitionResponse partitionResponse = buildErrorResponse(Errors.NONE, partition.partition());
+                                FileRecords.TimestampAndOffset found = taskFuture.timestampAndOffset().get();
+                                if (status.lastFetchableOffset().isPresent() && found.offset >= status.lastFetchableOffset().get()) {
+                                    if (status.maybeOffsetsError().isPresent()) {
+                                        Errors error = version >= 5 ? Errors.forException(status.maybeOffsetsError().get()) : Errors.LEADER_NOT_AVAILABLE;
+                                        partitionResponse.setErrorCode(error.code());
+                                    }
+                                } else {
+                                    partitionResponse = new ListOffsetsResponseData.ListOffsetsPartitionResponse()
+                                            .setPartitionIndex(partition.partition())
+                                            .setErrorCode(Errors.NONE.code())
+                                            .setTimestamp(found.timestamp)
+                                            .setOffset(found.offset);
+
+                                    if (found.leaderEpoch.isPresent() && version >= 4) {
+                                        partitionResponse.setLeaderEpoch(found.leaderEpoch.get());
+                                    }
+                                }
+                                response = partitionResponse;
+                            }
+                        } catch (InterruptedException | ExecutionException e) {
+                            response = buildErrorResponse(Errors.forException(e), partition.partition());
+                        }
+                        status.responseOpt(Optional.of(response));
+                        status.completed(true);
+                    }
+                    completable.set(completable.get() && futureHolder.taskFuture().isDone());
+                });
+            }
+        });
+        if (completable.get()) {
+            return forceComplete();
+        } else {
+            return false;
+        }
+    }
+
+    private ListOffsetsResponseData.ListOffsetsPartitionResponse buildErrorResponse(Errors e, int partitionIndex) {
+        return new ListOffsetsResponseData.ListOffsetsPartitionResponse()
+                        .setPartitionIndex(partitionIndex)
+                        .setErrorCode(e.code())
+                        .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
+                        .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET);
+    }
+
+    private static void recordExpiration(TopicPartition partition) {
+        AGGREGATE_EXPIRATION_METER.mark();
+        PARTITION_EXPIRATION_METERS.computeIfAbsent(partition, tp -> METRICS_GROUP.newMeter("ExpiresPerSec",
+                "requests",
+                TimeUnit.SECONDS,
+                mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition()))))).mark();
+    }
+}
diff --git a/server/src/main/java/org/apache/kafka/server/ListOffsetsPartitionStatus.java b/storage/src/main/java/org/apache/kafka/server/purgatory/ListOffsetsPartitionStatus.java
similarity index 99%
rename from server/src/main/java/org/apache/kafka/server/ListOffsetsPartitionStatus.java
rename to storage/src/main/java/org/apache/kafka/server/purgatory/ListOffsetsPartitionStatus.java
index b489b820ef0..74623ee8026 100644
--- a/server/src/main/java/org/apache/kafka/server/ListOffsetsPartitionStatus.java
+++ b/storage/src/main/java/org/apache/kafka/server/purgatory/ListOffsetsPartitionStatus.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.server;
+package org.apache.kafka.server.purgatory;
 
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
diff --git a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
new file mode 100644
index 00000000000..81b9073377a
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.server.util.timer.MockTimer;
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
+import org.apache.kafka.storage.internals.log.OffsetResultHolder;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("unchecked")
+public class DelayedRemoteListOffsetsTest {
+
+    private final int delayMs = 10;
+    private final MockTimer timer = new MockTimer();
+    private final Consumer<TopicPartition> partitionOrException = mock(Consumer.class);
+    private final DelayedOperationPurgatory<DelayedRemoteListOffsets> purgatory =
+            new DelayedOperationPurgatory<>("test-purgatory", timer, 0, 10, true, true);
+
+    @AfterEach
+    public void afterEach() throws Exception {
+        purgatory.shutdown();
+    }
+
+    @Test
+    public void testResponseOnRequestExpiration() throws InterruptedException {
+        AtomicInteger numResponse = new AtomicInteger(0);
+        Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback = response ->
+            response.forEach(topic ->
+                topic.partitions().forEach(partition -> {
+                    assertEquals(Errors.REQUEST_TIMED_OUT.code(), partition.errorCode());
+                    assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp());
+                    assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset());
+                    assertEquals(-1, partition.leaderEpoch());
+                    numResponse.incrementAndGet();
+                })
+            );
+
+        AtomicInteger cancelledCount = new AtomicInteger(0);
+        CompletableFuture<Void> jobFuture = mock(CompletableFuture.class);
+        AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> holder = mock(AsyncOffsetReadFutureHolder.class);
+        when(holder.taskFuture()).thenAnswer(f -> new CompletableFuture<>());
+        when(holder.jobFuture()).thenReturn(jobFuture);
+        when(jobFuture.cancel(anyBoolean())).thenAnswer(f -> {
+            cancelledCount.incrementAndGet();
+            return true;
+        });
+
+        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
+        );
+
+        DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
+        List<TopicPartitionOperationKey> listOffsetsRequestKeys = statusByPartition.keySet().stream().map(TopicPartitionOperationKey::new).toList();
+        assertEquals(0, DelayedRemoteListOffsets.AGGREGATE_EXPIRATION_METER.count());
+        assertEquals(0, DelayedRemoteListOffsets.PARTITION_EXPIRATION_METERS.size());
+        purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys);
+
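+        // Give the purgatory's expiration reaper time to expire the watched operation (delayMs = 10).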
+        Thread.sleep(100);
+        assertEquals(3, listOffsetsRequestKeys.size());
+        assertEquals(cancelledCount.get(), listOffsetsRequestKeys.size());
+        assertEquals(numResponse.get(), listOffsetsRequestKeys.size());
+        assertEquals(listOffsetsRequestKeys.size(), DelayedRemoteListOffsets.AGGREGATE_EXPIRATION_METER.count());
+        listOffsetsRequestKeys.forEach(key -> {
+            TopicPartition tp = new TopicPartition(key.topic, key.partition);
+            assertEquals(1, DelayedRemoteListOffsets.PARTITION_EXPIRATION_METERS.get(tp).count());
+        });
+        });
+    }
+
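+    // When every task future has already completed, the operation completes immediately
+    // on tryCompleteElseWatch and no remote read job is cancelled.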
+    @Test
+    public void testResponseOnSuccess() {
+        AtomicInteger numResponse = new AtomicInteger(0);
+        Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback = response ->
+            response.forEach(topic ->
+                topic.partitions().forEach(partition -> {
+                    assertEquals(Errors.NONE.code(), partition.errorCode());
+                    assertEquals(100L, partition.timestamp());
+                    assertEquals(100L, partition.offset());
+                    assertEquals(50, partition.leaderEpoch());
+                    numResponse.incrementAndGet();
+                })
+            );
+
+        FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(100L, 100L, Optional.of(50));
+        CompletableFuture<OffsetResultHolder.FileRecordsOrError> taskFuture = new CompletableFuture<>();
+        taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset)));
+
+        AtomicInteger cancelledCount = new AtomicInteger(0);
+        CompletableFuture<Void> jobFuture = mock(CompletableFuture.class);
+        AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> holder = mock(AsyncOffsetReadFutureHolder.class);
+        when(holder.taskFuture()).thenAnswer(f -> taskFuture);
+        when(holder.jobFuture()).thenReturn(jobFuture);
+        when(jobFuture.cancel(anyBoolean())).thenAnswer(f -> {
+            cancelledCount.incrementAndGet();
+            return true;
+        });
+
+        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
+        );
+
+        DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
+        List<TopicPartitionOperationKey> listOffsetsRequestKeys = statusByPartition.keySet().stream().map(TopicPartitionOperationKey::new).toList();
+        purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys);
+
+        assertEquals(0, cancelledCount.get());
+        assertEquals(numResponse.get(), listOffsetsRequestKeys.size());
+    }
+
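+    // A remote read failure on one partition (test1-0) should map to UNKNOWN_SERVER_ERROR
+    // for that partition while the other partitions still return their offsets.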
+    @Test
+    public void testResponseOnPartialError() {
+        AtomicInteger numResponse = new AtomicInteger(0);
+        Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback = response ->
+            response.forEach(topic ->
+                topic.partitions().forEach(partition -> {
+                    if (topic.name().equals("test1")) {
+                        assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), partition.errorCode());
+                        assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp());
+                        assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset());
+                        assertEquals(-1, partition.leaderEpoch());
+                    } else {
+                        assertEquals(Errors.NONE.code(), partition.errorCode());
+                        assertEquals(100L, partition.timestamp());
+                        assertEquals(100L, partition.offset());
+                        assertEquals(50, partition.leaderEpoch());
+                    }
+                    numResponse.incrementAndGet();
+                })
+            );
+
+        FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(100L, 100L, Optional.of(50));
+        CompletableFuture<OffsetResultHolder.FileRecordsOrError> taskFuture = new CompletableFuture<>();
+        taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset)));
+
+        AtomicInteger cancelledCount = new AtomicInteger(0);
+        CompletableFuture<Void> jobFuture = mock(CompletableFuture.class);
+        AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> holder = mock(AsyncOffsetReadFutureHolder.class);
+        when(holder.taskFuture()).thenAnswer(f -> taskFuture);
+        when(holder.jobFuture()).thenReturn(jobFuture);
+        when(jobFuture.cancel(anyBoolean())).thenAnswer(f -> {
+            cancelledCount.incrementAndGet();
+            return true;
+        });
+
+        AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> errorFutureHolder = mock(AsyncOffsetReadFutureHolder.class);
+        CompletableFuture<OffsetResultHolder.FileRecordsOrError> errorTaskFuture = new CompletableFuture<>();
+        errorTaskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.of(new TimeoutException("Timed out!")), Optional.empty()));
+        when(errorFutureHolder.taskFuture()).thenAnswer(f -> errorTaskFuture);
+        when(errorFutureHolder.jobFuture()).thenReturn(jobFuture);
+
+        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build()
+        );
+
+        DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
+        List<TopicPartitionOperationKey> listOffsetsRequestKeys = statusByPartition.keySet().stream().map(TopicPartitionOperationKey::new).toList();
+        purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys);
+
+        assertEquals(0, cancelledCount.get());
+        assertEquals(numResponse.get(), listOffsetsRequestKeys.size());
+    }
+
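+    // A NotLeaderOrFollowerException raised by the partition check should fail only the
+    // affected partition (test1-0) and cancel its outstanding remote read job.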
+    @Test
+    public void testPartialResponseWhenNotLeaderOrFollowerExceptionOnOnePartition() {
+        AtomicInteger numResponse = new AtomicInteger(0);
+        Consumer<Collection<ListOffsetsResponseData.ListOffsetsTopicResponse>> responseCallback = response ->
+            response.forEach(topic ->
+                topic.partitions().forEach(partition -> {
+                    if (topic.name().equals("test1") && partition.partitionIndex() == 0) {
+                        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code(), partition.errorCode());
+                        assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp());
+                        assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset());
+                        assertEquals(-1, partition.leaderEpoch());
+                    } else {
+                        assertEquals(Errors.NONE.code(), partition.errorCode());
+                        assertEquals(100L, partition.timestamp());
+                        assertEquals(100L, partition.offset());
+                        assertEquals(50, partition.leaderEpoch());
+                    }
+                    numResponse.incrementAndGet();
+                })
+            );
+
+        FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(100L, 100L, Optional.of(50));
+        CompletableFuture<OffsetResultHolder.FileRecordsOrError> taskFuture = new CompletableFuture<>();
+        taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset)));
+
+        AtomicInteger cancelledCount = new AtomicInteger(0);
+        CompletableFuture<Void> jobFuture = mock(CompletableFuture.class);
+        AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> holder = mock(AsyncOffsetReadFutureHolder.class);
+        when(holder.taskFuture()).thenAnswer(f -> taskFuture);
+        when(holder.jobFuture()).thenReturn(jobFuture);
+        when(jobFuture.cancel(anyBoolean())).thenAnswer(f -> {
+            cancelledCount.incrementAndGet();
+            return true;
+        });
+
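+        // Simulate a leadership change on test1-0: the partition check throws, so that
+        // partition is answered with the error while its task future never completes.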
+        doThrow(new NotLeaderOrFollowerException("Not leader or follower!"))
+                .when(partitionOrException).accept(new TopicPartition("test1", 0));
+        AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> errorFutureHolder = mock(AsyncOffsetReadFutureHolder.class);
+        CompletableFuture<OffsetResultHolder.FileRecordsOrError> errorTaskFuture = new CompletableFuture<>();
+        when(errorFutureHolder.taskFuture()).thenAnswer(f -> errorTaskFuture);
+        when(errorFutureHolder.jobFuture()).thenReturn(jobFuture);
+
+        Map<TopicPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
+            new TopicPartition("test", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+            new TopicPartition("test1", 0), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build(),
+            new TopicPartition("test1", 1), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
+        );
+
+        DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);
+        List<TopicPartitionOperationKey> listOffsetsRequestKeys = statusByPartition.keySet().stream().map(TopicPartitionOperationKey::new).toList();
+        purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys);
+
+        assertEquals(1, cancelledCount.get());
+        assertEquals(numResponse.get(), listOffsetsRequestKeys.size());
+    }
+}
