This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1834030107b KAFKA-17510: Exception handling and purgatory completion on initialization delay (#17709)
1834030107b is described below

commit 1834030107b93fe2c08fcb48451f14b0d764051d
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu Nov 14 15:44:42 2024 +0000

    KAFKA-17510: Exception handling and purgatory completion on initialization delay (#17709)
    
    Reviewers: Jun Rao <[email protected]>
---
 .../java/kafka/server/share/DelayedShareFetch.java |  87 ++++++---
 .../java/kafka/server/share/ShareFetchUtils.java   |  12 +-
 .../java/kafka/server/share/SharePartition.java    |   4 +-
 .../kafka/server/share/SharePartitionManager.java  | 136 +++++---------
 .../kafka/server/share/DelayedShareFetchTest.java  | 107 ++++++-----
 .../kafka/server/share/ShareFetchUtilsTest.java    |  26 +--
 .../server/share/SharePartitionManagerTest.java    | 128 +++++++++++--
 .../server/ShareFetchAcknowledgeRequestTest.scala  |  78 ++++++--
 .../kafka/server/share/fetch/ShareFetch.java       | 202 +++++++++++++++++++++
 .../kafka/server/share/fetch/ShareFetchData.java   |  78 --------
 .../kafka/server/share/fetch/ShareFetchTest.java   |  90 +++++++++
 11 files changed, 658 insertions(+), 290 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 4cb9ce0cf42..a5cd79ee358 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -25,8 +25,9 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.server.purgatory.DelayedOperation;
+import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
-import org.apache.kafka.server.share.fetch.ShareFetchData;
+import org.apache.kafka.server.share.fetch.ShareFetch;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.storage.log.FetchPartitionData;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
@@ -55,7 +56,7 @@ public class DelayedShareFetch extends DelayedOperation {
 
     private static final Logger log = 
LoggerFactory.getLogger(DelayedShareFetch.class);
 
-    private final ShareFetchData shareFetchData;
+    private final ShareFetch shareFetch;
     private final ReplicaManager replicaManager;
 
     private Map<TopicIdPartition, FetchRequest.PartitionData> 
partitionsAcquired;
@@ -66,12 +67,12 @@ public class DelayedShareFetch extends DelayedOperation {
     private final LinkedHashMap<TopicIdPartition, SharePartition> 
sharePartitions;
 
     DelayedShareFetch(
-            ShareFetchData shareFetchData,
+            ShareFetch shareFetch,
             ReplicaManager replicaManager,
             SharePartitionManager sharePartitionManager,
             LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
-        super(shareFetchData.fetchParams().maxWaitMs, Optional.empty());
-        this.shareFetchData = shareFetchData;
+        super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
+        this.shareFetch = shareFetch;
         this.replicaManager = replicaManager;
         this.partitionsAcquired = new LinkedHashMap<>();
         this.partitionsAlreadyFetched = new LinkedHashMap<>();
@@ -91,10 +92,10 @@ public class DelayedShareFetch extends DelayedOperation {
     @Override
     public void onComplete() {
         log.trace("Completing the delayed share fetch request for group {}, 
member {}, "
-            + "topic partitions {}", shareFetchData.groupId(), 
shareFetchData.memberId(),
+            + "topic partitions {}", shareFetch.groupId(), 
shareFetch.memberId(),
             partitionsAcquired.keySet());
 
-        if (shareFetchData.future().isDone())
+        if (shareFetch.isCompleted())
             return;
 
         Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
@@ -107,11 +108,11 @@ public class DelayedShareFetch extends DelayedOperation {
 
         if (topicPartitionData.isEmpty()) {
             // No locks for share partitions could be acquired, so we complete 
the request with an empty response.
-            shareFetchData.future().complete(Collections.emptyMap());
+            shareFetch.maybeComplete(Collections.emptyMap());
             return;
         }
         log.trace("Fetchable share partitions data: {} with groupId: {} fetch 
params: {}",
-            topicPartitionData, shareFetchData.groupId(), 
shareFetchData.fetchParams());
+            topicPartitionData, shareFetch.groupId(), 
shareFetch.fetchParams());
 
         try {
             Map<TopicIdPartition, LogReadResult> responseData;
@@ -126,11 +127,11 @@ public class DelayedShareFetch extends DelayedOperation {
             for (Map.Entry<TopicIdPartition, LogReadResult> entry : 
responseData.entrySet())
                 fetchPartitionsData.put(entry.getKey(), 
entry.getValue().toFetchPartitionData(false));
 
-            
shareFetchData.future().complete(ShareFetchUtils.processFetchResponse(shareFetchData,
 fetchPartitionsData,
+            
shareFetch.maybeComplete(ShareFetchUtils.processFetchResponse(shareFetch, 
fetchPartitionsData,
                 sharePartitions, replicaManager));
         } catch (Exception e) {
             log.error("Error processing delayed share fetch request", e);
-            
sharePartitionManager.handleFetchException(shareFetchData.groupId(), 
topicPartitionData.keySet(), shareFetchData.future(), e);
+            handleFetchException(shareFetch, topicPartitionData.keySet(), e);
         } finally {
             // Releasing the lock to move ahead with the next request in queue.
             releasePartitionLocks(topicPartitionData.keySet());
@@ -140,7 +141,7 @@ public class DelayedShareFetch extends DelayedOperation {
             // we directly call delayedShareFetchPurgatory.checkAndComplete
             replicaManager.addToActionQueue(() -> 
topicPartitionData.keySet().forEach(topicIdPartition ->
                 replicaManager.completeDelayedShareFetchRequest(
-                    new DelayedShareFetchGroupKey(shareFetchData.groupId(), 
topicIdPartition.topicId(), topicIdPartition.partition()))));
+                    new DelayedShareFetchGroupKey(shareFetch.groupId(), 
topicIdPartition.topicId(), topicIdPartition.partition()))));
         }
     }
 
@@ -170,13 +171,13 @@ public class DelayedShareFetch extends DelayedOperation {
                     return completedByMe;
                 } else {
                     log.debug("minBytes is not satisfied for the share fetch 
request for group {}, member {}, " +
-                            "topic partitions {}", shareFetchData.groupId(), 
shareFetchData.memberId(),
+                            "topic partitions {}", shareFetch.groupId(), 
shareFetch.memberId(),
                         sharePartitions.keySet());
                     releasePartitionLocks(topicPartitionData.keySet());
                 }
             } else {
                 log.trace("Can't acquire records for any partition in the 
share fetch request for group {}, member {}, " +
-                        "topic partitions {}", shareFetchData.groupId(), 
shareFetchData.memberId(),
+                        "topic partitions {}", shareFetch.groupId(), 
shareFetch.memberId(),
                     sharePartitions.keySet());
             }
             return false;
@@ -198,7 +199,7 @@ public class DelayedShareFetch extends DelayedOperation {
         Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
new LinkedHashMap<>();
 
         sharePartitions.forEach((topicIdPartition, sharePartition) -> {
-            int partitionMaxBytes = 
shareFetchData.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
+            int partitionMaxBytes = 
shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
             // Add the share partition to the list of partitions to be fetched 
only if we can
             // acquire the fetch lock on it.
             if (sharePartition.maybeAcquireFetchLock()) {
@@ -266,7 +267,16 @@ public class DelayedShareFetch extends DelayedOperation {
         for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : 
topicPartitionData.entrySet()) {
             TopicIdPartition topicIdPartition = entry.getKey();
             FetchRequest.PartitionData partitionData = entry.getValue();
-            LogOffsetMetadata endOffsetMetadata = 
endOffsetMetadataForTopicPartition(topicIdPartition);
+
+            LogOffsetMetadata endOffsetMetadata;
+            try {
+                endOffsetMetadata = 
endOffsetMetadataForTopicPartition(topicIdPartition);
+            } catch (Exception e) {
+                shareFetch.addErroneous(topicIdPartition, e);
+                sharePartitionManager.handleFencedSharePartitionException(
+                    new SharePartitionKey(shareFetch.groupId(), 
topicIdPartition), e);
+                continue;
+            }
 
             if (endOffsetMetadata == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
                 continue;
@@ -280,14 +290,14 @@ public class DelayedShareFetch extends DelayedOperation {
 
             if (fetchOffsetMetadata.messageOffset > 
endOffsetMetadata.messageOffset) {
                 log.debug("Satisfying delayed share fetch request for group 
{}, member {} since it is fetching later segments of " +
-                    "topicIdPartition {}", shareFetchData.groupId(), 
shareFetchData.memberId(), topicIdPartition);
+                    "topicIdPartition {}", shareFetch.groupId(), 
shareFetch.memberId(), topicIdPartition);
                 return true;
             } else if (fetchOffsetMetadata.messageOffset < 
endOffsetMetadata.messageOffset) {
                 if (fetchOffsetMetadata.onOlderSegment(endOffsetMetadata)) {
                     // This can happen when the fetch operation is falling 
behind the current segment or the partition
                     // has just rolled a new segment.
                     log.debug("Satisfying delayed share fetch request for 
group {}, member {} immediately since it is fetching older " +
-                        "segments of topicIdPartition {}", 
shareFetchData.groupId(), shareFetchData.memberId(), topicIdPartition);
+                        "segments of topicIdPartition {}", 
shareFetch.groupId(), shareFetch.memberId(), topicIdPartition);
                     return true;
                 } else if 
(fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
                     // we take the partition fetch size as upper bound when 
accumulating the bytes.
@@ -296,15 +306,15 @@ public class DelayedShareFetch extends DelayedOperation {
                 }
             }
         }
-        return accumulatedSize >= shareFetchData.fetchParams().minBytes;
+        return accumulatedSize >= shareFetch.fetchParams().minBytes;
     }
 
     private LogOffsetMetadata 
endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
-        Partition partition = 
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+        Partition partition = ShareFetchUtils.partition(replicaManager, 
topicIdPartition.topicPartition());
         LogOffsetSnapshot offsetSnapshot = 
partition.fetchOffsetSnapshot(Optional.empty(), true);
         // The FetchIsolation type that we use for share fetch is 
FetchIsolation.HIGH_WATERMARK. In the future, we can
         // extend it to support other FetchIsolation types.
-        FetchIsolation isolationType = shareFetchData.fetchParams().isolation;
+        FetchIsolation isolationType = shareFetch.fetchParams().isolation;
         if (isolationType == FetchIsolation.LOG_END)
             return offsetSnapshot.logEndOffset;
         else if (isolationType == FetchIsolation.HIGH_WATERMARK)
@@ -315,11 +325,17 @@ public class DelayedShareFetch extends DelayedOperation {
     }
 
     private Map<TopicIdPartition, LogReadResult> 
readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
+        // Filter if there already exists any erroneous topic partition.
+        Set<TopicIdPartition> partitionsToFetch = 
shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet());
+        if (partitionsToFetch.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
         Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
-            shareFetchData.fetchParams(),
+            shareFetch.fetchParams(),
             CollectionConverters.asScala(
-                topicPartitionData.entrySet().stream().map(entry ->
-                    new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
+                partitionsToFetch.stream().map(topicIdPartition ->
+                    new Tuple2<>(topicIdPartition, 
topicPartitionData.get(topicIdPartition))).collect(Collectors.toList())
             ),
             QuotaFactory.UNBOUNDED_QUOTA,
             true);
@@ -339,6 +355,29 @@ public class DelayedShareFetch extends DelayedOperation {
             .anyMatch(logReadResult -> logReadResult.error().code() != 
Errors.NONE.code());
     }
 
+    /**
+     * The handleFetchException method is used to handle the exception that 
occurred while reading from log.
+     * The method will handle the exception for each topic-partition in the 
request. The share partition
+     * might get removed from the cache.
+     * <p>
+     * The replica read request might error out for one share partition
+     * but as we cannot determine which share partition errored out, we might 
remove all the share partitions
+     * in the request.
+     *
+     * @param shareFetch The share fetch request.
+     * @param topicIdPartitions The topic-partitions in the replica read 
request.
+     * @param throwable The exception that occurred while fetching messages.
+     */
+    private void handleFetchException(
+        ShareFetch shareFetch,
+        Set<TopicIdPartition> topicIdPartitions,
+        Throwable throwable
+    ) {
+        topicIdPartitions.forEach(topicIdPartition -> 
sharePartitionManager.handleFencedSharePartitionException(
+            new SharePartitionKey(shareFetch.groupId(), topicIdPartition), 
throwable));
+        shareFetch.maybeCompleteWithException(topicIdPartitions, throwable);
+    }
+
     // Visible for testing.
     Map<TopicIdPartition, LogReadResult> 
combineLogReadResponse(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData,
                                                                 
Map<TopicIdPartition, LogReadResult> existingFetchedData) {
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index 3515362152b..e3608128eb5 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
-import org.apache.kafka.server.share.fetch.ShareFetchData;
+import org.apache.kafka.server.share.fetch.ShareFetch;
 import org.apache.kafka.server.storage.log.FetchPartitionData;
 
 import org.slf4j.Logger;
@@ -55,7 +55,7 @@ public class ShareFetchUtils {
      * by acquiring records from the share partition.
      */
     static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
processFetchResponse(
-            ShareFetchData shareFetchData,
+            ShareFetch shareFetch,
             Map<TopicIdPartition, FetchPartitionData> responseData,
             LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
             ReplicaManager replicaManager
@@ -91,7 +91,7 @@ public class ShareFetchUtils {
                     partitionData.setErrorMessage(Errors.NONE.message());
                 }
             } else {
-                ShareAcquiredRecords shareAcquiredRecords = 
sharePartition.acquire(shareFetchData.memberId(), 
shareFetchData.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData);
+                ShareAcquiredRecords shareAcquiredRecords = 
sharePartition.acquire(shareFetch.memberId(), shareFetch.maxFetchRecords() - 
acquiredRecordsCount, fetchPartitionData);
                 log.trace("Acquired records: {} for topicIdPartition: {}", 
shareAcquiredRecords, topicIdPartition);
                 // Maybe, in the future, check if no records are acquired, and 
we want to retry
                 // replica manager fetch. Depends on the share partition 
manager implementation,
@@ -151,11 +151,15 @@ public class ShareFetchUtils {
     }
 
     static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
+        return partition(replicaManager, tp).getLeaderEpoch();
+    }
+
+    static Partition partition(ReplicaManager replicaManager, TopicPartition 
tp) {
         Partition partition = replicaManager.getPartitionOrException(tp);
         if (!partition.isLeader()) {
             log.debug("The broker is not the leader for topic partition: 
{}-{}", tp.topic(), tp.partition());
             throw new NotLeaderOrFollowerException();
         }
-        return partition.getLeaderEpoch();
+        return partition;
     }
 }
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 71baea10174..632cb1e3169 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1082,8 +1082,8 @@ public class SharePartition {
 
     /**
      * Prior to fetching records from the leader, the fetch lock is acquired 
to ensure that the same
-     * share partition does not enter a fetch queue while another one is being 
fetched within the queue.
-     * The fetch lock is released once the records are fetched from the leader.
+     * share partition is not fetched concurrently by multiple clients. The 
fetch lock is released once
+     * the records are fetched and acquired.
      *
      * @return A boolean which indicates whether the fetch lock is acquired.
      */
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 4288dd55703..1c6c5492372 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -49,7 +49,7 @@ import org.apache.kafka.server.share.context.ShareSessionContext;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
-import org.apache.kafka.server.share.fetch.ShareFetchData;
+import org.apache.kafka.server.share.fetch.ShareFetch;
 import org.apache.kafka.server.share.persister.Persister;
 import org.apache.kafka.server.share.session.ShareSession;
 import org.apache.kafka.server.share.session.ShareSessionCache;
@@ -71,10 +71,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
 
 /**
  * The SharePartitionManager is responsible for managing the SharePartitions 
and ShareSessions.
@@ -250,7 +248,7 @@ public class SharePartitionManager implements AutoCloseable {
                 partitionMaxBytes.keySet(), groupId, fetchParams);
 
         CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new 
CompletableFuture<>();
-        processShareFetch(new ShareFetchData(fetchParams, groupId, memberId, 
future, partitionMaxBytes, maxFetchRecords));
+        processShareFetch(new ShareFetch(fetchParams, groupId, memberId, 
future, partitionMaxBytes, maxFetchRecords));
 
         return future;
     }
@@ -498,30 +496,6 @@ public class SharePartitionManager implements AutoCloseable {
         }
     }
 
-    /**
-     * The handleFetchException method is used to handle the exception that 
occurred while reading from log.
-     * The method will handle the exception for each topic-partition in the 
request. The share partition
-     * might get removed from the cache.
-     * <p>
-     * The replica read request might error out for one share partition
-     * but as we cannot determine which share partition errored out, we might 
remove all the share partitions
-     * in the request.
-     *
-     * @param groupId The group id in the share fetch request.
-     * @param topicIdPartitions The topic-partitions in the replica read 
request.
-     * @param future The future to complete with the exception.
-     * @param throwable The exception that occurred while fetching messages.
-     */
-    public void handleFetchException(
-        String groupId,
-        Set<TopicIdPartition> topicIdPartitions,
-        CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
-        Throwable throwable
-    ) {
-        topicIdPartitions.forEach(topicIdPartition -> 
handleFencedSharePartitionException(sharePartitionKey(groupId, 
topicIdPartition), throwable));
-        maybeCompleteShareFetchWithException(future, topicIdPartitions, 
throwable);
-    }
-
     /**
      * The cachedTopicIdPartitionsInShareSession method is used to get the 
cached topic-partitions in the share session.
      *
@@ -564,20 +538,18 @@ public class SharePartitionManager implements AutoCloseable {
     }
 
     // Visible for testing.
-    void processShareFetch(ShareFetchData shareFetchData) {
-        if (shareFetchData.partitionMaxBytes().isEmpty()) {
+    void processShareFetch(ShareFetch shareFetch) {
+        if (shareFetch.partitionMaxBytes().isEmpty()) {
             // If there are no partitions to fetch then complete the future 
with an empty map.
-            shareFetchData.future().complete(Collections.emptyMap());
+            shareFetch.maybeComplete(Collections.emptyMap());
             return;
         }
 
-        // Initialize lazily, if required.
-        Map<TopicIdPartition, Throwable> erroneous = null;
         List<DelayedShareFetchKey> delayedShareFetchWatchKeys = new 
ArrayList<>();
         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
-        for (TopicIdPartition topicIdPartition : 
shareFetchData.partitionMaxBytes().keySet()) {
+        for (TopicIdPartition topicIdPartition : 
shareFetch.partitionMaxBytes().keySet()) {
             SharePartitionKey sharePartitionKey = sharePartitionKey(
-                shareFetchData.groupId(),
+                shareFetch.groupId(),
                 topicIdPartition
             );
 
@@ -585,15 +557,8 @@ public class SharePartitionManager implements AutoCloseable {
             try {
                 sharePartition = getOrCreateSharePartition(sharePartitionKey);
             } catch (Exception e) {
-                // Complete the whole fetch request with an exception if there 
is an error processing.
-                // The exception currently can be thrown only if there is an 
error while initializing
-                // the share partition. But skip the processing for other 
share partitions in the request
-                // as this situation is not expected.
-                log.error("Error processing share fetch request", e);
-                if (erroneous == null) {
-                    erroneous = new HashMap<>();
-                }
-                erroneous.put(topicIdPartition, e);
+                log.debug("Error processing share fetch request", e);
+                shareFetch.addErroneous(topicIdPartition, e);
                 // Continue iteration for other partitions in the request.
                 continue;
             }
@@ -601,37 +566,42 @@ public class SharePartitionManager implements AutoCloseable {
             // We add a key corresponding to each share partition in the 
request in the group so that when there are
             // acknowledgements/acquisition lock timeout etc., we have a way 
to perform checkAndComplete for all
             // such requests which are delayed because of lack of data to 
acquire for the share partition.
-            delayedShareFetchWatchKeys.add(new 
DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), 
topicIdPartition.partition()));
+            DelayedShareFetchKey delayedShareFetchKey = new 
DelayedShareFetchGroupKey(shareFetch.groupId(),
+                topicIdPartition.topicId(), topicIdPartition.partition());
+            delayedShareFetchWatchKeys.add(delayedShareFetchKey);
             // We add a key corresponding to each topic partition in the 
request so that when the HWM is updated
             // for any topic partition, we have a way to perform 
checkAndComplete for all such requests which are
             // delayed because of lack of data to acquire for the topic 
partition.
             delayedShareFetchWatchKeys.add(new 
DelayedShareFetchPartitionKey(topicIdPartition.topicId(), 
topicIdPartition.partition()));
-            // The share partition is initialized asynchronously, so we need 
to wait for it to be initialized.
-            // But if the share partition is already initialized, then the 
future will be completed immediately.
-            // Hence, it's safe to call the maybeInitialize method and then 
wait for the future to be completed.
-            // TopicPartitionData list will be populated only if the share 
partition is already initialized.
-            sharePartition.maybeInitialize().whenComplete((result, throwable) 
-> {
+
+            CompletableFuture<Void> initializationFuture = 
sharePartition.maybeInitialize();
+            final boolean initialized = initializationFuture.isDone();
+            initializationFuture.whenComplete((result, throwable) -> {
                 if (throwable != null) {
-                    // TODO: Complete error handling for initialization. We 
have to record the error
-                    //  for respective share partition as completing the full 
request might result in
-                    //  some acquired records to not being sent: 
https://issues.apache.org/jira/browse/KAFKA-17510
-                    
maybeCompleteInitializationWithException(sharePartitionKey, 
shareFetchData.future(), throwable);
+                    handleInitializationException(sharePartitionKey, 
shareFetch, throwable);
                 }
+                // Though the share partition is initialized asynchronously, 
but if already initialized or
+                // errored then future should be completed immediately. If the 
initialization is not completed
+                // immediately then the requests might be waiting in purgatory 
until the share partition
+                // is initialized. Hence, trigger the completion of all 
pending delayed share fetch requests
+                // for the share partition.
+                if (!initialized)
+                    
replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
             });
             sharePartitions.put(topicIdPartition, sharePartition);
         }
 
         // If all the partitions in the request errored out, then complete the 
fetch request with an exception.
-        if (erroneous != null && erroneous.size() == 
shareFetchData.partitionMaxBytes().size()) {
-            completeShareFetchWithException(shareFetchData.future(), 
erroneous);
+        if (shareFetch.errorInAllPartitions()) {
+            shareFetch.maybeComplete(Collections.emptyMap());
             // Do not proceed with share fetch processing as all the 
partitions errored out.
             return;
         }
 
-        // TODO: If there exists some erroneous partitions then they will not 
be part of response.
-
         // Add the share fetch to the delayed share fetch purgatory to process 
the fetch request.
-        addDelayedShareFetch(new DelayedShareFetch(shareFetchData, 
replicaManager, this, sharePartitions), delayedShareFetchWatchKeys);
+        // The request will be added irrespective of whether the share 
partition is initialized or not.
+        // Once the share partition is initialized, the delayed share fetch 
will be completed.
+        addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager, 
this, sharePartitions), delayedShareFetchWatchKeys);
     }
 
     private SharePartition getOrCreateSharePartition(SharePartitionKey 
sharePartitionKey) {
@@ -657,28 +627,35 @@ public class SharePartitionManager implements AutoCloseable {
                 });
     }
 
-    private void maybeCompleteInitializationWithException(
+    private void handleInitializationException(
             SharePartitionKey sharePartitionKey,
-            CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+            ShareFetch shareFetch,
             Throwable throwable) {
         if (throwable instanceof LeaderNotAvailableException) {
             log.debug("The share partition with key {} is not initialized 
yet", sharePartitionKey);
-            // Do not process the fetch request for this partition as the 
leader is not initialized yet.
-            // The fetch request will be retried in the next poll.
-            // TODO: Add the request to delayed fetch purgatory.
+            // Skip any handling for this error as the share partition is 
still loading. The request
+            // to fetch will be added in purgatory and will be completed once 
either timed out
+            // or the share partition initialization completes.
             return;
         }
 
         // Remove the partition from the cache as it's failed to initialize.
-        partitionCacheMap.remove(sharePartitionKey);
-        // The partition initialization failed, so complete the request with 
the exception.
-        // The server should not be in this state, so log the error on broker 
and surface the same
-        // to the client. The broker should not be in this state, investigate 
the root cause of the error.
-        log.error("Error initializing share partition with key {}", 
sharePartitionKey, throwable);
-        maybeCompleteShareFetchWithException(future, 
Collections.singletonList(sharePartitionKey.topicIdPartition()), throwable);
+        SharePartition sharePartition = 
partitionCacheMap.remove(sharePartitionKey);
+        if (sharePartition != null) {
+            sharePartition.markFenced();
+        }
+        // The partition initialization failed, so add the partition to the 
erroneous partitions.
+        log.debug("Error initializing share partition with key {}", 
sharePartitionKey, throwable);
+        shareFetch.addErroneous(sharePartitionKey.topicIdPartition(), 
throwable);
     }
 
-    private void handleFencedSharePartitionException(
+    /**
+     * The method is used to handle the share partition exception.
+     *
+     * @param sharePartitionKey The share partition key.
+     * @param throwable The exception.
+     */
+    public void handleFencedSharePartitionException(
         SharePartitionKey sharePartitionKey,
         Throwable throwable
     ) {
@@ -695,23 +672,6 @@ public class SharePartitionManager implements 
AutoCloseable {
         }
     }
 
-    private void 
maybeCompleteShareFetchWithException(CompletableFuture<Map<TopicIdPartition, 
PartitionData>> future,
-        Collection<TopicIdPartition> topicIdPartitions, Throwable throwable) {
-        if (!future.isDone()) {
-            
future.complete(topicIdPartitions.stream().collect(Collectors.toMap(
-                tp -> tp, tp -> new 
PartitionData().setErrorCode(Errors.forException(throwable).code()).setErrorMessage(throwable.getMessage()))));
-        }
-    }
-
-    private void 
completeShareFetchWithException(CompletableFuture<Map<TopicIdPartition, 
PartitionData>> future,
-        Map<TopicIdPartition, Throwable> erroneous) {
-        future.complete(erroneous.entrySet().stream().collect(Collectors.toMap(
-            Map.Entry::getKey, entry -> {
-                Throwable t = entry.getValue();
-                return new 
PartitionData().setErrorCode(Errors.forException(t).code()).setErrorMessage(t.getMessage());
-            })));
-    }
-
     private SharePartitionKey sharePartitionKey(String groupId, 
TopicIdPartition topicIdPartition) {
         return new SharePartitionKey(groupId, topicIdPartition);
     }
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java 
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 0c7b488f180..f1f708f9dd9 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.server.purgatory.DelayedOperationKey;
 import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
-import org.apache.kafka.server.share.fetch.ShareFetchData;
+import org.apache.kafka.server.share.fetch.ShareFetch;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.storage.log.FetchParams;
 import org.apache.kafka.server.storage.log.FetchPartitionData;
@@ -113,13 +113,13 @@ public class DelayedShareFetchTest {
         sharePartitions.put(tp0, sp0);
         sharePartitions.put(tp1, sp1);
 
-        ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, 
groupId, Uuid.randomUuid().toString(),
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, 
Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
 
         when(sp0.canAcquireRecords()).thenReturn(false);
         when(sp1.canAcquireRecords()).thenReturn(false);
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData)
+            .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
             .build());
 
@@ -150,7 +150,7 @@ public class DelayedShareFetchTest {
         sharePartitions.put(tp0, sp0);
         sharePartitions.put(tp1, sp1);
 
-        ShareFetchData shareFetchData = new ShareFetchData(
+        ShareFetch shareFetch = new ShareFetch(
             new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                 2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
@@ -172,7 +172,7 @@ public class DelayedShareFetchTest {
         doAnswer(invocation -> 
buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
 
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData)
+            .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
             .withReplicaManager(replicaManager)
             .build());
@@ -205,7 +205,7 @@ public class DelayedShareFetchTest {
         sharePartitions.put(tp0, sp0);
         sharePartitions.put(tp1, sp1);
 
-        ShareFetchData shareFetchData = new ShareFetchData(
+        ShareFetch shareFetch = new ShareFetch(
             new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                 2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
@@ -223,7 +223,7 @@ public class DelayedShareFetchTest {
         mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
 
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData)
+            .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
             .withReplicaManager(replicaManager)
             .build());
@@ -256,7 +256,7 @@ public class DelayedShareFetchTest {
         sharePartitions.put(tp0, sp0);
         sharePartitions.put(tp1, sp1);
 
-        ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, 
groupId, Uuid.randomUuid().toString(),
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, 
Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
 
         when(sp0.canAcquireRecords()).thenReturn(true);
@@ -268,7 +268,7 @@ public class DelayedShareFetchTest {
         when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(new 
LogOffsetMetadata(0, 1, 0)));
         mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 
1);
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData)
+            .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
             .withReplicaManager(replicaManager)
             .build());
@@ -301,13 +301,14 @@ public class DelayedShareFetchTest {
         sharePartitions.put(tp0, sp0);
         sharePartitions.put(tp1, sp1);
 
-        ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, 
groupId, Uuid.randomUuid().toString(),
-            new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, 
Uuid.randomUuid().toString(),
+            future, partitionMaxBytes, MAX_FETCH_RECORDS);
 
         when(sp0.canAcquireRecords()).thenReturn(false);
         when(sp1.canAcquireRecords()).thenReturn(false);
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData)
+            .withShareFetchData(shareFetch)
             .withReplicaManager(replicaManager)
             .withSharePartitions(sharePartitions)
             .build());
@@ -315,7 +316,7 @@ public class DelayedShareFetchTest {
         delayedShareFetch.forceComplete();
 
         // Since no partition could be acquired, the future should be empty 
and replicaManager.readFromLog should not be called.
-        assertEquals(0, shareFetchData.future().join().size());
+        assertEquals(0, future.join().size());
         Mockito.verify(replicaManager, times(0)).readFromLog(
                 any(), any(), any(ReplicaQuota.class), anyBoolean());
         assertTrue(delayedShareFetch.isCompleted());
@@ -343,7 +344,7 @@ public class DelayedShareFetchTest {
         sharePartitions.put(tp0, sp0);
         sharePartitions.put(tp1, sp1);
 
-        ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, 
groupId, Uuid.randomUuid().toString(),
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, 
Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
 
         when(sp0.canAcquireRecords()).thenReturn(true);
@@ -352,7 +353,7 @@ public class DelayedShareFetchTest {
             ShareAcquiredRecords.fromAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
         doAnswer(invocation -> 
buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData)
+            .withShareFetchData(shareFetch)
             .withReplicaManager(replicaManager)
             .withSharePartitions(sharePartitions)
             .build());
@@ -365,7 +366,7 @@ public class DelayedShareFetchTest {
         Mockito.verify(sp0, times(1)).nextFetchOffset();
         Mockito.verify(sp1, times(0)).nextFetchOffset();
         assertTrue(delayedShareFetch.isCompleted());
-        assertTrue(shareFetchData.future().isDone());
+        assertTrue(shareFetch.isCompleted());
         Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(any());
     }
 
@@ -384,14 +385,14 @@ public class DelayedShareFetchTest {
         sharePartitions.put(tp0, sp0);
 
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
-        ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, 
groupId, Uuid.randomUuid().toString(),
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, 
Uuid.randomUuid().toString(),
             future, partitionMaxBytes, MAX_FETCH_RECORDS);
 
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp0.canAcquireRecords()).thenReturn(false);
 
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData)
+            .withShareFetchData(shareFetch)
             .withReplicaManager(replicaManager)
             .withSharePartitions(sharePartitions)
             .build());
@@ -402,7 +403,7 @@ public class DelayedShareFetchTest {
         assertTrue(delayedShareFetch.isCompleted());
         // Verifying that the first forceComplete calls acquirablePartitions 
method in DelayedShareFetch.
         Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
-        assertEquals(0, shareFetchData.future().join().size());
+        assertEquals(0, future.join().size());
 
         // Force completing the share fetch request for the second time should 
hit the future completion check and not
         // proceed ahead in the function.
@@ -438,7 +439,7 @@ public class DelayedShareFetchTest {
         sharePartitions1.put(tp1, sp1);
         sharePartitions1.put(tp2, sp2);
 
-        ShareFetchData shareFetchData1 = new ShareFetchData(FETCH_PARAMS, 
groupId, Uuid.randomUuid().toString(),
+        ShareFetch shareFetch1 = new ShareFetch(FETCH_PARAMS, groupId, 
Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes1, MAX_FETCH_RECORDS);
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
@@ -450,7 +451,7 @@ public class DelayedShareFetchTest {
         partitionMaxBytes1.keySet().forEach(topicIdPartition -> 
delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, 
topicIdPartition.topicId(), topicIdPartition.partition())));
 
         DelayedShareFetch delayedShareFetch1 = 
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData1)
+            .withShareFetchData(shareFetch1)
             .withReplicaManager(replicaManager)
             .withSharePartitions(sharePartitions1)
             .build();
@@ -460,12 +461,12 @@ public class DelayedShareFetchTest {
         delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch1, 
delayedShareFetchWatchKeys);
 
         assertEquals(2, delayedShareFetchPurgatory.watched());
-        assertFalse(shareFetchData1.future().isDone());
+        assertFalse(shareFetch1.isCompleted());
 
         Map<TopicIdPartition, Integer> partitionMaxBytes2 = new HashMap<>();
         partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES);
         partitionMaxBytes2.put(tp2, PARTITION_MAX_BYTES);
-        ShareFetchData shareFetchData2 = new ShareFetchData(FETCH_PARAMS, 
groupId, Uuid.randomUuid().toString(),
+        ShareFetch shareFetch2 = new ShareFetch(FETCH_PARAMS, groupId, 
Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes2, MAX_FETCH_RECORDS);
 
         doAnswer(invocation -> 
buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
@@ -476,7 +477,7 @@ public class DelayedShareFetchTest {
         sharePartitions2.put(tp2, sp2);
 
         DelayedShareFetch delayedShareFetch2 = 
spy(DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData2)
+            .withShareFetchData(shareFetch2)
             .withReplicaManager(replicaManager)
             .withSharePartitions(sharePartitions2)
             .build());
@@ -491,7 +492,7 @@ public class DelayedShareFetchTest {
         // requests, it should add a "check and complete" action for request 
key tp1 on the purgatory.
         delayedShareFetch2.forceComplete();
         assertTrue(delayedShareFetch2.isCompleted());
-        assertTrue(shareFetchData2.future().isDone());
+        assertTrue(shareFetch2.isCompleted());
         Mockito.verify(replicaManager, times(1)).readFromLog(
             any(), any(), any(ReplicaQuota.class), anyBoolean());
         assertFalse(delayedShareFetch1.isCompleted());
@@ -518,13 +519,13 @@ public class DelayedShareFetchTest {
         sharePartitions.put(tp1, sp1);
 
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
-        ShareFetchData shareFetchData = new ShareFetchData(
+        ShareFetch shareFetch = new ShareFetch(
             new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
             future, partitionMaxBytes, MAX_FETCH_RECORDS);
 
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData)
+            .withShareFetchData(shareFetch)
             .withReplicaManager(replicaManager)
             .withSharePartitions(sharePartitions)
             .build();
@@ -568,7 +569,7 @@ public class DelayedShareFetchTest {
         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
         sharePartitions.put(tp0, sp0);
 
-        ShareFetchData shareFetchData = new ShareFetchData(
+        ShareFetch shareFetch = new ShareFetch(
             new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
@@ -583,20 +584,35 @@ public class DelayedShareFetchTest {
         
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(partition);
         when(partition.fetchOffsetSnapshot(any(), anyBoolean())).thenThrow(new 
RuntimeException("Exception thrown"));
 
+        SharePartitionManager sharePartitionManager = 
mock(SharePartitionManager.class);
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData)
+            .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
             .withReplicaManager(replicaManager)
+            .withSharePartitionManager(sharePartitionManager)
             .build());
+
+        // Try complete should return false as the share partition has errored 
out.
+        assertFalse(delayedShareFetch.tryComplete());
+        // Fetch should remain pending and should be completed on request 
timeout.
         assertFalse(delayedShareFetch.isCompleted());
+        // The request should be errored out as topic partition should get 
added as erroneous.
+        assertTrue(shareFetch.errorInAllPartitions());
 
-        // Since minBytes calculation throws an exception and returns true, 
tryComplete should return true.
-        assertTrue(delayedShareFetch.tryComplete());
+        Mockito.verify(sharePartitionManager, 
times(1)).handleFencedSharePartitionException(any(), any());
+        Mockito.verify(replicaManager, times(1)).readFromLog(
+            any(), any(), any(ReplicaQuota.class), anyBoolean());
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(any());
+
+        // Force complete the request as it's still pending. Return false from 
the share partition lock acquire.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(false);
+        assertTrue(delayedShareFetch.forceComplete());
         assertTrue(delayedShareFetch.isCompleted());
-        Mockito.verify(replicaManager, times(2)).readFromLog(
+
+        // Read from log and release partition locks should not be called as 
the request is errored out.
+        Mockito.verify(replicaManager, times(1)).readFromLog(
             any(), any(), any(ReplicaQuota.class), anyBoolean());
-        // releasePartitionLocks will be called twice, once from tryComplete 
and then from onComplete.
-        Mockito.verify(delayedShareFetch, 
times(2)).releasePartitionLocks(any());
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(any());
     }
 
     @Test
@@ -615,11 +631,11 @@ public class DelayedShareFetchTest {
         doAnswer(invocation -> 
buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
         mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 
1);
 
-        ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, 
groupId, Uuid.randomUuid().toString(),
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, 
Uuid.randomUuid().toString(),
             new CompletableFuture<>(), Map.of(tp0, PARTITION_MAX_BYTES), 
MAX_FETCH_RECORDS);
 
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData)
+            .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions1)
             .withReplicaManager(replicaManager)
             .build();
@@ -643,11 +659,11 @@ public class DelayedShareFetchTest {
         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
         sharePartitions.put(tp0, sp0);
 
-        ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, 
groupId, Uuid.randomUuid().toString(),
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, 
Uuid.randomUuid().toString(),
             new CompletableFuture<>(), Map.of(tp0, PARTITION_MAX_BYTES), 
MAX_FETCH_RECORDS);
 
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData)
+            .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
             .build();
 
@@ -675,13 +691,13 @@ public class DelayedShareFetchTest {
     }
 
     static class DelayedShareFetchBuilder {
-        ShareFetchData shareFetchData = mock(ShareFetchData.class);
+        ShareFetch shareFetch = mock(ShareFetch.class);
         private ReplicaManager replicaManager = mock(ReplicaManager.class);
-        private final SharePartitionManager sharePartitionManager = 
mock(SharePartitionManager.class);
+        private SharePartitionManager sharePartitionManager = 
mock(SharePartitionManager.class);
         private LinkedHashMap<TopicIdPartition, SharePartition> 
sharePartitions = mock(LinkedHashMap.class);
 
-        DelayedShareFetchBuilder withShareFetchData(ShareFetchData 
shareFetchData) {
-            this.shareFetchData = shareFetchData;
+        DelayedShareFetchBuilder withShareFetchData(ShareFetch shareFetch) {
+            this.shareFetch = shareFetch;
             return this;
         }
 
@@ -690,6 +706,11 @@ public class DelayedShareFetchTest {
             return this;
         }
 
+        DelayedShareFetchBuilder 
withSharePartitionManager(SharePartitionManager sharePartitionManager) {
+            this.sharePartitionManager = sharePartitionManager;
+            return this;
+        }
+
         DelayedShareFetchBuilder 
withSharePartitions(LinkedHashMap<TopicIdPartition, SharePartition> 
sharePartitions) {
             this.sharePartitions = sharePartitions;
             return this;
@@ -701,7 +722,7 @@ public class DelayedShareFetchTest {
 
         public DelayedShareFetch build() {
             return new DelayedShareFetch(
-                shareFetchData,
+                shareFetch,
                 replicaManager,
                 sharePartitionManager,
                 sharePartitions);
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java 
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index 6ff0f90bc49..f8a53e9aa33 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
-import org.apache.kafka.server.share.fetch.ShareFetchData;
+import org.apache.kafka.server.share.fetch.ShareFetch;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.storage.log.FetchParams;
 import org.apache.kafka.server.storage.log.FetchPartitionData;
@@ -101,7 +101,7 @@ public class ShareFetchUtilsTest {
         sharePartitions.put(tp0, sp0);
         sharePartitions.put(tp1, sp1);
 
-        ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, 
groupId, memberId,
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId,
             new CompletableFuture<>(), partitionMaxBytes, 100);
 
         MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
@@ -124,7 +124,7 @@ public class ShareFetchUtilsTest {
                 records1, Optional.empty(), OptionalLong.empty(), 
Optional.empty(),
                 OptionalInt.empty(), false));
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData 
=
-                ShareFetchUtils.processFetchResponse(shareFetchData, 
responseData, sharePartitions, mock(ReplicaManager.class));
+                ShareFetchUtils.processFetchResponse(shareFetch, responseData, 
sharePartitions, mock(ReplicaManager.class));
 
         assertEquals(2, resultData.size());
         assertTrue(resultData.containsKey(tp0));
@@ -167,7 +167,7 @@ public class ShareFetchUtilsTest {
         sharePartitions.put(tp0, sp0);
         sharePartitions.put(tp1, sp1);
 
-        ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, 
groupId, memberId,
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId,
             new CompletableFuture<>(), partitionMaxBytes, 100);
 
         Map<TopicIdPartition, FetchPartitionData> responseData = new 
HashMap<>();
@@ -178,7 +178,7 @@ public class ShareFetchUtilsTest {
             MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), 
Optional.empty(),
             OptionalInt.empty(), false));
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData 
=
-            ShareFetchUtils.processFetchResponse(shareFetchData, responseData, 
sharePartitions, mock(ReplicaManager.class));
+            ShareFetchUtils.processFetchResponse(shareFetch, responseData, 
sharePartitions, mock(ReplicaManager.class));
 
         assertEquals(2, resultData.size());
         assertTrue(resultData.containsKey(tp0));
@@ -209,7 +209,7 @@ public class ShareFetchUtilsTest {
         sharePartitions.put(tp0, sp0);
         sharePartitions.put(tp1, sp1);
 
-        ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, 
groupId, Uuid.randomUuid().toString(),
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, 
Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, 100);
 
         ReplicaManager replicaManager = mock(ReplicaManager.class);
@@ -247,7 +247,7 @@ public class ShareFetchUtilsTest {
             records1, Optional.empty(), OptionalLong.empty(), Optional.empty(),
             OptionalInt.empty(), false));
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
resultData1 =
-            ShareFetchUtils.processFetchResponse(shareFetchData, 
responseData1, sharePartitions, replicaManager);
+            ShareFetchUtils.processFetchResponse(shareFetch, responseData1, 
sharePartitions, replicaManager);
 
         assertEquals(2, resultData1.size());
         assertTrue(resultData1.containsKey(tp0));
@@ -276,7 +276,7 @@ public class ShareFetchUtilsTest {
             MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), 
Optional.empty(),
             OptionalInt.empty(), false));
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
resultData2 =
-            ShareFetchUtils.processFetchResponse(shareFetchData, 
responseData2, sharePartitions, replicaManager);
+            ShareFetchUtils.processFetchResponse(shareFetch, responseData2, 
sharePartitions, replicaManager);
 
         assertEquals(2, resultData2.size());
         assertTrue(resultData2.containsKey(tp0));
@@ -303,7 +303,7 @@ public class ShareFetchUtilsTest {
         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
         sharePartitions.put(tp0, sp0);
 
-        ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, 
groupId, Uuid.randomUuid().toString(),
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, 
Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, 100);
 
         ReplicaManager replicaManager = mock(ReplicaManager.class);
@@ -327,7 +327,7 @@ public class ShareFetchUtilsTest {
                 OptionalInt.empty(), false));
 
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData 
=
-            ShareFetchUtils.processFetchResponse(shareFetchData, responseData, 
sharePartitions, replicaManager);
+            ShareFetchUtils.processFetchResponse(shareFetch, responseData, 
sharePartitions, replicaManager);
 
         assertEquals(1, resultData.size());
         assertTrue(resultData.containsKey(tp0));
@@ -342,7 +342,7 @@ public class ShareFetchUtilsTest {
                 records, Optional.empty(), OptionalLong.empty(), 
Optional.empty(),
                 OptionalInt.empty(), false));
 
-        resultData = ShareFetchUtils.processFetchResponse(shareFetchData, 
responseData, sharePartitions, replicaManager);
+        resultData = ShareFetchUtils.processFetchResponse(shareFetch, 
responseData, sharePartitions, replicaManager);
 
         assertEquals(1, resultData.size());
         assertTrue(resultData.containsKey(tp0));
@@ -376,7 +376,7 @@ public class ShareFetchUtilsTest {
 
         Uuid memberId = Uuid.randomUuid();
         // Set max fetch records to 10
-        ShareFetchData shareFetchData = new ShareFetchData(
+        ShareFetch shareFetch = new ShareFetch(
             new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
                 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()),
             groupId, memberId.toString(), new CompletableFuture<>(), 
partitionMaxBytes, 10);
@@ -413,7 +413,7 @@ public class ShareFetchUtilsTest {
         responseData1.put(tp1, fetchPartitionData2);
 
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
resultData1 =
-            ShareFetchUtils.processFetchResponse(shareFetchData, 
responseData1, sharePartitions, replicaManager);
+            ShareFetchUtils.processFetchResponse(shareFetch, responseData1, 
sharePartitions, replicaManager);
 
         assertEquals(2, resultData1.size());
         assertTrue(resultData1.containsKey(tp0));
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 46abf04b0a6..fbbccadff8b 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -65,7 +65,7 @@ import 
org.apache.kafka.server.share.context.ShareSessionContext;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
-import org.apache.kafka.server.share.fetch.ShareFetchData;
+import org.apache.kafka.server.share.fetch.ShareFetch;
 import org.apache.kafka.server.share.persister.NoOpShareStatePersister;
 import org.apache.kafka.server.share.persister.Persister;
 import org.apache.kafka.server.share.session.ShareSession;
@@ -223,7 +223,7 @@ public class SharePartitionManagerTest {
 
         ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.FINAL_EPOCH);
 
-        // shareFetchData is not empty, but the maxBytes of topic partition is 
0, which means this is added only for acknowledgements.
+        // shareFetch is not empty, but the maxBytes of topic partition is 0, 
which means this is added only for acknowledgements.
         // New context should be created successfully
         Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData3 = 
Collections.singletonMap(new TopicIdPartition(tpId1, new TopicPartition("foo", 
0)),
                 new ShareFetchRequest.SharePartitionData(tpId1, 0));
@@ -257,7 +257,7 @@ public class SharePartitionManagerTest {
 
         ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.FINAL_EPOCH);
 
-        // shareFetchData is not empty and the maxBytes of topic partition is 
not 0, which means this is trying to fetch on a Final request.
+        // shareFetch is not empty and the maxBytes of topic partition is not 
0, which means this is trying to fetch on a Final request.
         // New context should throw an error
         Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData3 = 
Collections.singletonMap(new TopicIdPartition(tpId1, new TopicPartition("foo", 
0)),
                 new ShareFetchRequest.SharePartitionData(tpId1, 
PARTITION_MAX_BYTES));
@@ -1665,7 +1665,7 @@ public class SharePartitionManagerTest {
         partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
         partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
 
-        ShareFetchData shareFetchData = new ShareFetchData(
+        ShareFetch shareFetch = new ShareFetch(
                 FETCH_PARAMS,
                 groupId,
                 Uuid.randomUuid().toString(),
@@ -1700,7 +1700,7 @@ public class SharePartitionManagerTest {
         sharePartitions.put(tp2, sp2);
 
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData)
+            .withShareFetchData(shareFetch)
             .withReplicaManager(mockReplicaManager)
             .withSharePartitions(sharePartitions)
             .build();
@@ -1765,7 +1765,7 @@ public class SharePartitionManagerTest {
         partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
         partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
 
-        ShareFetchData shareFetchData = new ShareFetchData(
+        ShareFetch shareFetch = new ShareFetch(
                 FETCH_PARAMS,
                 groupId,
                 Uuid.randomUuid().toString(),
@@ -1801,7 +1801,7 @@ public class SharePartitionManagerTest {
         sharePartitions.put(tp3, sp3);
 
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData)
+            .withShareFetchData(shareFetch)
             .withReplicaManager(mockReplicaManager)
             .withSharePartitions(sharePartitions)
             .build();
@@ -1861,7 +1861,7 @@ public class SharePartitionManagerTest {
         partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
         partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
 
-        ShareFetchData shareFetchData = new ShareFetchData(
+        ShareFetch shareFetch = new ShareFetch(
                 FETCH_PARAMS,
                 groupId,
                 Uuid.randomUuid().toString(),
@@ -1897,7 +1897,7 @@ public class SharePartitionManagerTest {
         sharePartitions.put(tp2, sp2);
 
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData)
+            .withShareFetchData(shareFetch)
             .withReplicaManager(mockReplicaManager)
             .withSharePartitions(sharePartitions)
             .build();
@@ -1965,7 +1965,7 @@ public class SharePartitionManagerTest {
         partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
         partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
 
-        ShareFetchData shareFetchData = new ShareFetchData(
+        ShareFetch shareFetch = new ShareFetch(
                 FETCH_PARAMS,
                 groupId,
                 Uuid.randomUuid().toString(),
@@ -2002,7 +2002,7 @@ public class SharePartitionManagerTest {
         sharePartitions.put(tp3, sp3);
 
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
-            .withShareFetchData(shareFetchData)
+            .withShareFetchData(shareFetch)
             .withReplicaManager(mockReplicaManager)
             .withSharePartitions(sharePartitions)
             .build();
@@ -2063,10 +2063,74 @@ public class SharePartitionManagerTest {
         // Verify that replica manager fetch is not called.
         Mockito.verify(mockReplicaManager, times(0)).readFromLog(
             any(), any(), any(ReplicaQuota.class), anyBoolean());
+        assertFalse(pendingInitializationFuture.isDone());
         // Complete the pending initialization future.
         pendingInitializationFuture.complete(null);
     }
 
+    @Test
+    public void testDelayedInitializationShouldCompleteFetchRequest() throws 
Exception {
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+        Uuid fooId = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(fooId, new 
TopicPartition("foo", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = 
Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+
+        // Keep the 2 initialization futures pending and 1 completed with 
leader not available exception.
+        CompletableFuture<Void> pendingInitializationFuture1 = new 
CompletableFuture<>();
+        CompletableFuture<Void> pendingInitializationFuture2 = new 
CompletableFuture<>();
+        when(sp0.maybeInitialize()).
+            thenReturn(pendingInitializationFuture1)
+            .thenReturn(pendingInitializationFuture2)
+            .thenReturn(CompletableFuture.failedFuture(new 
LeaderNotAvailableException("Leader not available")));
+
+        DelayedOperationPurgatory<DelayedShareFetch> shareFetchPurgatorySpy = 
spy(new DelayedOperationPurgatory<>(
+            "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true));
+        mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
shareFetchPurgatorySpy);
+
+        SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
+            
.withPartitionCacheMap(partitionCacheMap).withReplicaManager(mockReplicaManager).withTimer(mockTimer)
+            .build();
+
+        // Send 3 requests for share fetch for same share partition.
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future1 =
+            sharePartitionManager.fetchMessages(groupId, memberId.toString(), 
FETCH_PARAMS, partitionMaxBytes);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future2 =
+            sharePartitionManager.fetchMessages(groupId, memberId.toString(), 
FETCH_PARAMS, partitionMaxBytes);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future3 =
+            sharePartitionManager.fetchMessages(groupId, memberId.toString(), 
FETCH_PARAMS, partitionMaxBytes);
+
+        Mockito.verify(sp0, times(3)).maybeInitialize();
+        Mockito.verify(mockReplicaManager, 
times(3)).addDelayedShareFetchRequest(any(), any());
+        Mockito.verify(shareFetchPurgatorySpy, 
times(3)).tryCompleteElseWatch(any(), any());
+        Mockito.verify(shareFetchPurgatorySpy, 
times(0)).checkAndComplete(any());
+
+        // All 3 requests should be pending.
+        assertFalse(future1.isDone());
+        assertFalse(future2.isDone());
+        assertFalse(future3.isDone());
+
+        // Complete one pending initialization future.
+        pendingInitializationFuture1.complete(null);
+        Mockito.verify(mockReplicaManager, 
times(1)).completeDelayedShareFetchRequest(any());
+        Mockito.verify(shareFetchPurgatorySpy, 
times(1)).checkAndComplete(any());
+
+        pendingInitializationFuture2.complete(null);
+        Mockito.verify(mockReplicaManager, 
times(2)).completeDelayedShareFetchRequest(any());
+        Mockito.verify(shareFetchPurgatorySpy, 
times(2)).checkAndComplete(any());
+
+        // Verify that replica manager fetch is not called.
+        Mockito.verify(mockReplicaManager, times(0)).readFromLog(
+            any(), any(), any(ReplicaQuota.class), anyBoolean());
+    }
+
     @Test
     public void testSharePartitionInitializationExceptions() throws Exception {
         String groupId = "grp";
@@ -2100,6 +2164,7 @@ public class SharePartitionManagerTest {
         // between SharePartitionManager and SharePartition to retry the 
request as SharePartition is not yet ready.
         assertFalse(future.isCompletedExceptionally());
         assertTrue(future.join().isEmpty());
+        Mockito.verify(sp0, times(0)).markFenced();
         // Verify that the share partition is still in the cache on 
LeaderNotAvailableException.
         assertEquals(1, partitionCacheMap.size());
 
@@ -2111,6 +2176,7 @@ public class SharePartitionManagerTest {
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
             () -> "Processing in delayed share fetch queue never ended.");
         validateShareFetchFutureException(future, tp0, 
Errors.UNKNOWN_SERVER_ERROR, "Illegal state");
+        Mockito.verify(sp0, times(1)).markFenced();
         assertTrue(partitionCacheMap.isEmpty());
 
         // The last exception removes the share partition from the cache hence 
re-add the share partition to cache.
@@ -2123,6 +2189,7 @@ public class SharePartitionManagerTest {
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
             () -> "Processing in delayed share fetch queue never ended.");
         validateShareFetchFutureException(future, tp0, 
Errors.COORDINATOR_NOT_AVAILABLE, "Coordinator not available");
+        Mockito.verify(sp0, times(2)).markFenced();
         assertTrue(partitionCacheMap.isEmpty());
 
         // The last exception removes the share partition from the cache hence 
re-add the share partition to cache.
@@ -2135,6 +2202,7 @@ public class SharePartitionManagerTest {
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
             () -> "Processing in delayed share fetch queue never ended.");
         validateShareFetchFutureException(future, tp0, Errors.INVALID_REQUEST, 
"Invalid request");
+        Mockito.verify(sp0, times(3)).markFenced();
         assertTrue(partitionCacheMap.isEmpty());
 
         // The last exception removes the share partition from the cache hence 
re-add the share partition to cache.
@@ -2147,6 +2215,7 @@ public class SharePartitionManagerTest {
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
             () -> "Processing in delayed share fetch queue never ended.");
         validateShareFetchFutureException(future, tp0, 
Errors.FENCED_STATE_EPOCH, "Fenced state epoch");
+        Mockito.verify(sp0, times(4)).markFenced();
         assertTrue(partitionCacheMap.isEmpty());
 
         // The last exception removes the share partition from the cache hence 
re-add the share partition to cache.
@@ -2159,6 +2228,7 @@ public class SharePartitionManagerTest {
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
             () -> "Processing in delayed share fetch queue never ended.");
         validateShareFetchFutureException(future, tp0, 
Errors.NOT_LEADER_OR_FOLLOWER, "Not leader or follower");
+        Mockito.verify(sp0, times(5)).markFenced();
         assertTrue(partitionCacheMap.isEmpty());
 
         // The last exception removes the share partition from the cache hence 
re-add the share partition to cache.
@@ -2171,6 +2241,7 @@ public class SharePartitionManagerTest {
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
             () -> "Processing in delayed share fetch queue never ended.");
         validateShareFetchFutureException(future, tp0, 
Errors.UNKNOWN_SERVER_ERROR, "Runtime exception");
+        Mockito.verify(sp0, times(6)).markFenced();
         assertTrue(partitionCacheMap.isEmpty());
     }
 
@@ -2247,18 +2318,25 @@ public class SharePartitionManagerTest {
     public void testSharePartitionPartialInitializationFailure() throws 
Exception {
         String groupId = "grp";
         Uuid memberId1 = Uuid.randomUuid();
+        // For tp0, share partition instantiation will fail.
         TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        // For tp1, share fetch should succeed.
         TopicIdPartition tp1 = new TopicIdPartition(memberId1, new 
TopicPartition("foo", 1));
-        Map<TopicIdPartition, Integer> partitionMaxBytes = Map.of(tp0, 
PARTITION_MAX_BYTES, tp1, PARTITION_MAX_BYTES);
-
-        // Mark partition1 as not the leader.
-        Partition partition1 = mock(Partition.class);
-        when(partition1.isLeader()).thenReturn(false);
-
+        // For tp2, share partition initialization will fail.
+        TopicIdPartition tp2 = new TopicIdPartition(memberId1, new 
TopicPartition("foo", 2));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = Map.of(
+            tp0, PARTITION_MAX_BYTES,
+            tp1, PARTITION_MAX_BYTES,
+            tp2, PARTITION_MAX_BYTES);
+
+        // Mark partition0 as not the leader.
+        Partition partition0 = mock(Partition.class);
+        when(partition0.isLeader()).thenReturn(false);
         ReplicaManager replicaManager = mock(ReplicaManager.class);
         when(replicaManager.getPartitionOrException(any()))
-            .thenReturn(partition1);
+            .thenReturn(partition0);
 
+        // Mock share partition for tp1, so it can succeed.
         SharePartition sp1 = mock(SharePartition.class);
         Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
         partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
@@ -2268,6 +2346,11 @@ public class SharePartitionManagerTest {
         
when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
         when(sp1.acquire(anyString(), anyInt(), any())).thenReturn(new 
ShareAcquiredRecords(Collections.emptyList(), 0));
 
+        // Fail initialization for tp2.
+        SharePartition sp2 = mock(SharePartition.class);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
+        
when(sp2.maybeInitialize()).thenReturn(CompletableFuture.failedFuture(new 
FencedStateEpochException("Fenced state epoch")));
+
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
             DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
@@ -2289,11 +2372,16 @@ public class SharePartitionManagerTest {
         assertFalse(future.isCompletedExceptionally());
 
         Map<TopicIdPartition, PartitionData> partitionDataMap = future.get();
-        // For now only 1 successful partition is included, this will be fixed 
in subsequents PRs.
-        assertEquals(1, partitionDataMap.size());
+        assertEquals(3, partitionDataMap.size());
+        assertTrue(partitionDataMap.containsKey(tp0));
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code(), 
partitionDataMap.get(tp0).errorCode());
         assertTrue(partitionDataMap.containsKey(tp1));
         assertEquals(Errors.NONE.code(), 
partitionDataMap.get(tp1).errorCode());
+        assertTrue(partitionDataMap.containsKey(tp2));
+        assertEquals(Errors.FENCED_STATE_EPOCH.code(), 
partitionDataMap.get(tp2).errorCode());
+        assertEquals("Fenced state epoch", 
partitionDataMap.get(tp2).errorMessage());
 
+        Mockito.verify(replicaManager, 
times(0)).completeDelayedShareFetchRequest(any());
         Mockito.verify(replicaManager, times(1)).readFromLog(
             any(), any(), any(ReplicaQuota.class), anyBoolean());
     }
diff --git 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index 8097021e4cb..d2679cd62ea 100644
--- 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -16,6 +16,7 @@
  */
 package kafka.server
 
+import kafka.utils.TestUtils
 import org.apache.kafka.common.test.api.{ClusterConfigProperty, 
ClusterInstance, ClusterTest, ClusterTestDefaults, ClusterTestExtensions, 
ClusterTests, Type}
 import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords
 import org.apache.kafka.common.message.{ShareAcknowledgeRequestData, 
ShareAcknowledgeResponseData, ShareFetchRequestData, ShareFetchResponseData}
@@ -253,13 +254,26 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val metadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
     val acknowledgementsMap: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     val shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
-    val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
 
-    val shareFetchResponseData = shareFetchResponse.data()
-    assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
-    assertEquals(1, shareFetchResponseData.responses().size())
-    assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
-    assertEquals(3, 
shareFetchResponseData.responses().get(0).partitions().size())
+    // For the multi partition fetch request, the response may not be 
available in the first attempt
+    // as the share partitions might not be initialized yet. So, we retry 
until we get the response.
+    var responses = Seq[ShareFetchResponseData.PartitionData]()
+    TestUtils.waitUntilTrue(() => {
+      val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+      val shareFetchResponseData = shareFetchResponse.data()
+      assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+      assertEquals(1, shareFetchResponseData.responses().size())
+      val partitionsCount = 
shareFetchResponseData.responses().get(0).partitions().size()
+      if (partitionsCount > 0) {
+        assertEquals(topicId, 
shareFetchResponseData.responses().get(0).topicId())
+        
shareFetchResponseData.responses().get(0).partitions().foreach(partitionData => 
{
+          if (!partitionData.acquiredRecords().isEmpty) {
+            responses = responses :+ partitionData
+          }
+        })
+      }
+      responses.size == 3
+    }, "Share fetch request failed", 5000)
 
     val expectedPartitionData1 = new ShareFetchResponseData.PartitionData()
       .setPartitionIndex(0)
@@ -279,7 +293,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
       .setAcknowledgeErrorCode(Errors.NONE.code())
       
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), 
Collections.singletonList(9), Collections.singletonList(1)))
 
-    
shareFetchResponseData.responses().get(0).partitions().foreach(partitionData => 
{
+    responses.foreach(partitionData => {
       partitionData.partitionIndex() match {
         case 0 => compareFetchResponsePartitions(expectedPartitionData1, 
partitionData)
         case 1 => compareFetchResponsePartitions(expectedPartitionData2, 
partitionData)
@@ -2230,13 +2244,26 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     val acknowledgementsMap: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     var shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
-    var shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
 
-    var shareFetchResponseData = shareFetchResponse.data()
-    assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
-    assertEquals(1, shareFetchResponseData.responses().size())
-    assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
-    assertEquals(2, 
shareFetchResponseData.responses().get(0).partitions().size())
+    // For the multi partition fetch request, the response may not be 
available in the first attempt
+    // as the share partitions might not be initialized yet. So, we retry 
until we get the response.
+    var responses = Seq[ShareFetchResponseData.PartitionData]()
+    TestUtils.waitUntilTrue(() => {
+      val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+      val shareFetchResponseData = shareFetchResponse.data()
+      assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+      assertEquals(1, shareFetchResponseData.responses().size())
+      val partitionsCount = 
shareFetchResponseData.responses().get(0).partitions().size()
+      if (partitionsCount > 0) {
+        assertEquals(topicId, 
shareFetchResponseData.responses().get(0).topicId())
+        
shareFetchResponseData.responses().get(0).partitions().foreach(partitionData => 
{
+          if (!partitionData.acquiredRecords().isEmpty) {
+            responses = responses :+ partitionData
+          }
+        })
+      }
+      responses.size == 2
+    }, "Share fetch request failed", 5000)
 
     // Producing 10 more records to the topic partitions created above
     produceData(topicIdPartition1, 10)
@@ -2247,9 +2274,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     val forget: Seq[TopicIdPartition] = Seq(topicIdPartition1)
     shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, Seq.empty, forget, acknowledgementsMap)
-    shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+    val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
 
-    shareFetchResponseData = shareFetchResponse.data()
+    val shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
@@ -2265,10 +2292,25 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     compareFetchResponsePartitions(expectedPartitionData, partitionData)
   }
 
+  // For initial fetch request, the response may not be available in the first 
attempt when the share
+  // partition is not initialized yet. Hence, wait for response from all 
partitions before proceeding.
   private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String, 
topicIdPartitions: Seq[TopicIdPartition]): Unit = {
-    val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
-    val shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, topicIdPartitions, Seq.empty, Map.empty)
-    connectAndReceive[ShareFetchResponse](shareFetchRequest)
+    val partitions: util.Set[Integer] = new util.HashSet()
+    TestUtils.waitUntilTrue(() => {
+      val metadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
+      val shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, topicIdPartitions, Seq.empty, Map.empty)
+      val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+      val shareFetchResponseData = shareFetchResponse.data()
+
+      assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+      shareFetchResponseData.responses().foreach(response => {
+        if (!response.partitions().isEmpty) {
+          response.partitions().forEach(partitionData => 
partitions.add(partitionData.partitionIndex))
+        }
+      })
+
+      partitions.size() == topicIdPartitions.size
+    }, "Share fetch request failed", 5000)
   }
 
   private def expectedAcquiredRecords(firstOffsets: util.List[Long], 
lastOffsets: util.List[Long], deliveryCounts: util.List[Int]): 
util.List[AcquiredRecords] = {
diff --git 
a/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java 
b/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java
new file mode 100644
index 00000000000..7c60501ffd6
--- /dev/null
+++ b/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.fetch;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.server.storage.log.FetchParams;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * The ShareFetch class is used to store the fetch parameters for a share 
fetch request.
+ */
+public class ShareFetch {
+
+    /**
+     * The future that will be completed when the fetch is done.
+     */
+    private final CompletableFuture<Map<TopicIdPartition, PartitionData>> 
future;
+
+    /**
+     * The fetch parameters for the fetch request.
+     */
+    private final FetchParams fetchParams;
+    /**
+     * The group id of the share group that is fetching the records.
+     */
+    private final String groupId;
+    /**
+     * The id of the share group member that is fetching the records.
+     */
+    private final String memberId;
+    /**
+     * The maximum number of bytes that can be fetched for each partition.
+     */
+    private final Map<TopicIdPartition, Integer> partitionMaxBytes;
+    /**
+     * The maximum number of records that can be fetched for the request.
+     */
+    private final int maxFetchRecords;
+    /**
+     * The partitions that had an error during the fetch.
+     */
+    private Map<TopicIdPartition, Throwable> erroneous;
+
+    public ShareFetch(
+        FetchParams fetchParams,
+        String groupId,
+        String memberId,
+        CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+        Map<TopicIdPartition, Integer> partitionMaxBytes,
+        int maxFetchRecords
+    ) {
+        this.fetchParams = fetchParams;
+        this.groupId = groupId;
+        this.memberId = memberId;
+        this.future = future;
+        this.partitionMaxBytes = partitionMaxBytes;
+        this.maxFetchRecords = maxFetchRecords;
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public String memberId() {
+        return memberId;
+    }
+
+    public Map<TopicIdPartition, Integer> partitionMaxBytes() {
+        return partitionMaxBytes;
+    }
+
+    public FetchParams fetchParams() {
+        return fetchParams;
+    }
+
+    public int maxFetchRecords() {
+        return maxFetchRecords;
+    }
+
+    /**
+     * Add an erroneous partition to the share fetch request. If the erroneous 
map is null, it will
+     * be created.
+     * <p>
+     * The method is synchronized to avoid concurrent modification of the 
erroneous map, as for
+     * some partitions the pending initialization can be on some threads and 
for other partitions
+     * share fetch request can be processed in purgatory.
+     *
+     * @param topicIdPartition The partition that had an error.
+     * @param throwable The error that occurred.
+     */
+    public synchronized void addErroneous(TopicIdPartition topicIdPartition, 
Throwable throwable) {
+        if (erroneous == null) {
+            erroneous = new HashMap<>();
+        }
+        erroneous.put(topicIdPartition, throwable);
+    }
+
+    /**
+     * Check if the share fetch request is completed.
+     * @return true if the request is completed, false otherwise.
+     */
+    public boolean isCompleted() {
+        return future.isDone();
+    }
+
+    /**
+     * Check if all the partitions in the request have errored.
+     * @return true if all the partitions in the request have errored, false 
otherwise.
+     */
+    public synchronized boolean errorInAllPartitions() {
+        return erroneous != null && erroneous.size() == 
partitionMaxBytes().size();
+    }
+
+    /**
+     * Maybe complete the share fetch request with the given partition data. 
If the request is already completed,
+     * this method does nothing. If there are any erroneous partitions, they 
will be added to the response.
+     *
+     * @param partitionData The partition data to complete the fetch with.
+     */
+    public void maybeComplete(Map<TopicIdPartition, PartitionData> 
partitionData) {
+        if (isCompleted()) {
+            return;
+        }
+
+        Map<TopicIdPartition, PartitionData> response = new 
HashMap<>(partitionData);
+        // Add any erroneous partitions to the response.
+        addErroneousToResponse(response);
+        future.complete(response);
+    }
+
+    /**
+     * Maybe complete the share fetch request with the given exception for the 
topicIdPartitions.
+     * If the request is already completed, this method does nothing. If there 
are any erroneous partitions,
+     * they will be added to the response.
+     *
+     * @param topicIdPartitions The topic id partitions which errored out.
+     * @param throwable The exception to complete the fetch with.
+     */
+    public void maybeCompleteWithException(Collection<TopicIdPartition> 
topicIdPartitions, Throwable throwable) {
+        if (isCompleted()) {
+            return;
+        }
+        Map<TopicIdPartition, PartitionData> response = 
topicIdPartitions.stream().collect(
+            Collectors.toMap(tp -> tp, tp -> new PartitionData()
+                .setErrorCode(Errors.forException(throwable).code())
+                .setErrorMessage(throwable.getMessage())));
+        // Add any erroneous partitions to the response.
+        addErroneousToResponse(response);
+        future.complete(response);
+    }
+
+    /**
+     * Filter out the erroneous partitions from the given set of 
topicIdPartitions. The order of
+     * partitions is important hence the method expects an ordered set as 
input and returns the ordered
+     * set as well.
+     *
+     * @param topicIdPartitions The topic id partitions to filter.
+     * @return The topic id partitions without the erroneous partitions.
+     */
+    public synchronized Set<TopicIdPartition> 
filterErroneousTopicPartitions(Set<TopicIdPartition> topicIdPartitions) {
+        if (erroneous != null) {
+            Set<TopicIdPartition> retain = new 
LinkedHashSet<>(topicIdPartitions);
+            retain.removeAll(erroneous.keySet());
+            return retain;
+        }
+        return topicIdPartitions;
+    }
+
+    private synchronized void addErroneousToResponse(Map<TopicIdPartition, 
PartitionData> response) {
+        if (erroneous != null) {
+            erroneous.forEach((topicIdPartition, throwable) -> {
+                response.put(topicIdPartition, new PartitionData()
+                    .setErrorCode(Errors.forException(throwable).code())
+                    .setErrorMessage(throwable.getMessage()));
+            });
+        }
+    }
+}
diff --git 
a/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java 
b/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java
deleted file mode 100644
index c32b3280017..00000000000
--- 
a/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.server.share.fetch;
-
-import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
-import org.apache.kafka.server.storage.log.FetchParams;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * The ShareFetchData class is used to store the fetch parameters for a share 
fetch request.
- */
-public class ShareFetchData {
-
-    private final FetchParams fetchParams;
-    private final String groupId;
-    private final String memberId;
-    private final CompletableFuture<Map<TopicIdPartition, PartitionData>> 
future;
-    private final Map<TopicIdPartition, Integer> partitionMaxBytes;
-    private final int maxFetchRecords;
-
-    public ShareFetchData(
-        FetchParams fetchParams,
-        String groupId,
-        String memberId,
-        CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
-        Map<TopicIdPartition, Integer> partitionMaxBytes,
-        int maxFetchRecords
-    ) {
-        this.fetchParams = fetchParams;
-        this.groupId = groupId;
-        this.memberId = memberId;
-        this.future = future;
-        this.partitionMaxBytes = partitionMaxBytes;
-        this.maxFetchRecords = maxFetchRecords;
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public String memberId() {
-        return memberId;
-    }
-
-    public CompletableFuture<Map<TopicIdPartition, PartitionData>> future() {
-        return future;
-    }
-
-    public Map<TopicIdPartition, Integer> partitionMaxBytes() {
-        return partitionMaxBytes;
-    }
-
-    public FetchParams fetchParams() {
-        return fetchParams;
-    }
-
-    public int maxFetchRecords() {
-        return maxFetchRecords;
-    }
-}
diff --git 
a/share/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java 
b/share/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java
new file mode 100644
index 00000000000..bf5de1ae0de
--- /dev/null
+++ 
b/share/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share.fetch;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.storage.log.FetchParams;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for the erroneous-partition tracking methods of ShareFetch:
+ * addErroneous, errorInAllPartitions and filterErroneousTopicPartitions.
+ */
+public class ShareFetchTest {
+
+    private static final String GROUP_ID = "groupId";
+    private static final String MEMBER_ID = "memberId";
+
+    // A single requested partition: marking it erroneous means every partition is in error.
+    @Test
+    public void testErrorInAllPartitions() {
+        TopicIdPartition onlyPartition = fooPartition(0);
+        ShareFetch shareFetch = newShareFetch(Map.of(onlyPartition, 10));
+        assertFalse(shareFetch.errorInAllPartitions());
+
+        shareFetch.addErroneous(onlyPartition, new RuntimeException());
+        assertTrue(shareFetch.errorInAllPartitions());
+    }
+
+    // Two requested partitions: the check only turns true once both have been marked erroneous.
+    @Test
+    public void testErrorInAllPartitionsWithMultipleTopicIdPartitions() {
+        TopicIdPartition first = fooPartition(0);
+        TopicIdPartition second = fooPartition(1);
+        ShareFetch shareFetch = newShareFetch(Map.of(first, 10, second, 10));
+        assertFalse(shareFetch.errorInAllPartitions());
+
+        shareFetch.addErroneous(first, new RuntimeException());
+        assertFalse(shareFetch.errorInAllPartitions());
+
+        shareFetch.addErroneous(second, new RuntimeException());
+        assertTrue(shareFetch.errorInAllPartitions());
+    }
+
+    // Filtering returns exactly the requested partitions that have not been marked erroneous.
+    @Test
+    public void testFilterErroneousTopicPartitions() {
+        TopicIdPartition first = fooPartition(0);
+        TopicIdPartition second = fooPartition(1);
+        ShareFetch shareFetch = newShareFetch(Map.of(first, 10, second, 10));
+        Set<TopicIdPartition> requested = Set.of(first, second);
+
+        // No erroneous partitions, hence all partitions should be returned.
+        Set<TopicIdPartition> result = shareFetch.filterErroneousTopicPartitions(requested);
+        assertEquals(Set.of(first, second), result);
+
+        // Add an erroneous partition and verify that it is filtered out.
+        shareFetch.addErroneous(first, new RuntimeException());
+        result = shareFetch.filterErroneousTopicPartitions(requested);
+        assertEquals(Set.of(second), result);
+
+        // Add another erroneous partition and verify that it is filtered out.
+        shareFetch.addErroneous(second, new RuntimeException());
+        result = shareFetch.filterErroneousTopicPartitions(requested);
+        assertTrue(result.isEmpty());
+    }
+
+    // Builds a partition of topic "foo" under a freshly generated random topic id.
+    private static TopicIdPartition fooPartition(int partition) {
+        return new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", partition));
+    }
+
+    // Creates a ShareFetch with mocked FetchParams, a new future, the given partition map and 100
+    // as the last constructor argument (presumably max fetch records - confirm against ShareFetch).
+    private static ShareFetch newShareFetch(Map<TopicIdPartition, Integer> partitions) {
+        return new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, new CompletableFuture<>(),
+            partitions, 100);
+    }
+}

Reply via email to