This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a8f49999ccf KAFKA-19019: Add support for remote storage fetch for share groups (#19437)
a8f49999ccf is described below

commit a8f49999ccf6a33b986bf895884f1a5e9708fb47
Author: Abhinav Dixit <adi...@confluent.io>
AuthorDate: Tue Apr 22 04:00:24 2025 +0530

    KAFKA-19019: Add support for remote storage fetch for share groups (#19437)
    
    This PR adds support for remote storage fetch for share groups.
    
    Remote storage fetch for consumer groups has a limitation: only a
    single topic partition can be fetched from remote storage in a fetch
    request. Since the logic of share fetch requests is largely based on
    how consumer groups work, we follow similar logic in implementing
    remote storage fetch. This limitation should be addressed as part of
    KAFKA-19133, which should allow fetching multiple remote topic
    partitions in a single share fetch request.
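    
    For illustration, the single-partition behaviour reduces to picking
    the first acquired partition whose local log read reports a pending
    remote storage fetch; every other partition in the request is served
    from the local log. A simplified sketch (not verbatim code) of
    maybePrepareRemoteStorageFetchInfo from this patch:
    
        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
            if (entry.getValue().info().delayedRemoteStorageFetch.isPresent()) {
                // First partition needing remote storage wins; the loop stops here.
                return Optional.of(new TopicPartitionRemoteFetchInfo(entry.getKey(), entry.getValue()));
            }
        }
        return Optional.empty(); // no remote fetch needed; proceed with local-only handling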
    
    Reviewers: Jun Rao <jun...@gmail.com>
---
 .../java/kafka/server/share/DelayedShareFetch.java | 466 ++++++++++++++---
 .../kafka/server/share/DelayedShareFetchTest.java  | 573 ++++++++++++++++++++-
 2 files changed, 977 insertions(+), 62 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index d68ed06d307..5bd5f4ea6aa 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -22,6 +22,10 @@ import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 
 import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.message.ShareFetchResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.Time;
@@ -34,8 +38,12 @@ import org.apache.kafka.server.share.fetch.ShareFetch;
 import org.apache.kafka.server.share.fetch.ShareFetchPartitionData;
 import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
 import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.storage.log.FetchPartitionData;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
 import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
 
 import com.yammer.metrics.core.Meter;
 
@@ -44,10 +52,16 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.function.BiConsumer;
@@ -83,7 +97,9 @@ public class DelayedShareFetch extends DelayedOperation {
     // Tracks the start time to acquire any share partition for a fetch request.
     private long acquireStartTimeMs;
     private LinkedHashMap<TopicIdPartition, Long> partitionsAcquired;
-    private LinkedHashMap<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
+    private LinkedHashMap<TopicIdPartition, LogReadResult> localPartitionsAlreadyFetched;
+    private Optional<RemoteFetch> remoteFetchOpt;
+    private Optional<Exception> remoteStorageFetchException;
 
     /**
      * This function constructs an instance of delayed share fetch operation for completing share fetch
@@ -110,10 +126,24 @@ public class DelayedShareFetch extends DelayedOperation {
             sharePartitions,
             PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM),
             shareGroupMetrics,
-            time
+            time,
+            Optional.empty()
         );
     }
 
+    /**
+     * This function constructs an instance of delayed share fetch operation for completing share fetch
+     * requests instantaneously or with delay. The direct usage of this constructor is only from tests.
+     *
+     * @param shareFetch The share fetch parameters of the share fetch request.
+     * @param replicaManager The replica manager instance used to read from log/complete the request.
+     * @param exceptionHandler The handler to complete share fetch requests with exception.
+     * @param sharePartitions The share partitions referenced in the share fetch request.
+     * @param partitionMaxBytesStrategy The strategy to identify the max bytes for topic partitions in the share fetch request.
+     * @param shareGroupMetrics The share group metrics to record the metrics.
+     * @param time The system time.
+     * @param remoteFetchOpt Optional containing an in-flight remote fetch object or an empty optional.
+     */
     DelayedShareFetch(
         ShareFetch shareFetch,
         ReplicaManager replicaManager,
@@ -121,19 +151,22 @@ public class DelayedShareFetch extends DelayedOperation {
         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
         PartitionMaxBytesStrategy partitionMaxBytesStrategy,
         ShareGroupMetrics shareGroupMetrics,
-        Time time
+        Time time,
+        Optional<RemoteFetch> remoteFetchOpt
     ) {
         super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
         this.shareFetch = shareFetch;
         this.replicaManager = replicaManager;
         this.partitionsAcquired = new LinkedHashMap<>();
-        this.partitionsAlreadyFetched = new LinkedHashMap<>();
+        this.localPartitionsAlreadyFetched = new LinkedHashMap<>();
         this.exceptionHandler = exceptionHandler;
         this.sharePartitions = sharePartitions;
         this.partitionMaxBytesStrategy = partitionMaxBytesStrategy;
         this.shareGroupMetrics = shareGroupMetrics;
         this.time = time;
         this.acquireStartTimeMs = time.hiResClockMs();
+        this.remoteFetchOpt = remoteFetchOpt;
+        this.remoteStorageFetchException = Optional.empty();
         // Register metrics for DelayedShareFetch.
        KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedShareFetchMetrics");
        this.expiredRequestMeter = metricsGroup.newMeter(EXPIRES_PER_SEC, "requests", TimeUnit.SECONDS);
@@ -152,58 +185,68 @@ public class DelayedShareFetch extends DelayedOperation {
     @Override
     public void onComplete() {
        // We are utilizing lock so that onComplete doesn't do a dirty read for instance variables -
-        // partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
+        // partitionsAcquired and localPartitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
         lock.lock();
         log.trace("Completing the delayed share fetch request for group {}, member {}, "
             + "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
             partitionsAcquired.keySet());
 
         try {
-            LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
-            // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
-            if (partitionsAcquired.isEmpty()) {
-                topicPartitionData = acquirablePartitions();
-                // The TopicPartitionsAcquireTimeMs metric signifies the tension when acquiring the locks
-                // for the share partition, hence if no partitions are yet acquired by tryComplete,
-                // we record the metric here. Do not check if the request has successfully acquired any
-                // partitions now or not, as then the upper bound of request timeout shall be recorded
-                // for the metric.
-                updateAcquireElapsedTimeMetric();
-            } else {
-                // tryComplete invoked forceComplete, so we can use the data from tryComplete.
-                topicPartitionData = partitionsAcquired;
-            }
-
-            if (topicPartitionData.isEmpty()) {
-                // No locks for share partitions could be acquired, so we complete the request with an empty response.
-                shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0);
-                shareFetch.maybeComplete(Map.of());
-                return;
+            if (remoteStorageFetchException.isPresent()) {
+                completeErroneousRemoteShareFetchRequest();
+            } else if (remoteFetchOpt.isPresent()) {
+                completeRemoteStorageShareFetchRequest();
             } else {
-                // Update metric to record acquired to requested partitions.
-                double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.topicIdPartitions().size();
-                shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100));
+                completeLocalLogShareFetchRequest();
             }
-            log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
-                topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
-
-            completeShareFetchRequest(topicPartitionData);
         } finally {
             lock.unlock();
         }
     }
 
-    private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
+    private void completeLocalLogShareFetchRequest() {
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
+        // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
+        if (partitionsAcquired.isEmpty()) {
+            topicPartitionData = acquirablePartitions(sharePartitions);
+            // The TopicPartitionsAcquireTimeMs metric signifies the tension when acquiring the locks
+            // for the share partition, hence if no partitions are yet acquired by tryComplete,
+            // we record the metric here. Do not check if the request has successfully acquired any
+            // partitions now or not, as then the upper bound of request timeout shall be recorded
+            // for the metric.
+            updateAcquireElapsedTimeMetric();
+        } else {
+            // tryComplete invoked forceComplete, so we can use the data from tryComplete.
+            topicPartitionData = partitionsAcquired;
+        }
+
+        if (topicPartitionData.isEmpty()) {
+            // No locks for share partitions could be acquired, so we complete the request with an empty response.
+            shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0);
+            shareFetch.maybeComplete(Map.of());
+            return;
+        } else {
+            // Update metric to record acquired to requested partitions.
+            double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.topicIdPartitions().size();
+            shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100));
+        }
+        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
+            topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
+
+        processAcquiredTopicPartitionsForLocalLogFetch(topicPartitionData);
+    }
+
+    private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
         try {
             LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
-            if (partitionsAlreadyFetched.isEmpty())
+            if (localPartitionsAlreadyFetched.isEmpty())
                 responseData = readFromLog(
                     topicPartitionData,
                     partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()));
             else
                 // There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting
                 // updated in a different tryComplete thread.
-                responseData = combineLogReadResponse(topicPartitionData, partitionsAlreadyFetched);
+                responseData = combineLogReadResponse(topicPartitionData, localPartitionsAlreadyFetched);
 
             List<ShareFetchPartitionData> shareFetchPartitionDataList = new ArrayList<>();
             responseData.forEach((topicIdPartition, logReadResult) ->
@@ -225,15 +268,7 @@ public class DelayedShareFetch extends DelayedOperation {
             log.error("Error processing delayed share fetch request", e);
             handleFetchException(shareFetch, topicPartitionData.keySet(), e);
         } finally {
-            // Releasing the lock to move ahead with the next request in queue.
-            releasePartitionLocks(topicPartitionData.keySet());
-            // If we have a fetch request completed for a topic-partition, we release the locks for that partition,
-            // then we should check if there is a pending share fetch request for the topic-partition and complete it.
-            // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
-            // we directly call delayedShareFetchPurgatory.checkAndComplete
-            replicaManager.addToActionQueue(() -> topicPartitionData.keySet().forEach(topicIdPartition ->
-                replicaManager.completeDelayedShareFetchRequest(
-                    new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+            releasePartitionLocksAndAddToActionQueue(topicPartitionData.keySet());
         }
     }
 
@@ -242,8 +277,12 @@ public class DelayedShareFetch extends DelayedOperation {
      */
     @Override
     public boolean tryComplete() {
-        LinkedHashMap<TopicIdPartition, Long> topicPartitionData = acquirablePartitions();
+        // Check to see if the remote fetch is in flight. If there is an in flight remote fetch we want to resolve it first.
+        if (remoteFetchOpt.isPresent()) {
+            return maybeCompletePendingRemoteFetch();
+        }
 
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData = acquirablePartitions(sharePartitions);
         try {
             if (!topicPartitionData.isEmpty()) {
                // Update the metric to record the time taken to acquire the locks for the share partitions.
@@ -252,17 +291,17 @@ public class DelayedShareFetch extends DelayedOperation {
                 // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
                 // those topic partitions.
                 LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
+                // Store the remote fetch info and the topic partition for which we need to perform remote fetch.
+                Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);
+
+                if (topicPartitionRemoteFetchInfoOpt.isPresent()) {
+                    return maybeProcessRemoteFetch(topicPartitionData, topicPartitionRemoteFetchInfoOpt.get());
+                }
                 maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse);
                 if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) {
                     partitionsAcquired = topicPartitionData;
-                    partitionsAlreadyFetched = replicaManagerReadResponse;
-                    boolean completedByMe = forceComplete();
-                    // If invocation of forceComplete is not successful, then that means the request is already completed
-                    // hence release the acquired locks.
-                    if (!completedByMe) {
-                        releasePartitionLocks(partitionsAcquired.keySet());
-                    }
-                    return completedByMe;
+                    localPartitionsAlreadyFetched = replicaManagerReadResponse;
+                    return forceCompleteRequest();
                 } else {
                     log.debug("minBytes is not satisfied for the share fetch request for group {}, member {}, " +
                             "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
@@ -277,10 +316,18 @@ public class DelayedShareFetch extends DelayedOperation {
             return false;
         } catch (Exception e) {
             log.error("Error processing delayed share fetch request", e);
-            releasePartitionLocks(topicPartitionData.keySet());
-            partitionsAcquired.clear();
-            partitionsAlreadyFetched.clear();
-            return forceComplete();
+            // In case we have a remote fetch exception, we have already released locks for partitions which have potential
+            // local log read. We do not release locks for partitions which have a remote storage read because we need to
+            // complete the share fetch request in onComplete and if we release the locks early here, some other DelayedShareFetch
+            // request might get the locks for those partitions without this one getting complete.
+            if (remoteStorageFetchException.isEmpty()) {
+                releasePartitionLocks(topicPartitionData.keySet());
+                partitionsAcquired.clear();
+                localPartitionsAlreadyFetched.clear();
+                return forceCompleteRequest();
+            } else {
+                return forceCompleteRequest();
+            }
         }
     }
 
@@ -288,11 +335,13 @@ public class DelayedShareFetch extends DelayedOperation {
     * Prepare fetch request structure for partitions in the share fetch request for which we can acquire records.
      */
     // Visible for testing
-    LinkedHashMap<TopicIdPartition, Long> acquirablePartitions() {
+    LinkedHashMap<TopicIdPartition, Long> acquirablePartitions(
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitionsForAcquire
+    ) {
         // Initialize the topic partitions for which the fetch should be attempted.
         LinkedHashMap<TopicIdPartition, Long> topicPartitionData = new LinkedHashMap<>();
 
-        sharePartitions.forEach((topicIdPartition, sharePartition) -> {
+        sharePartitionsForAcquire.forEach((topicIdPartition, sharePartition) -> {
             // Add the share partition to the list of partitions to be fetched only if we can
             // acquire the fetch lock on it.
             if (sharePartition.maybeAcquireFetchLock()) {
@@ -529,8 +578,307 @@ public class DelayedShareFetch extends DelayedOperation {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+                //  a single topic partition in a fetch request. Since the logic of fetch is largely based on how consumer groups work,
+                //  we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
+                //  fetch for multiple remote fetch topic partitions in a single share fetch request.
+                topicPartitionRemoteFetchInfoOpt = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult));
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return topicPartitionRemoteFetchInfoOpt;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+    ) {
+        Set<TopicIdPartition> nonRemoteFetchTopicPartitions = new LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // topic partitions for which fetch would not be happening in this share fetch request.
+            if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                nonRemoteFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release fetch lock for the topic partitions that were acquired but were not a part of remote fetch and add
+        // them to the delayed actions queue.
+        releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions);
+        processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Throws an exception if a task for remote storage fetch could not be scheduled successfully, else updates remoteFetchOpt.
+     * @param topicPartitionRemoteFetchInfo - The remote storage fetch information.
+     */
+    private void processRemoteFetchOrException(
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+    ) {
+        TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
+        RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.logReadResult().info().delayedRemoteStorageFetch.get();
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (Exception e) {
+            // Throw the error if any in scheduling the remote fetch task.
+            remoteStorageFetchException = Optional.of(e);
+            throw e;
+        }
+        remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, topicPartitionRemoteFetchInfo.logReadResult(), remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));
+    }
+
+    /**
+     * This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent().
+     * The operation can be completed if:
+     * Case a: The partition is in an offline log directory on this broker
+     * Case b: This broker does not know the partition it tries to fetch
+     * Case c: This broker is no longer the leader of the partition it tries to fetch
+     * Case d: The remote storage read request completed (succeeded or failed)
+     * @return boolean representing whether the remote fetch is completed or not.
+     */
+    private boolean maybeCompletePendingRemoteFetch() {
+        boolean canComplete = false;
+
+        TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition();
+        try {
+            replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+        } catch (KafkaStorageException e) { // Case a
+            log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        } catch (UnknownTopicOrPartitionException e) { // Case b
+            log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        } catch (NotLeaderOrFollowerException e) { // Case c
+            log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        }
+
+        if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d
+            return forceCompleteRequest();
+        } else
+            return false;
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified an erroneous remote storage fetch in tryComplete().
+     * It should only be called when we know that there is a remote fetch in-flight/completed.
+     */
+    private void completeErroneousRemoteShareFetchRequest() {
+        try {
+            handleFetchException(shareFetch, partitionsAcquired.keySet(), remoteStorageFetchException.get());
+        } finally {
+            releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
+        }
+    }
+
+    private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topicIdPartitions) {
+        if (topicIdPartitions.isEmpty()) {
+            return;
+        }
+        // Releasing the lock to move ahead with the next request in queue.
+        releasePartitionLocks(topicIdPartitions);
+        // If we have a fetch request completed for a topic-partition, we release the locks for that partition,
+        // then we should check if there is a pending share fetch request for the topic-partition and complete it.
+        // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
+        // we directly call delayedShareFetchPurgatory.checkAndComplete
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+            replicaManager.completeDelayedShareFetchRequest(
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified a remote fetch during tryComplete().
+     * Note - This function should only be called when we know that there is a remote fetch.
+     */
+    private void completeRemoteStorageShareFetchRequest() {
+        LinkedHashMap<TopicIdPartition, Long> acquiredNonRemoteFetchTopicPartitionData = new LinkedHashMap<>();
+        try {
+            List<ShareFetchPartitionData> shareFetchPartitionData = new ArrayList<>();
+            int readableBytes = 0;
+            if (remoteFetchOpt.get().remoteFetchResult().isDone()) {
+                RemoteFetch remoteFetch = remoteFetchOpt.get();
+                RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
+                if (remoteLogReadResult.error.isPresent()) {
+                    Throwable error = remoteLogReadResult.error.get();
+                    // If there is any error for the remote fetch topic partition, we populate the error accordingly.
+                    shareFetchPartitionData.add(
+                        new ShareFetchPartitionData(
+                            remoteFetch.topicIdPartition(),
+                            partitionsAcquired.get(remoteFetch.topicIdPartition()),
+                            ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
+                        )
+                    );
+                } else {
+                    FetchDataInfo info = remoteLogReadResult.fetchDataInfo.get();
+                    TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition();
+                    LogReadResult logReadResult = remoteFetch.logReadResult();
+                    shareFetchPartitionData.add(
+                        new ShareFetchPartitionData(
+                            topicIdPartition,
+                            partitionsAcquired.get(remoteFetch.topicIdPartition()),
+                            new FetchPartitionData(
+                                logReadResult.error(),
+                                logReadResult.highWatermark(),
+                                logReadResult.leaderLogStartOffset(),
+                                info.records,
+                                Optional.empty(),
+                                logReadResult.lastStableOffset().isDefined() ? OptionalLong.of((Long) logReadResult.lastStableOffset().get()) : OptionalLong.empty(),
+                                info.abortedTransactions,
+                                logReadResult.preferredReadReplica().isDefined() ? OptionalInt.of((Integer) logReadResult.preferredReadReplica().get()) : OptionalInt.empty(),
+                                false
+                            )
+                        )
+                    );
+                    readableBytes += info.records.sizeInBytes();
+                }
+            } else {
+                cancelRemoteFetchTask();
+            }
+
+            // If remote fetch bytes < shareFetch.fetchParams().maxBytes, then we will try for a local read.
+            if (readableBytes < shareFetch.fetchParams().maxBytes) {
+                // Get the local log read based topic partitions.
+                LinkedHashMap<TopicIdPartition, SharePartition> nonRemoteFetchSharePartitions = new LinkedHashMap<>();
+                sharePartitions.forEach((topicIdPartition, sharePartition) -> {
+                    if (!partitionsAcquired.containsKey(topicIdPartition)) {
+                        nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition);
+                    }
+                });
+                acquiredNonRemoteFetchTopicPartitionData = acquirablePartitions(nonRemoteFetchSharePartitions);
+                if (!acquiredNonRemoteFetchTopicPartitionData.isEmpty()) {
+                    log.trace("Fetchable local share partitions for a remote share fetch request data: {} with groupId: {} fetch params: {}",
+                        acquiredNonRemoteFetchTopicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
+
+                    LinkedHashMap<TopicIdPartition, LogReadResult> responseData = readFromLog(
+                        acquiredNonRemoteFetchTopicPartitionData,
+                        partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes - readableBytes, acquiredNonRemoteFetchTopicPartitionData.keySet(), acquiredNonRemoteFetchTopicPartitionData.size()));
+                    for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet()) {
+                        if (entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) {
+                            shareFetchPartitionData.add(
+                                new ShareFetchPartitionData(
+                                    entry.getKey(),
+                                    acquiredNonRemoteFetchTopicPartitionData.get(entry.getKey()),
+                                    entry.getValue().toFetchPartitionData(false)
+                                )
+                            );
+                        }
+                    }
+                }
+            }
+
+            // Update metric to record acquired to requested partitions.
+            double acquiredRatio = (double) (partitionsAcquired.size() + acquiredNonRemoteFetchTopicPartitionData.size()) / shareFetch.topicIdPartitions().size();
+            if (acquiredRatio > 0)
+                shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (acquiredRatio * 100));
+
+            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> remoteFetchResponse = ShareFetchUtils.processFetchResponse(
+                shareFetch, shareFetchPartitionData, sharePartitions, replicaManager, exceptionHandler);
+            shareFetch.maybeComplete(remoteFetchResponse);
+            log.trace("Remote share fetch request completed successfully, response: {}", remoteFetchResponse);
+        } catch (InterruptedException | ExecutionException e) {
+            log.error("Exception occurred in completing remote fetch {} for delayed share fetch request {}", remoteFetchOpt.get(), e);
+            handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
+        } catch (Exception e) {
+            log.error("Unexpected error in processing delayed share fetch request", e);
+            handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
+        } finally {
+            Set<TopicIdPartition> topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet());
+            topicIdPartitions.addAll(acquiredNonRemoteFetchTopicPartitionData.keySet());
+            releasePartitionLocksAndAddToActionQueue(topicIdPartitions);
+        }
+    }
+
+    private void handleExceptionInCompletingRemoteStorageShareFetchRequest(
+        Set<TopicIdPartition> acquiredNonRemoteFetchTopicPartitions,
+        Exception e
+    ) {
+        Set<TopicIdPartition> topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet());
+        topicIdPartitions.addAll(acquiredNonRemoteFetchTopicPartitions);
+        handleFetchException(shareFetch, topicIdPartitions, e);
+    }
+
+    /**
+     * Cancel the remote storage read task, if it has not been executed yet and avoid interrupting the task if it is
+     * already running as it may force closing opened/cached resources such as the transaction index.
+     * Note - This function should only be called when we know that there is a remote fetch.
+     */
+    private void cancelRemoteFetchTask() {
+        boolean cancelled = remoteFetchOpt.get().remoteFetchTask().cancel(false);
+        if (!cancelled) {
+            log.debug("Remote fetch task for RemoteStorageFetchInfo: {} could not be cancelled and its isDone value is {}",
+                remoteFetchOpt.get().remoteFetchInfo(), remoteFetchOpt.get().remoteFetchTask().isDone());
+        }
+    }
+
+    private boolean forceCompleteRequest() {
+        boolean completedByMe = forceComplete();
+        // If the delayed operation is completed by me, the acquired locks are already released in onComplete().
+        // Otherwise, we need to release the acquired locks.
+        if (!completedByMe) {
+            releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
+        }
+        return completedByMe;
+    }
+
+    public record RemoteFetch(
+        TopicIdPartition topicIdPartition,
+        LogReadResult logReadResult,
+        Future<Void> remoteFetchTask,
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult,
+        RemoteStorageFetchInfo remoteFetchInfo
+    ) {
+        @Override
+        public String toString() {
+            return "RemoteFetch(" +
+                "topicIdPartition=" + topicIdPartition +
+                ", logReadResult=" + logReadResult +
+                ", remoteFetchTask=" + remoteFetchTask +
+                ", remoteFetchResult=" + remoteFetchResult +
+                ", remoteFetchInfo=" + remoteFetchInfo +
+                ")";
+        }
+    }
+
+    public record TopicPartitionRemoteFetchInfo(
+        TopicIdPartition topicIdPartition,
+        LogReadResult logReadResult
+    ) {
+        @Override
+        public String toString() {
+            return "TopicPartitionRemoteFetchInfo(" +
+                "topicIdPartition=" + topicIdPartition +
+                ", logReadResult=" + logReadResult +
+                ")";
+        }
+    }
 }
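
The end-to-end flow introduced above, in brief: tryComplete() detects a partition whose local read defers to remote storage, schedules an asynchronous remote read via remoteLogManager().asyncRead(...), and the read's callback completes a CompletableFuture and re-triggers the purgatory so onComplete() can build the response. A minimal, self-contained sketch of that callback pattern (hypothetical names, not code from this patch):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class RemoteReadPatternSketch {
        public static void main(String[] args) throws Exception {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            CompletableFuture<String> remoteFetchResult = new CompletableFuture<>();

            // Analogous to remoteLogManager.asyncRead(fetchInfo, callback): the task runs
            // asynchronously and hands its result to the callback when done.
            Future<?> remoteFetchTask = pool.submit(() -> {
                String records = "records read from tiered storage";
                // The callback completes the future; in the patch it also re-checks the delayed operation.
                remoteFetchResult.complete(records);
            });

            // Analogous to maybeCompletePendingRemoteFetch()/onComplete(): act once the result is done.
            remoteFetchResult.thenAccept(result -> System.out.println("completing share fetch with: " + result));

            remoteFetchTask.get(); // wait for the async read in this demo
            pool.shutdown();
        }
    }
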
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index bb8b51b40e2..43ece70ca0e 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -25,10 +25,15 @@ import kafka.server.ReplicaQuota;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.message.ShareFetchResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
 import org.apache.kafka.server.purgatory.DelayedOperationKey;
 import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
 import org.apache.kafka.server.share.SharePartitionKey;
@@ -46,6 +51,8 @@ import org.apache.kafka.server.util.timer.Timer;
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
 import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
 import org.junit.jupiter.api.AfterEach;
@@ -61,10 +68,15 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import scala.Option;
 import scala.Tuple2;
+import scala.collection.Seq;
 import scala.jdk.javaapi.CollectionConverters;
 
import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL;
@@ -73,6 +85,7 @@ import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDel
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -95,6 +108,8 @@ public class DelayedShareFetchTest {
     private static final FetchParams FETCH_PARAMS = new FetchParams(
         FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
         Optional.empty(), true);
+    private static final FetchDataInfo REMOTE_FETCH_INFO = new FetchDataInfo(new LogOffsetMetadata(0, 0, 0),
+        MemoryRecords.EMPTY, false, Optional.empty(), Optional.of(mock(RemoteStorageFetchInfo.class)));
     private static final BrokerTopicStats BROKER_TOPIC_STATS = new BrokerTopicStats();
 
     private Timer mockTimer;
@@ -487,7 +502,7 @@ public class DelayedShareFetchTest {
         delayedShareFetch.forceComplete();
         assertTrue(delayedShareFetch.isCompleted());
         // Verifying that the first forceComplete calls acquirablePartitions method in DelayedShareFetch.
-        Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
+        Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(sharePartitions);
         assertEquals(0, future.join().size());
         assertTrue(delayedShareFetch.lock().tryLock());
         delayedShareFetch.lock().unlock();
@@ -497,7 +512,7 @@ public class DelayedShareFetchTest {
         delayedShareFetch.forceComplete();
         assertTrue(delayedShareFetch.isCompleted());
         // Verifying that the second forceComplete does not call acquirablePartitions method in DelayedShareFetch.
-        Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
+        Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(sharePartitions);
         Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any());
         assertTrue(delayedShareFetch.lock().tryLock());
         // Assert both metrics shall be recorded only once.
@@ -1155,6 +1170,525 @@ public class DelayedShareFetchTest {
         assertEquals(1, delayedShareFetch.expiredRequestMeter().count());
     }
 
+    @Test
+    public void testRemoteStorageFetchTryCompleteReturnsFalse() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 and sp2. Hence, a replica manager fetch will happen for sp1 and sp2.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp1 and remote storage read result for tp2.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), any())).thenReturn(mock(Future.class));
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertFalse(delayedShareFetch.tryComplete());
+        assertFalse(delayedShareFetch.isCompleted());
+        // Remote fetch object gets created for delayed share fetch object.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for local log read topic partitions tp0 and tp1.
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1));
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchTryCompleteThrowsException() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+
+        // Fetch offset does not match with the cached entry for sp0 and sp1. Hence, a replica manager fetch will happen for sp0 and sp1.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp0 and remote storage read result for tp1.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0), Set.of(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Exception will be thrown during the creation of remoteFetch object.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), any())).thenThrow(new RejectedExecutionException("Exception thrown"));
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler();
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withExceptionHandler(exceptionHandler)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        // tryComplete returns true and goes to forceComplete once the exception occurs.
+        assertTrue(delayedShareFetch.tryComplete());
+        assertTrue(delayedShareFetch.isCompleted());
+        // The future of shareFetch completes.
+        assertTrue(shareFetch.isCompleted());
+        assertFalse(future.isCompletedExceptionally());
+        assertEquals(Set.of(tp1), future.join().keySet());
+        // Exception occurred and was handled.
+        Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
+        // Verify the locks are released for both local and remote read topic partitions tp0 and tp1 because of exception occurrence.
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1));
+        Mockito.verify(delayedShareFetch, times(1)).onComplete();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchTryCompletionDueToBrokerBecomingOffline() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 and sp2. Hence, a replica manager fetch will happen for sp1 and sp2 during tryComplete.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+
+        // Mocking local log read result for tp1 and remote storage read result for tp2 on first replicaManager readFromLog call (from tryComplete).
+        // Mocking local log read result for tp0 and tp1 on second replicaManager readFromLog call (from onComplete).
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2))
+        ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of())
+        ).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock but the broker becomes unavailable.
+        Future<Void> remoteFetchTask = mock(Future.class);
+        doAnswer(invocation -> {
+            when(remoteFetchTask.isCancelled()).thenReturn(true);
+            return false;
+        }).when(remoteFetchTask).cancel(false);
+
+        when(remoteFetchTask.cancel(false)).thenReturn(true);
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), any())).thenReturn(remoteFetchTask);
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+        when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class));
+
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        assertTrue(delayedShareFetch.remoteFetch().remoteFetchTask().isCancelled());
+        // Partition locks should be released for all 3 topic partitions
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2));
+        assertTrue(shareFetch.isCompleted());
+        // Share fetch response contained tp0 and tp1 (local fetch) but not tp2, since it errored out.
+        assertEquals(Set.of(tp0, tp1), future.join().keySet());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 is acquirable, sp1 is not acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0. Hence, a replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within 
tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.empty(),
+            Optional.of(new TimeoutException("Error occurred while creating 
remote fetch result")) // Remote fetch result is returned with an error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate 
remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = 
invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for tp0.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0), future.join().keySet());
+        assertEquals(Errors.REQUEST_TIMED_OUT.code(), 
future.join().get(tp0).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
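A note on the final assertion of the test above: the mocked remote read
completes with a TimeoutException, and Kafka's exception-to-error mapping turns
that into the partition-level REQUEST_TIMED_OUT code (likely via
Errors.forException, the standard translation path). A standalone illustration
of that mapping, not part of the patch:

    import org.apache.kafka.common.errors.TimeoutException;
    import org.apache.kafka.common.protocol.Errors;

    // TimeoutException is the exception type registered for REQUEST_TIMED_OUT,
    // so forException maps it back to that error code.
    short code = Errors.forException(new TimeoutException("remote read timed out")).code();
    // code == Errors.REQUEST_TIMED_OUT.code()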
+    @Test
+    public void testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+
+        SharePartition sp0 = mock(SharePartition.class);
+
+        // sp0 is acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            future, List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match the cached entry for sp0, hence a replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. The remote fetch object completes within tryComplete in this mock, hence the request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.of(REMOTE_FETCH_INFO),
+            Optional.empty() // Remote fetch result is returned successfully without error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate remoteFetchResult for the mocked behaviour.
+            Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for the delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for tp0.
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0), future.join().keySet());
+        assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset does not match the cached entry for sp0, sp1 and sp2, hence a replica manager fetch will happen for all of them in tryComplete.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+        when(sp2.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+
+        // Mock a local log read result for tp0, tp1 and a remote storage read result for tp2 on the first replicaManager readFromLog call (from tryComplete),
+        // and a local log read result for tp0 and tp1 on the second replicaManager readFromLog call (from onComplete).
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of(tp2))
+        ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of())
+        ).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. The remote fetch object completes within tryComplete in this mock, hence the request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.of(REMOTE_FETCH_INFO),
+            Optional.empty() // Remote fetch result is returned successfully without error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate remoteFetchResult for the mocked behaviour.
+            Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withReplicaManager(replicaManager)
+            .withSharePartitions(sharePartitions)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for the delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // The future of shareFetch completes.
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0, tp1, tp2), future.join().keySet());
+        // Verify the locks are released for both the local log read and remote storage read topic partitions tp0, tp1 and tp2.
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2));
+        assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode());
+        assertEquals(Errors.NONE.code(), future.join().get(tp1).errorCode());
+        assertEquals(Errors.NONE.code(), future.join().get(tp2).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
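The test above pins down the interesting part of onComplete: the completed
remote read for tp2 has to be stitched together with a fresh local log read for
tp0 and tp1 before the share fetch future completes. A simplified, hypothetical
sketch of that merge step, using only types visible in the test (the real
conversion from log read results to partition data lives in DelayedShareFetch):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;

    import org.apache.kafka.common.TopicIdPartition;
    import org.apache.kafka.common.message.ShareFetchResponseData;

    static void completeWithMergedResults(
            CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future,
            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> localResults,
            TopicIdPartition remotePartition,
            ShareFetchResponseData.PartitionData remoteResult) {
        // The remote partition's data joins the local reads in one response map.
        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> response = new LinkedHashMap<>(localResults);
        response.put(remotePartition, remoteResult);
        // Completing this future is what future.join() in the test observes.
        future.complete(response);
    }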
+    @Test
+    public void testRemoteStorageFetchOnlyHappensForFirstTopicPartition() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 and sp1 are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match the cached entry for sp0 and sp1, hence a replica manager fetch will happen for both.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        LinkedHashSet<TopicIdPartition> remoteStorageFetchPartitions = new LinkedHashSet<>();
+        remoteStorageFetchPartitions.add(tp0);
+        remoteStorageFetchPartitions.add(tp1);
+
+        // Mocking remote storage read result for tp0 and tp1.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), remoteStorageFetchPartitions)).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. The remote fetch object completes within tryComplete in this mock, hence the request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.of(REMOTE_FETCH_INFO),
+            Optional.empty() // Remote fetch result is returned successfully without error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate remoteFetchResult for the mocked behaviour.
+            Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for the delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released separately for tp1 (from tryComplete).
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1));
+        // From onComplete, the locks are released for both tp0 and tp1: tp0 because it was acquired in tryComplete
+        // and had its remote fetch processed, and tp1 because it is reacquired in onComplete when checking for a local log read.
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1));
+        assertTrue(shareFetch.isCompleted());
+        // Share fetch response only contains the first remote storage fetch topic partition, tp0.
+        assertEquals(Set.of(tp0), future.join().keySet());
+        assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
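The behaviour asserted above matches the limitation called out in the commit
message: only one remote-storage partition is fetched per share fetch request,
so with tp0 and tp1 both resolving to remote reads, only the first survives
into the response. A hypothetical sketch of such a selection (the real logic
lives in DelayedShareFetch, and KAFKA-19133 tracks lifting the restriction); it
assumes LogReadResult.info() exposes the FetchDataInfo whose
delayedRemoteStorageFetch marks a remote read:

    static Optional<TopicIdPartition> firstRemoteStoragePartition(
            LinkedHashMap<TopicIdPartition, LogReadResult> readResults) {
        for (Map.Entry<TopicIdPartition, LogReadResult> entry : readResults.entrySet()) {
            // Keep only the first partition whose read points at remote storage;
            // later remote partitions are released and retried in a future request.
            if (entry.getValue().info().delayedRemoteStorageFetch.isPresent()) {
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty();
    }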
     static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) {
         LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, minBytes);
         LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class),
@@ -1182,6 +1716,37 @@ public class DelayedShareFetchTest {
         return partitionMaxBytesStrategy;
     }
 
+    private Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLocalAndRemoteFetchResult(
+        Set<TopicIdPartition> localLogReadTopicIdPartitions,
+        Set<TopicIdPartition> remoteReadTopicIdPartitions) {
+        List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>();
+        localLogReadTopicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
+            new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), MemoryRecords.EMPTY),
+            Option.empty(),
+            -1L,
+            -1L,
+            -1L,
+            -1L,
+            -1L,
+            Option.empty(),
+            Option.empty(),
+            Option.empty()
+        ))));
+        remoteReadTopicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
+            REMOTE_FETCH_INFO,
+            Option.empty(),
+            -1L,
+            -1L,
+            -1L,
+            -1L,
+            -1L,
+            Option.empty(),
+            Option.empty(),
+            Option.empty()
+        ))));
+        return CollectionConverters.asScala(logReadResults).toSeq();
+    }
+
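The helper above returns a Scala Seq because ReplicaManager.readFromLog is a
Scala API; scala.jdk.javaapi.CollectionConverters is the supported Java-side
bridge. In isolation, the conversion looks like this:

    import java.util.List;

    import scala.collection.immutable.Seq;
    import scala.jdk.javaapi.CollectionConverters;

    // asScala(...) wraps the Java list in a mutable Scala Buffer view;
    // toSeq() then materialises the immutable Seq the Scala side expects.
    List<String> javaList = List.of("a", "b");
    Seq<String> scalaSeq = CollectionConverters.asScala(javaList).toSeq();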
     @SuppressWarnings("unchecked")
     private static BiConsumer<SharePartitionKey, Throwable> mockExceptionHandler() {
         return mock(BiConsumer.class);
@@ -1194,6 +1759,7 @@ public class DelayedShareFetchTest {
         private LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = mock(LinkedHashMap.class);
         private PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class);
         private Time time = new MockTime();
+        private final Optional<DelayedShareFetch.RemoteFetch> remoteFetch = Optional.empty();
         private ShareGroupMetrics shareGroupMetrics = mock(ShareGroupMetrics.class);
 
         DelayedShareFetchBuilder withShareFetchData(ShareFetch shareFetch) {
@@ -1243,7 +1809,8 @@ public class DelayedShareFetchTest {
                 sharePartitions,
                 partitionMaxBytesStrategy,
                 shareGroupMetrics,
-                time);
+                time,
+                remoteFetch);
         }
     }
 }
