This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6abbd548b86 MINOR Refactoring share fetch code (KIP-932) (#17269)
6abbd548b86 is described below

commit 6abbd548b86c61e8675bef8a4858144af21f4732
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu Sep 26 13:09:31 2024 +0100

    MINOR Refactoring share fetch code (KIP-932) (#17269)
    
    Reviewers: Andrew Schofield <[email protected]>, David Arthur 
<[email protected]>
---
 build.gradle                                       |   1 +
 checkstyle/import-control-share.xml                |   4 +
 .../java/kafka/server/share/DelayedShareFetch.java |  57 ++++-----
 .../kafka/server/share/DelayedShareFetchKey.java   |   8 +-
 .../java/kafka/server/share/ShareFetchUtils.java   |  14 ++-
 .../java/kafka/server/share/SharePartition.java    |   2 +-
 .../kafka/server/share/SharePartitionManager.java  | 128 ++++-----------------
 core/src/main/scala/kafka/server/KafkaApis.scala   |   3 +-
 .../kafka/server/share/DelayedShareFetchTest.java  |  66 +++++------
 .../kafka/server/share/ShareFetchUtilsTest.java    |  34 +++---
 .../server/share/SharePartitionManagerTest.java    | 120 +++++++++----------
 .../kafka/server/share/SharePartitionTest.java     |   2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   3 +-
 .../kafka/server/share/SharePartitionKey.java      |  65 ++++++-----
 .../ShareAcknowledgementBatch.java                 |   2 +-
 .../kafka/server/share/fetch/ShareFetchData.java   |  71 ++++++++++++
 16 files changed, 297 insertions(+), 283 deletions(-)

diff --git a/build.gradle b/build.gradle
index 64b9d488365..01c8aae3caf 100644
--- a/build.gradle
+++ b/build.gradle
@@ -954,6 +954,7 @@ project(':share') {
 
   dependencies {
     implementation project(':server-common')
+    implementation project(':storage')
 
     implementation libs.slf4jApi
 
diff --git a/checkstyle/import-control-share.xml 
b/checkstyle/import-control-share.xml
index f0022d6b318..05ddfff90ac 100644
--- a/checkstyle/import-control-share.xml
+++ b/checkstyle/import-control-share.xml
@@ -43,6 +43,10 @@
   <subpackage name="server">
     <subpackage name="share">
       <allow pkg="org.apache.kafka.server.share" />
+
+      <subpackage name="fetch">
+        <allow class="org.apache.kafka.storage.internals.log.FetchParams"/>
+      </subpackage>
     </subpackage>
   </subpackage>
 
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java 
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index ef80047f222..35608082684 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -23,6 +23,8 @@ import kafka.server.ReplicaManager;
 
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.fetch.ShareFetchData;
 import org.apache.kafka.storage.internals.log.FetchPartitionData;
 
 import org.slf4j.Logger;
@@ -46,19 +48,20 @@ import scala.runtime.BoxedUnit;
  * A delayed share fetch operation has been introduced in case there is a 
share fetch request which cannot be completed instantaneously.
  */
 public class DelayedShareFetch extends DelayedOperation {
-    private final SharePartitionManager.ShareFetchPartitionData 
shareFetchPartitionData;
+
+    private final ShareFetchData shareFetchData;
     private final ReplicaManager replicaManager;
-    private final Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap;
+    private final Map<SharePartitionKey, SharePartition> partitionCacheMap;
     private Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionDataFromTryComplete = new LinkedHashMap<>();
 
     private static final Logger log = 
LoggerFactory.getLogger(DelayedShareFetch.class);
 
     DelayedShareFetch(
-            SharePartitionManager.ShareFetchPartitionData 
shareFetchPartitionData,
+            ShareFetchData shareFetchData,
             ReplicaManager replicaManager,
-            Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap) {
-        super(shareFetchPartitionData.fetchParams().maxWaitMs, Option.empty());
-        this.shareFetchPartitionData = shareFetchPartitionData;
+            Map<SharePartitionKey, SharePartition> partitionCacheMap) {
+        super(shareFetchData.fetchParams().maxWaitMs, Option.empty());
+        this.shareFetchData = shareFetchData;
         this.replicaManager = replicaManager;
         this.partitionCacheMap = partitionCacheMap;
     }
@@ -75,10 +78,10 @@ public class DelayedShareFetch extends DelayedOperation {
     @Override
     public void onComplete() {
         log.trace("Completing the delayed share fetch request for group {}, 
member {}, " +
-                        "topic partitions {}", 
shareFetchPartitionData.groupId(),
-                shareFetchPartitionData.memberId(), 
shareFetchPartitionData.partitionMaxBytes().keySet());
+                        "topic partitions {}", shareFetchData.groupId(),
+            shareFetchData.memberId(), 
shareFetchData.partitionMaxBytes().keySet());
 
-        if (shareFetchPartitionData.future().isDone())
+        if (shareFetchData.future().isDone())
             return;
 
         Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
@@ -91,14 +94,14 @@ public class DelayedShareFetch extends DelayedOperation {
         try {
             if (topicPartitionData.isEmpty()) {
                 // No locks for share partitions could be acquired, so we 
complete the request with an empty response.
-                
shareFetchPartitionData.future().complete(Collections.emptyMap());
+                shareFetchData.future().complete(Collections.emptyMap());
                 return;
             }
             log.trace("Fetchable share partitions data: {} with groupId: {} 
fetch params: {}",
-                    topicPartitionData, shareFetchPartitionData.groupId(), 
shareFetchPartitionData.fetchParams());
+                    topicPartitionData, shareFetchData.groupId(), 
shareFetchData.fetchParams());
 
             Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
-                shareFetchPartitionData.fetchParams(),
+                shareFetchData.fetchParams(),
                 CollectionConverters.asScala(
                     topicPartitionData.entrySet().stream().map(entry ->
                         new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
@@ -116,23 +119,23 @@ public class DelayedShareFetch extends DelayedOperation {
             });
 
             log.trace("Data successfully retrieved by replica manager: {}", 
responseData);
-            ShareFetchUtils.processFetchResponse(shareFetchPartitionData, 
responseData, partitionCacheMap, replicaManager)
+            ShareFetchUtils.processFetchResponse(shareFetchData, responseData, 
partitionCacheMap, replicaManager)
                 .whenComplete((result, throwable) -> {
                     if (throwable != null) {
                         log.error("Error processing fetch response for share 
partitions", throwable);
-                        
shareFetchPartitionData.future().completeExceptionally(throwable);
+                        
shareFetchData.future().completeExceptionally(throwable);
                     } else {
-                        shareFetchPartitionData.future().complete(result);
+                        shareFetchData.future().complete(result);
                     }
                     // Releasing the lock to move ahead with the next request 
in queue.
-                    releasePartitionLocks(shareFetchPartitionData.groupId(), 
topicPartitionData.keySet());
+                    releasePartitionLocks(shareFetchData.groupId(), 
topicPartitionData.keySet());
                 });
 
         } catch (Exception e) {
             // Release the locks acquired for the partitions in the share 
fetch request in case there is an exception
             log.error("Error processing delayed share fetch request", e);
-            shareFetchPartitionData.future().completeExceptionally(e);
-            releasePartitionLocks(shareFetchPartitionData.groupId(), 
topicPartitionData.keySet());
+            shareFetchData.future().completeExceptionally(e);
+            releasePartitionLocks(shareFetchData.groupId(), 
topicPartitionData.keySet());
         }
     }
 
@@ -142,16 +145,16 @@ public class DelayedShareFetch extends DelayedOperation {
     @Override
     public boolean tryComplete() {
         log.trace("Try to complete the delayed share fetch request for group 
{}, member {}, topic partitions {}",
-                shareFetchPartitionData.groupId(), 
shareFetchPartitionData.memberId(),
-                shareFetchPartitionData.partitionMaxBytes().keySet());
+            shareFetchData.groupId(), shareFetchData.memberId(),
+            shareFetchData.partitionMaxBytes().keySet());
 
         topicPartitionDataFromTryComplete = acquirablePartitions();
 
         if (!topicPartitionDataFromTryComplete.isEmpty())
             return forceComplete();
         log.info("Can't acquire records for any partition in the share fetch 
request for group {}, member {}, " +
-                "topic partitions {}", shareFetchPartitionData.groupId(),
-                shareFetchPartitionData.memberId(), 
shareFetchPartitionData.partitionMaxBytes().keySet());
+                "topic partitions {}", shareFetchData.groupId(),
+                shareFetchData.memberId(), 
shareFetchData.partitionMaxBytes().keySet());
         return false;
     }
 
@@ -163,11 +166,11 @@ public class DelayedShareFetch extends DelayedOperation {
         // Initialize the topic partitions for which the fetch should be 
attempted.
         Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
new LinkedHashMap<>();
 
-        
shareFetchPartitionData.partitionMaxBytes().keySet().forEach(topicIdPartition 
-> {
-            SharePartition sharePartition = partitionCacheMap.get(new 
SharePartitionManager.SharePartitionKey(
-                    shareFetchPartitionData.groupId(), topicIdPartition));
+        shareFetchData.partitionMaxBytes().keySet().forEach(topicIdPartition 
-> {
+            SharePartition sharePartition = partitionCacheMap.get(new 
SharePartitionKey(
+                shareFetchData.groupId(), topicIdPartition));
 
-            int partitionMaxBytes = 
shareFetchPartitionData.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
+            int partitionMaxBytes = 
shareFetchData.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
             // Add the share partition to the list of partitions to be fetched 
only if we can
             // acquire the fetch lock on it.
             if (sharePartition.maybeAcquireFetchLock()) {
@@ -195,6 +198,6 @@ public class DelayedShareFetch extends DelayedOperation {
 
     private void releasePartitionLocks(String groupId, Set<TopicIdPartition> 
topicIdPartitions) {
         topicIdPartitions.forEach(tp -> partitionCacheMap.get(new
-                SharePartitionManager.SharePartitionKey(groupId, 
tp)).releaseFetchLock());
+                SharePartitionKey(groupId, tp)).releaseFetchLock());
     }
 }
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java 
b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java
index e4fe1a9af7f..a4c13f8f1a8 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java
@@ -19,19 +19,17 @@ package kafka.server.share;
 import kafka.server.DelayedOperationKey;
 
 import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.share.SharePartitionKey;
 
 import java.util.Objects;
 
 /**
  * A key for delayed operations that fetch data for share consumers.
  */
-public class DelayedShareFetchKey implements DelayedOperationKey {
-    private final String groupId;
-    private final TopicIdPartition topicIdPartition;
+public class DelayedShareFetchKey extends SharePartitionKey implements 
DelayedOperationKey {
 
     DelayedShareFetchKey(String groupId, TopicIdPartition topicIdPartition) {
-        this.groupId = groupId;
-        this.topicIdPartition = topicIdPartition;
+        super(groupId, topicIdPartition);
     }
 
     @Override
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java 
b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index b16156a62bc..a8f8a04dbd8 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.message.ShareFetchResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.fetch.ShareFetchData;
 import org.apache.kafka.storage.internals.log.FetchPartitionData;
 
 import org.slf4j.Logger;
@@ -45,20 +47,20 @@ public class ShareFetchUtils {
     // Process the replica manager fetch response to update share partitions 
and futures. We acquire the fetched data
     // from share partitions.
     static CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> processFetchResponse(
-            SharePartitionManager.ShareFetchPartitionData 
shareFetchPartitionData,
+            ShareFetchData shareFetchData,
             Map<TopicIdPartition, FetchPartitionData> responseData,
-            Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap,
+            Map<SharePartitionKey, SharePartition> partitionCacheMap,
             ReplicaManager replicaManager
     ) {
         Map<TopicIdPartition, 
CompletableFuture<ShareFetchResponseData.PartitionData>> futures = new 
HashMap<>();
         responseData.forEach((topicIdPartition, fetchPartitionData) -> {
 
-            SharePartition sharePartition = partitionCacheMap.get(new 
SharePartitionManager.SharePartitionKey(
-                    shareFetchPartitionData.groupId(), topicIdPartition));
-            futures.put(topicIdPartition, 
sharePartition.acquire(shareFetchPartitionData.memberId(), fetchPartitionData)
+            SharePartition sharePartition = partitionCacheMap.get(new 
SharePartitionKey(
+                shareFetchData.groupId(), topicIdPartition));
+            futures.put(topicIdPartition, 
sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData)
                     .handle((acquiredRecords, throwable) -> {
                         log.trace("Acquired records for topicIdPartition: {} 
with share fetch data: {}, records: {}",
-                                topicIdPartition, shareFetchPartitionData, 
acquiredRecords);
+                                topicIdPartition, shareFetchData, 
acquiredRecords);
                         ShareFetchResponseData.PartitionData partitionData = 
new ShareFetchResponseData.PartitionData()
                                 
.setPartitionIndex(topicIdPartition.partition());
 
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index e8519939187..d2a8e783d7b 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -41,9 +41,9 @@ import org.apache.kafka.server.share.PartitionStateBatchData;
 import org.apache.kafka.server.share.Persister;
 import org.apache.kafka.server.share.PersisterStateBatch;
 import org.apache.kafka.server.share.ReadShareGroupStateParameters;
-import org.apache.kafka.server.share.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.TopicData;
 import org.apache.kafka.server.share.WriteShareGroupStateParameters;
+import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.util.timer.Timer;
 import org.apache.kafka.server.util.timer.TimerTask;
 import org.apache.kafka.storage.internals.log.FetchPartitionData;
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 3679f1c719a..28697a1441d 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -40,10 +40,12 @@ import 
org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.share.CachedSharePartition;
 import org.apache.kafka.server.share.Persister;
-import org.apache.kafka.server.share.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.context.FinalContext;
 import org.apache.kafka.server.share.context.ShareFetchContext;
 import org.apache.kafka.server.share.context.ShareSessionContext;
+import org.apache.kafka.server.share.fetch.ShareFetchData;
 import org.apache.kafka.server.share.session.ShareSession;
 import org.apache.kafka.server.share.session.ShareSessionCache;
 import org.apache.kafka.server.share.session.ShareSessionKey;
@@ -103,7 +105,7 @@ public class SharePartitionManager implements AutoCloseable 
{
     /**
      * The fetch queue stores the share fetch requests that are waiting to be 
processed.
      */
-    private final ConcurrentLinkedQueue<ShareFetchPartitionData> fetchQueue;
+    private final ConcurrentLinkedQueue<ShareFetchData> fetchQueue;
 
     /**
      * The process fetch queue lock is used to ensure that only one thread is 
processing the fetch queue at a time.
@@ -203,7 +205,7 @@ public class SharePartitionManager implements AutoCloseable 
{
             Time time,
             ShareSessionCache cache,
             Map<SharePartitionKey, SharePartition> partitionCacheMap,
-            ConcurrentLinkedQueue<ShareFetchPartitionData> fetchQueue,
+            ConcurrentLinkedQueue<ShareFetchData> fetchQueue,
             int recordLockDurationMs,
             Timer timer,
             int maxDeliveryCount,
@@ -248,8 +250,8 @@ public class SharePartitionManager implements AutoCloseable 
{
                 partitionMaxBytes.keySet(), groupId, fetchParams);
 
         CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new 
CompletableFuture<>();
-        ShareFetchPartitionData shareFetchPartitionData = new 
ShareFetchPartitionData(fetchParams, groupId, memberId, future, 
partitionMaxBytes);
-        fetchQueue.add(shareFetchPartitionData);
+        ShareFetchData shareFetchData = new ShareFetchData(fetchParams, 
groupId, memberId, future, partitionMaxBytes);
+        fetchQueue.add(shareFetchData);
         maybeProcessFetchQueue();
 
         return future;
@@ -518,8 +520,9 @@ public class SharePartitionManager implements AutoCloseable 
{
 
     // Add the share fetch request to the delayed share fetch purgatory to 
process the fetch request if it can be
     // completed else watch until it can be completed/timeout.
-    private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, 
Set<Object> keys) {
-        delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, 
CollectionConverters.asScala(keys).toSeq());
+    private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, 
Set<DelayedShareFetchKey> keys) {
+        delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch,
+            CollectionConverters.asScala(keys).toSeq().indices());
     }
 
     @Override
@@ -529,7 +532,7 @@ public class SharePartitionManager implements AutoCloseable 
{
         this.persister.stop();
         if (!fetchQueue.isEmpty()) {
             log.warn("Closing SharePartitionManager with pending fetch 
requests count: {}", fetchQueue.size());
-            fetchQueue.forEach(shareFetchPartitionData -> 
shareFetchPartitionData.future.completeExceptionally(
+            fetchQueue.forEach(shareFetchData -> 
shareFetchData.future().completeExceptionally(
                 Errors.BROKER_NOT_AVAILABLE.exception()));
             fetchQueue.clear();
         }
@@ -553,17 +556,17 @@ public class SharePartitionManager implements 
AutoCloseable {
             return;
         }
 
-        ShareFetchPartitionData shareFetchPartitionData = fetchQueue.poll();
-        if (shareFetchPartitionData == null) {
+        ShareFetchData shareFetchData = fetchQueue.poll();
+        if (shareFetchData == null) {
             // No more requests to process, so release the lock. Though we 
should not reach here as the lock
             // is acquired only when there are requests in the queue. But 
still, it's safe to release the lock.
             releaseProcessFetchQueueLock();
             return;
         }
 
-        if (shareFetchPartitionData.partitionMaxBytes.isEmpty()) {
+        if (shareFetchData.partitionMaxBytes().isEmpty()) {
             // If there are no partitions to fetch then complete the future 
with an empty map.
-            shareFetchPartitionData.future.complete(Collections.emptyMap());
+            shareFetchData.future().complete(Collections.emptyMap());
             // Release the lock so that other threads can process the queue.
             releaseProcessFetchQueueLock();
             if (!fetchQueue.isEmpty())
@@ -572,9 +575,9 @@ public class SharePartitionManager implements AutoCloseable 
{
         }
 
         try {
-            
shareFetchPartitionData.partitionMaxBytes.keySet().forEach(topicIdPartition -> {
+            
shareFetchData.partitionMaxBytes().keySet().forEach(topicIdPartition -> {
                 SharePartitionKey sharePartitionKey = sharePartitionKey(
-                    shareFetchPartitionData.groupId,
+                    shareFetchData.groupId(),
                     topicIdPartition
                 );
                 SharePartition sharePartition = 
fetchSharePartition(sharePartitionKey);
@@ -585,19 +588,19 @@ public class SharePartitionManager implements 
AutoCloseable {
                 // TopicPartitionData list will be populated only if the share 
partition is already initialized.
                 sharePartition.maybeInitialize().whenComplete((result, 
throwable) -> {
                     if (throwable != null) {
-                        
maybeCompleteInitializationWithException(sharePartitionKey, 
shareFetchPartitionData.future, throwable);
+                        
maybeCompleteInitializationWithException(sharePartitionKey, 
shareFetchData.future(), throwable);
                         return;
                     }
                 });
             });
 
-            Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
-            shareFetchPartitionData.partitionMaxBytes.keySet().forEach(
+            Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new 
HashSet<>();
+            shareFetchData.partitionMaxBytes().keySet().forEach(
                 topicIdPartition -> delayedShareFetchWatchKeys.add(
-                    new DelayedShareFetchKey(shareFetchPartitionData.groupId, 
topicIdPartition)));
+                    new DelayedShareFetchKey(shareFetchData.groupId(), 
topicIdPartition)));
 
             // Add the share fetch to the delayed share fetch purgatory to 
process the fetch request.
-            addDelayedShareFetch(new 
DelayedShareFetch(shareFetchPartitionData, replicaManager, partitionCacheMap),
+            addDelayedShareFetch(new DelayedShareFetch(shareFetchData, 
replicaManager, partitionCacheMap),
                 delayedShareFetchWatchKeys);
 
             // Release the lock so that other threads can process the queue.
@@ -621,8 +624,8 @@ public class SharePartitionManager implements AutoCloseable 
{
                 k -> {
                     long start = time.hiResClockMs();
                     SharePartition partition = new SharePartition(
-                            sharePartitionKey.groupId,
-                            sharePartitionKey.topicIdPartition,
+                            sharePartitionKey.groupId(),
+                            sharePartitionKey.topicIdPartition(),
                             maxInFlightMessages,
                             maxDeliveryCount,
                             recordLockDurationMs,
@@ -679,89 +682,6 @@ public class SharePartitionManager implements 
AutoCloseable {
         return new SharePartitionKey(groupId, topicIdPartition);
     }
 
-    /**
-     * The SharePartitionKey is used to uniquely identify a share partition. 
The key is made up of the
-     * share group id, the topic id and the partition id. The key is used to 
store the SharePartition
-     * objects in the partition cache map.
-     */
-    // Visible for testing
-    static class SharePartitionKey {
-        private final String groupId;
-        private final TopicIdPartition topicIdPartition;
-
-        public SharePartitionKey(String groupId, TopicIdPartition 
topicIdPartition) {
-            this.groupId = Objects.requireNonNull(groupId);
-            this.topicIdPartition = Objects.requireNonNull(topicIdPartition);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(groupId, topicIdPartition);
-        }
-
-        @Override
-        public boolean equals(final Object obj) {
-            if (this == obj)
-                return true;
-            else if (obj == null || getClass() != obj.getClass())
-                return false;
-            else {
-                SharePartitionKey that = (SharePartitionKey) obj;
-                return groupId.equals(that.groupId) && 
Objects.equals(topicIdPartition, that.topicIdPartition);
-            }
-        }
-
-        @Override
-        public String toString() {
-            return "SharePartitionKey{" +
-                "groupId='" + groupId +
-                ", topicIdPartition=" + topicIdPartition +
-                '}';
-        }
-    }
-
-    /**
-     * The ShareFetchPartitionData class is used to store the fetch parameters 
for a share fetch request.
-     */
-    // Visible for testing
-    static class ShareFetchPartitionData {
-        private final FetchParams fetchParams;
-        private final String groupId;
-        private final String memberId;
-        private final CompletableFuture<Map<TopicIdPartition, PartitionData>> 
future;
-        private final Map<TopicIdPartition, Integer> partitionMaxBytes;
-
-        public ShareFetchPartitionData(FetchParams fetchParams, String 
groupId, String memberId,
-                                       CompletableFuture<Map<TopicIdPartition, 
PartitionData>> future,
-                                       Map<TopicIdPartition, Integer> 
partitionMaxBytes) {
-            this.fetchParams = fetchParams;
-            this.groupId = groupId;
-            this.memberId = memberId;
-            this.future = future;
-            this.partitionMaxBytes = partitionMaxBytes;
-        }
-
-        public String groupId() {
-            return groupId;
-        }
-
-        public String memberId() {
-            return memberId;
-        }
-
-        public CompletableFuture<Map<TopicIdPartition, PartitionData>> 
future() {
-            return future;
-        }
-
-        public Map<TopicIdPartition, Integer> partitionMaxBytes() {
-            return partitionMaxBytes;
-        }
-
-        public FetchParams fetchParams() {
-            return fetchParams;
-        }
-    }
-
     static class ShareGroupMetrics {
         /**
          * share-acknowledgement (share-acknowledgement-rate and 
share-acknowledgement-count) - The total number of offsets acknowledged for 
share groups (requests to be ack).
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 29f1b270ebc..2586c562cf5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -77,7 +77,8 @@ import org.apache.kafka.server.common.{GroupVersion, 
MetadataVersion, RequestLoc
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, 
IBP_2_3_IV0}
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.share.context.ShareFetchContext
-import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, 
ShareAcknowledgementBatch}
+import org.apache.kafka.server.share.ErroneousAndValidPartitionData
+import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, 
FetchParams, FetchPartitionData}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java 
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index cc490c0bd97..85f50c0f38a 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -25,6 +25,8 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ShareFetchResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.fetch.ShareFetchData;
 import org.apache.kafka.storage.internals.log.FetchIsolation;
 import org.apache.kafka.storage.internals.log.FetchParams;
 
@@ -68,11 +70,11 @@ public class DelayedShareFetchTest {
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new ConcurrentHashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
ConcurrentHashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
 
-        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData 
= new SharePartitionManager.ShareFetchPartitionData(
+        ShareFetchData shareFetchData = new ShareFetchData(
                 new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                         1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
                 new CompletableFuture<>(), partitionMaxBytes);
@@ -80,7 +82,7 @@ public class DelayedShareFetchTest {
         when(sp0.canAcquireRecords()).thenReturn(false);
         when(sp1.canAcquireRecords()).thenReturn(false);
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchBuilder.builder()
-            .withShareFetchPartitionData(shareFetchPartitionData)
+            .withShareFetchData(shareFetchData)
             .withPartitionCacheMap(partitionCacheMap)
             .build();
 
@@ -105,11 +107,11 @@ public class DelayedShareFetchTest {
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new ConcurrentHashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
ConcurrentHashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
 
-        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData 
= new SharePartitionManager.ShareFetchPartitionData(
+        ShareFetchData shareFetchData = new ShareFetchData(
                 new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                         1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
                 new CompletableFuture<>(), partitionMaxBytes);
@@ -117,7 +119,7 @@ public class DelayedShareFetchTest {
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchBuilder.builder()
-            .withShareFetchPartitionData(shareFetchPartitionData)
+            .withShareFetchData(shareFetchData)
             .withPartitionCacheMap(partitionCacheMap)
             .build();
         assertFalse(delayedShareFetch.isCompleted());
@@ -144,11 +146,11 @@ public class DelayedShareFetchTest {
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new ConcurrentHashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
ConcurrentHashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
 
-        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData 
= new SharePartitionManager.ShareFetchPartitionData(
+        ShareFetchData shareFetchData = new ShareFetchData(
                 new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                         1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
                 new CompletableFuture<>(), partitionMaxBytes);
@@ -156,7 +158,7 @@ public class DelayedShareFetchTest {
         when(sp0.canAcquireRecords()).thenReturn(false);
         when(sp1.canAcquireRecords()).thenReturn(false);
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchBuilder.builder()
-            .withShareFetchPartitionData(shareFetchPartitionData)
+            .withShareFetchData(shareFetchData)
             .withReplicaManager(replicaManager)
             .withPartitionCacheMap(partitionCacheMap)
             .build();
@@ -164,7 +166,7 @@ public class DelayedShareFetchTest {
         delayedShareFetch.forceComplete();
 
         // Since no partition could be acquired, the future should be empty 
and replicaManager.readFromLog should not be called.
-        assertEquals(0, shareFetchPartitionData.future().join().size());
+        assertEquals(0, shareFetchData.future().join().size());
         Mockito.verify(replicaManager, times(0)).readFromLog(
                 any(), any(), any(ReplicaQuota.class), anyBoolean());
         assertTrue(delayedShareFetch.isCompleted());
@@ -187,11 +189,11 @@ public class DelayedShareFetchTest {
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new ConcurrentHashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
ConcurrentHashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
 
-        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData 
= new SharePartitionManager.ShareFetchPartitionData(
+        ShareFetchData shareFetchData = new ShareFetchData(
                 new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                         1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
                 new CompletableFuture<>(), partitionMaxBytes);
@@ -199,7 +201,7 @@ public class DelayedShareFetchTest {
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchBuilder.builder()
-            .withShareFetchPartitionData(shareFetchPartitionData)
+            .withShareFetchData(shareFetchData)
             .withReplicaManager(replicaManager)
             .withPartitionCacheMap(partitionCacheMap)
             .build();
@@ -225,11 +227,11 @@ public class DelayedShareFetchTest {
 
         SharePartition sp0 = mock(SharePartition.class);
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new ConcurrentHashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
ConcurrentHashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
 
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
-        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData 
= new SharePartitionManager.ShareFetchPartitionData(
+        ShareFetchData shareFetchData = new ShareFetchData(
                 new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                         1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
                 future, partitionMaxBytes);
@@ -238,7 +240,7 @@ public class DelayedShareFetchTest {
         when(sp0.canAcquireRecords()).thenReturn(false);
 
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
-            .withShareFetchPartitionData(shareFetchPartitionData)
+            .withShareFetchData(shareFetchData)
             .withReplicaManager(replicaManager)
             .withPartitionCacheMap(partitionCacheMap)
             .build());
@@ -249,7 +251,7 @@ public class DelayedShareFetchTest {
         assertTrue(delayedShareFetch.isCompleted());
         // Verifying that the first forceComplete calls acquirablePartitions 
method in DelayedShareFetch.
         Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
-        assertEquals(0, shareFetchPartitionData.future().join().size());
+        assertEquals(0, shareFetchData.future().join().size());
 
         // Force completing the share fetch request for the second time should 
hit the future completion check and not
         // proceed ahead in the function.
@@ -260,12 +262,12 @@ public class DelayedShareFetchTest {
     }
 
     static class DelayedShareFetchBuilder {
-        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData 
= mock(SharePartitionManager.ShareFetchPartitionData.class);
+        ShareFetchData shareFetchData = mock(ShareFetchData.class);
         private ReplicaManager replicaManager = mock(ReplicaManager.class);
-        private Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
+        private Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
 
-        DelayedShareFetchBuilder 
withShareFetchPartitionData(SharePartitionManager.ShareFetchPartitionData 
shareFetchPartitionData) {
-            this.shareFetchPartitionData = shareFetchPartitionData;
+        DelayedShareFetchBuilder withShareFetchData(ShareFetchData 
shareFetchData) {
+            this.shareFetchData = shareFetchData;
             return this;
         }
 
@@ -274,7 +276,7 @@ public class DelayedShareFetchTest {
             return this;
         }
 
-        DelayedShareFetchBuilder 
withPartitionCacheMap(Map<SharePartitionManager.SharePartitionKey, 
SharePartition> partitionCacheMap) {
+        DelayedShareFetchBuilder withPartitionCacheMap(Map<SharePartitionKey, 
SharePartition> partitionCacheMap) {
             this.partitionCacheMap = partitionCacheMap;
             return this;
         }
@@ -285,7 +287,7 @@ public class DelayedShareFetchTest {
 
         public DelayedShareFetch build() {
             return new DelayedShareFetch(
-                    shareFetchPartitionData,
+                    shareFetchData,
                     replicaManager,
                     partitionCacheMap);
         }
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java 
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index dad92125b24..0f4d1c64ccb 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.fetch.ShareFetchData;
 import org.apache.kafka.storage.internals.log.FetchIsolation;
 import org.apache.kafka.storage.internals.log.FetchParams;
 import org.apache.kafka.storage.internals.log.FetchPartitionData;
@@ -90,11 +92,11 @@ public class ShareFetchUtilsTest {
         doNothing().when(sp1).updateCacheAndOffsets(any(Long.class));
         doNothing().when(sp0).updateCacheAndOffsets(any(Long.class));
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new ConcurrentHashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
ConcurrentHashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
 
-        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData 
= new SharePartitionManager.ShareFetchPartitionData(
+        ShareFetchData shareFetchData = new ShareFetchData(
                 new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
                         1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, memberId,
                 new CompletableFuture<>(), partitionMaxBytes);
@@ -119,7 +121,7 @@ public class ShareFetchUtilsTest {
                 records1, Optional.empty(), OptionalLong.empty(), 
Optional.empty(),
                 OptionalInt.empty(), false));
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> result =
-                ShareFetchUtils.processFetchResponse(shareFetchPartitionData, 
responseData, partitionCacheMap, mock(ReplicaManager.class));
+                ShareFetchUtils.processFetchResponse(shareFetchData, 
responseData, partitionCacheMap, mock(ReplicaManager.class));
 
         assertTrue(result.isDone());
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData 
= result.join();
@@ -161,11 +163,11 @@ public class ShareFetchUtilsTest {
         doNothing().when(sp1).updateCacheAndOffsets(any(Long.class));
         doNothing().when(sp0).updateCacheAndOffsets(any(Long.class));
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new ConcurrentHashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
ConcurrentHashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
 
-        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData 
= new SharePartitionManager.ShareFetchPartitionData(
+        ShareFetchData shareFetchData = new ShareFetchData(
                 new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
                         1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, memberId,
                 new CompletableFuture<>(), partitionMaxBytes);
@@ -178,7 +180,7 @@ public class ShareFetchUtilsTest {
                 MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), 
Optional.empty(),
                 OptionalInt.empty(), false));
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> result =
-                ShareFetchUtils.processFetchResponse(shareFetchPartitionData, 
responseData, partitionCacheMap, mock(ReplicaManager.class));
+                ShareFetchUtils.processFetchResponse(shareFetchData, 
responseData, partitionCacheMap, mock(ReplicaManager.class));
 
         assertTrue(result.isDone());
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData 
= result.join();
@@ -207,11 +209,11 @@ public class ShareFetchUtilsTest {
         SharePartition sp0 = Mockito.mock(SharePartition.class);
         SharePartition sp1 = Mockito.mock(SharePartition.class);
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new ConcurrentHashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
ConcurrentHashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
 
-        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData 
= new SharePartitionManager.ShareFetchPartitionData(
+        ShareFetchData shareFetchData = new ShareFetchData(
                 new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
                         1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()),
                 groupId, Uuid.randomUuid().toString(), new 
CompletableFuture<>(), partitionMaxBytes);
@@ -251,7 +253,7 @@ public class ShareFetchUtilsTest {
                 records1, Optional.empty(), OptionalLong.empty(), 
Optional.empty(),
                 OptionalInt.empty(), false));
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> result1 =
-                ShareFetchUtils.processFetchResponse(shareFetchPartitionData, 
responseData1, partitionCacheMap, replicaManager);
+                ShareFetchUtils.processFetchResponse(shareFetchData, 
responseData1, partitionCacheMap, replicaManager);
 
         assertTrue(result1.isDone());
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
resultData1 = result1.join();
@@ -282,7 +284,7 @@ public class ShareFetchUtilsTest {
                 MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), 
Optional.empty(),
                 OptionalInt.empty(), false));
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> result2 =
-                ShareFetchUtils.processFetchResponse(shareFetchPartitionData, 
responseData2, partitionCacheMap, replicaManager);
+                ShareFetchUtils.processFetchResponse(shareFetchData, 
responseData2, partitionCacheMap, replicaManager);
 
         assertTrue(result2.isDone());
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
resultData2 = result2.join();
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 861233cda9c..9529c678301 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -53,10 +53,12 @@ import org.apache.kafka.server.share.CachedSharePartition;
 import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
 import org.apache.kafka.server.share.NoOpShareStatePersister;
 import org.apache.kafka.server.share.Persister;
-import org.apache.kafka.server.share.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.context.FinalContext;
 import org.apache.kafka.server.share.context.ShareFetchContext;
 import org.apache.kafka.server.share.context.ShareSessionContext;
+import org.apache.kafka.server.share.fetch.ShareFetchData;
 import org.apache.kafka.server.share.session.ShareSession;
 import org.apache.kafka.server.share.session.ShareSessionCache;
 import org.apache.kafka.server.share.session.ShareSessionKey;
@@ -978,17 +980,17 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testSharePartitionKey() {
-        SharePartitionManager.SharePartitionKey sharePartitionKey1 = new 
SharePartitionManager.SharePartitionKey("mock-group-1",
+        SharePartitionKey sharePartitionKey1 = new 
SharePartitionKey("mock-group-1",
                 new TopicIdPartition(new Uuid(0L, 1L), new 
TopicPartition("test", 0)));
-        SharePartitionManager.SharePartitionKey sharePartitionKey2 = new 
SharePartitionManager.SharePartitionKey("mock-group-2",
+        SharePartitionKey sharePartitionKey2 = new 
SharePartitionKey("mock-group-2",
                 new TopicIdPartition(new Uuid(0L, 1L), new 
TopicPartition("test", 0)));
-        SharePartitionManager.SharePartitionKey sharePartitionKey3 = new 
SharePartitionManager.SharePartitionKey("mock-group-1",
+        SharePartitionKey sharePartitionKey3 = new 
SharePartitionKey("mock-group-1",
                 new TopicIdPartition(new Uuid(1L, 1L), new 
TopicPartition("test-1", 0)));
-        SharePartitionManager.SharePartitionKey sharePartitionKey4 = new 
SharePartitionManager.SharePartitionKey("mock-group-1",
+        SharePartitionKey sharePartitionKey4 = new 
SharePartitionKey("mock-group-1",
                 new TopicIdPartition(new Uuid(0L, 1L), new 
TopicPartition("test", 1)));
-        SharePartitionManager.SharePartitionKey sharePartitionKey5 = new 
SharePartitionManager.SharePartitionKey("mock-group-1",
+        SharePartitionKey sharePartitionKey5 = new 
SharePartitionKey("mock-group-1",
                 new TopicIdPartition(new Uuid(0L, 0L), new 
TopicPartition("test-2", 0)));
-        SharePartitionManager.SharePartitionKey sharePartitionKey1Copy = new 
SharePartitionManager.SharePartitionKey("mock-group-1",
+        SharePartitionKey sharePartitionKey1Copy = new 
SharePartitionKey("mock-group-1",
                 new TopicIdPartition(new Uuid(0L, 1L), new 
TopicPartition("test", 0)));
 
         assertEquals(sharePartitionKey1, sharePartitionKey1Copy);
@@ -1182,8 +1184,8 @@ public class SharePartitionManagerTest {
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp0.canAcquireRecords()).thenReturn(false);
         
when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
                 "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
@@ -1305,9 +1307,9 @@ public class SharePartitionManagerTest {
         partitionMap.add(new CachedSharePartition(tp3));
         when(shareSession.partitionMap()).thenReturn(partitionMap);
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
 
         SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
             .withCache(cache)
@@ -1439,8 +1441,8 @@ public class SharePartitionManagerTest {
 
         when(sp.acknowledge(ArgumentMatchers.eq(memberId), 
any())).thenReturn(CompletableFuture.completedFuture(null));
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp), sp);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
         SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
             .withPartitionCacheMap(partitionCacheMap).build();
 
@@ -1475,10 +1477,10 @@ public class SharePartitionManagerTest {
         when(sp2.acknowledge(ArgumentMatchers.eq(memberId), 
any())).thenReturn(CompletableFuture.completedFuture(null));
         when(sp3.acknowledge(ArgumentMatchers.eq(memberId), 
any())).thenReturn(CompletableFuture.completedFuture(null));
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp3), sp3);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
 
         Metrics metrics = new Metrics();
         SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
@@ -1565,8 +1567,8 @@ public class SharePartitionManagerTest {
         TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
         SharePartition sp = mock(SharePartition.class);
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp), sp);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
         SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
                 .withPartitionCacheMap(partitionCacheMap).build();
 
@@ -1594,8 +1596,8 @@ public class SharePartitionManagerTest {
         when(sp.acknowledge(ArgumentMatchers.eq(memberId), 
any())).thenReturn(FutureUtils.failedFuture(
                 new InvalidRequestException("Member is not the owner of batch 
record")
         ));
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp), sp);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
         SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
                 .withPartitionCacheMap(partitionCacheMap).build();
 
@@ -1649,16 +1651,16 @@ public class SharePartitionManagerTest {
         final Time time = new MockTime();
         ReplicaManager replicaManager = mock(ReplicaManager.class);
 
-        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData1 
= new SharePartitionManager.ShareFetchPartitionData(
+        ShareFetchData shareFetchData1 = new ShareFetchData(
                 fetchParams, groupId, memberId, new CompletableFuture<>(), 
Collections.emptyMap());
-        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData2 
= new SharePartitionManager.ShareFetchPartitionData(
+        ShareFetchData shareFetchData2 = new ShareFetchData(
             fetchParams, groupId, memberId, new CompletableFuture<>(), 
partitionMaxBytes);
 
-        ConcurrentLinkedQueue<SharePartitionManager.ShareFetchPartitionData> 
fetchQueue = new ConcurrentLinkedQueue<>();
+        ConcurrentLinkedQueue<ShareFetchData> fetchQueue = new 
ConcurrentLinkedQueue<>();
         // First request added to fetch queue is empty i.e. no topic 
partitions to fetch.
-        fetchQueue.add(shareFetchPartitionData1);
+        fetchQueue.add(shareFetchData1);
         // Second request added to fetch queue has a topic partition to fetch.
-        fetchQueue.add(shareFetchPartitionData2);
+        fetchQueue.add(shareFetchData2);
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
                 "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
@@ -1704,11 +1706,11 @@ public class SharePartitionManagerTest {
             return CompletableFuture.completedFuture(Optional.empty());
         }).when(sp2).acknowledge(ArgumentMatchers.eq(memberId), any());
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
 
-        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData 
= new SharePartitionManager.ShareFetchPartitionData(
+        ShareFetchData shareFetchData = new ShareFetchData(
                 new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
                         1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()),
                 groupId,
@@ -1731,7 +1733,7 @@ public class SharePartitionManagerTest {
         partitionMaxBytes.keySet().forEach(topicIdPartition -> 
delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, 
topicIdPartition)));
 
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
-                .withShareFetchPartitionData(shareFetchPartitionData)
+                .withShareFetchData(shareFetchData)
                 .withReplicaManager(replicaManager)
                 .withPartitionCacheMap(partitionCacheMap)
                 .build();
@@ -1799,12 +1801,12 @@ public class SharePartitionManagerTest {
             return CompletableFuture.completedFuture(Optional.empty());
         }).when(sp3).acknowledge(ArgumentMatchers.eq(memberId), any());
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp3), sp3);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
 
-        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData 
= new SharePartitionManager.ShareFetchPartitionData(
+        ShareFetchData shareFetchData = new ShareFetchData(
                 new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
                         1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()),
                 groupId,
@@ -1829,7 +1831,7 @@ public class SharePartitionManagerTest {
         partitionMaxBytes.keySet().forEach(topicIdPartition -> 
delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, 
topicIdPartition)));
 
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
-                .withShareFetchPartitionData(shareFetchPartitionData)
+                .withShareFetchData(shareFetchData)
                 .withReplicaManager(replicaManager)
                 .withPartitionCacheMap(partitionCacheMap)
                 .build();
@@ -1893,11 +1895,11 @@ public class SharePartitionManagerTest {
             return CompletableFuture.completedFuture(Optional.empty());
         }).when(sp2).releaseAcquiredRecords(ArgumentMatchers.eq(memberId));
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
 
-        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData 
= new SharePartitionManager.ShareFetchPartitionData(
+        ShareFetchData shareFetchData = new ShareFetchData(
                 new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
                         1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()),
                 groupId,
@@ -1920,7 +1922,7 @@ public class SharePartitionManagerTest {
         partitionMaxBytes.keySet().forEach(topicIdPartition -> 
delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, 
topicIdPartition)));
 
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
-                .withShareFetchPartitionData(shareFetchPartitionData)
+                .withShareFetchData(shareFetchData)
                 .withReplicaManager(replicaManager)
                 .withPartitionCacheMap(partitionCacheMap)
                 .build();
@@ -1992,12 +1994,12 @@ public class SharePartitionManagerTest {
             return CompletableFuture.completedFuture(Optional.empty());
         }).when(sp3).releaseAcquiredRecords(ArgumentMatchers.eq(memberId));
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp3), sp3);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
 
-        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData 
= new SharePartitionManager.ShareFetchPartitionData(
+        ShareFetchData shareFetchData = new ShareFetchData(
                 new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
                         1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()),
                 groupId,
@@ -2022,7 +2024,7 @@ public class SharePartitionManagerTest {
         partitionMaxBytes.keySet().forEach(topicIdPartition -> 
delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, 
topicIdPartition)));
 
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
-                .withShareFetchPartitionData(shareFetchPartitionData)
+                .withShareFetchData(shareFetchData)
                 .withReplicaManager(replicaManager)
                 .withPartitionCacheMap(partitionCacheMap)
                 .build();
@@ -2067,8 +2069,8 @@ public class SharePartitionManagerTest {
         Map<TopicIdPartition, Integer> partitionMaxBytes = 
Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
 
         SharePartition sp0 = mock(SharePartition.class);
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
 
         // Keep the initialization future pending, so fetch request is stuck.
         CompletableFuture<Void> pendingInitializationFuture = new 
CompletableFuture<>();
@@ -2111,8 +2113,8 @@ public class SharePartitionManagerTest {
         Map<TopicIdPartition, Integer> partitionMaxBytes = 
Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
 
         SharePartition sp0 = mock(SharePartition.class);
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
 
         ReplicaManager replicaManager = mock(ReplicaManager.class);
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
@@ -2181,7 +2183,7 @@ public class SharePartitionManagerTest {
         assertTrue(partitionCacheMap.isEmpty());
 
         // The last exception removes the share partition from the cache hence 
re-add the share partition to cache.
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
         // Return NotLeaderOrFollowerException to simulate initialization 
failure.
         when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new 
NotLeaderOrFollowerException("Not leader or follower")));
         future = sharePartitionManager.fetchMessages(groupId, 
memberId.toString(), fetchParams, partitionMaxBytes);
@@ -2195,7 +2197,7 @@ public class SharePartitionManagerTest {
         assertTrue(partitionCacheMap.isEmpty());
 
         // The last exception removes the share partition from the cache hence 
re-add the share partition to cache.
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
         // Return RuntimeException to simulate initialization failure.
         when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new 
RuntimeException("Runtime exception")));
         future = sharePartitionManager.fetchMessages(groupId, 
memberId.toString(), fetchParams, partitionMaxBytes);
@@ -2289,11 +2291,11 @@ public class SharePartitionManagerTest {
         private ReplicaManager replicaManager = mock(ReplicaManager.class);
         private Time time = new MockTime();
         private ShareSessionCache cache = new ShareSessionCache(10, 1000);
-        private Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
+        private Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
         private Persister persister = NoOpShareStatePersister.getInstance();
         private Timer timer = new MockTimer();
         private Metrics metrics = new Metrics();
-        private 
ConcurrentLinkedQueue<SharePartitionManager.ShareFetchPartitionData> fetchQueue 
= new ConcurrentLinkedQueue<>();
+        private ConcurrentLinkedQueue<ShareFetchData> fetchQueue = new 
ConcurrentLinkedQueue<>();
         private DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class);
 
         private SharePartitionManagerBuilder withReplicaManager(ReplicaManager 
replicaManager) {
@@ -2311,7 +2313,7 @@ public class SharePartitionManagerTest {
             return this;
         }
 
-        private SharePartitionManagerBuilder 
withPartitionCacheMap(Map<SharePartitionManager.SharePartitionKey, 
SharePartition> partitionCacheMap) {
+        private SharePartitionManagerBuilder 
withPartitionCacheMap(Map<SharePartitionKey, SharePartition> partitionCacheMap) 
{
             this.partitionCacheMap = partitionCacheMap;
             return this;
         }
@@ -2331,7 +2333,7 @@ public class SharePartitionManagerTest {
             return this;
         }
 
-        private SharePartitionManagerBuilder 
withFetchQueue(ConcurrentLinkedQueue<SharePartitionManager.ShareFetchPartitionData>
 fetchQueue) {
+        private SharePartitionManagerBuilder 
withFetchQueue(ConcurrentLinkedQueue<ShareFetchData> fetchQueue) {
             this.fetchQueue = fetchQueue;
             return this;
         }
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 4d821db3dc6..14ad6d5deb9 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -42,9 +42,9 @@ import org.apache.kafka.server.share.PartitionFactory;
 import org.apache.kafka.server.share.Persister;
 import org.apache.kafka.server.share.PersisterStateBatch;
 import org.apache.kafka.server.share.ReadShareGroupStateResult;
-import org.apache.kafka.server.share.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.TopicData;
 import org.apache.kafka.server.share.WriteShareGroupStateResult;
+import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.util.FutureUtils;
 import org.apache.kafka.server.util.timer.SystemTimer;
 import org.apache.kafka.server.util.timer.SystemTimerReaper;
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a658b49b567..7972e9c5355 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -88,8 +88,9 @@ import 
org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_I
 import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, 
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal}
 import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, 
ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig}
 import org.apache.kafka.server.metrics.ClientMetricsTestUtils
-import org.apache.kafka.server.share.{CachedSharePartition, 
ErroneousAndValidPartitionData, ShareAcknowledgementBatch}
+import org.apache.kafka.server.share.{CachedSharePartition, 
ErroneousAndValidPartitionData}
 import org.apache.kafka.server.quota.ThrottleCallback
+import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
 import org.apache.kafka.server.share.context.{FinalContext, 
ShareSessionContext}
 import org.apache.kafka.server.share.session.{ShareSession, ShareSessionKey}
 import org.apache.kafka.server.util.{FutureUtils, MockTime}
diff --git 
a/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java 
b/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
index 6e56edbf3bb..8a2d7eb4f46 100644
--- a/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
+++ b/share/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
@@ -17,36 +17,48 @@
 package org.apache.kafka.server.share;
 
 import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 
 import java.util.Objects;
 
 /**
- * Common immutable share partition key class. This class is
- * placed in server-common so that it can be freely used across
- * various modules.
+ * The SharePartitionKey is used to uniquely identify a share partition. The 
key is made up of the
+ * share group id, the topic id and the partition id. The key is used to store 
the SharePartition
+ * objects in the partition cache map.
  */
 public class SharePartitionKey {
-    private final String groupId;
-    private final Uuid topicId;
-    private final int partition;
+
+    protected final String groupId;
+    protected final TopicIdPartition topicIdPartition;
+
+    public SharePartitionKey(String groupId, TopicIdPartition 
topicIdPartition) {
+        this.groupId = Objects.requireNonNull(groupId);
+        this.topicIdPartition = Objects.requireNonNull(topicIdPartition);
+    }
 
     private SharePartitionKey(String groupId, Uuid topicId, int partition) {
-        this.groupId = groupId;
-        this.topicId = topicId;
-        this.partition = partition;
+        this(groupId, topicId, null, partition);
+    }
+
+    private SharePartitionKey(String groupId, Uuid topicId, String topic, int 
partition) {
+        this(groupId, new TopicIdPartition(Objects.requireNonNull(topicId), 
new TopicPartition(topic, partition)));
     }
 
     public String groupId() {
         return groupId;
     }
 
+    public TopicIdPartition topicIdPartition() {
+        return topicIdPartition;
+    }
+
     public Uuid topicId() {
-        return topicId;
+        return topicIdPartition.topicId();
     }
 
     public int partition() {
-        return partition;
+        return topicIdPartition.partition();
     }
 
     public static SharePartitionKey getInstance(String groupId, 
TopicIdPartition topicIdPartition) {
@@ -57,33 +69,28 @@ public class SharePartitionKey {
         return new SharePartitionKey(groupId, topicId, partition);
     }
 
-    public String asCoordinatorKey() {
-        return asCoordinatorKey(groupId, topicId, partition);
-    }
-
-    public static String asCoordinatorKey(String groupId, Uuid topicId, int 
partition) {
-        return String.format("%s:%s:%d", groupId, topicId, partition);
-    }
-
     @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (!(o instanceof SharePartitionKey)) return false;
-        SharePartitionKey that = (SharePartitionKey) o;
-        return partition == that.partition && Objects.equals(groupId, 
that.groupId) && Objects.equals(topicId, that.topicId);
+    public boolean equals(final Object obj) {
+        if (this == obj)
+            return true;
+        else if (obj == null || getClass() != obj.getClass())
+            return false;
+        else {
+            SharePartitionKey that = (SharePartitionKey) obj;
+            return groupId.equals(that.groupId) && 
Objects.equals(topicIdPartition, that.topicIdPartition);
+        }
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(groupId, topicId, partition);
+        return Objects.hash(groupId, topicIdPartition);
     }
 
     @Override
     public String toString() {
         return "SharePartitionKey{" +
-            "groupId=" + groupId +
-            ",topicId=" + topicId +
-            ",partition=" + partition +
-            "}";
+            "groupId='" + groupId +
+            ", topicIdPartition=" + topicIdPartition +
+            '}';
     }
 }
diff --git 
a/share/src/main/java/org/apache/kafka/server/share/ShareAcknowledgementBatch.java
 
b/share/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
similarity index 97%
rename from 
share/src/main/java/org/apache/kafka/server/share/ShareAcknowledgementBatch.java
rename to 
share/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
index 5aacbee0322..b23104d6dab 100644
--- 
a/share/src/main/java/org/apache/kafka/server/share/ShareAcknowledgementBatch.java
+++ 
b/share/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.acknowledge;
 
 import java.util.List;
 
diff --git 
a/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java 
b/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java
new file mode 100644
index 00000000000..667c05df047
--- /dev/null
+++ 
b/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.fetch;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
+import org.apache.kafka.storage.internals.log.FetchParams;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The ShareFetchData class is used to store the fetch parameters for a share 
fetch request.
+ */
+public class ShareFetchData {
+
+    private final FetchParams fetchParams;
+    private final String groupId;
+    private final String memberId;
+    private final CompletableFuture<Map<TopicIdPartition, PartitionData>> 
future;
+    private final Map<TopicIdPartition, Integer> partitionMaxBytes;
+
+    public ShareFetchData(
+        FetchParams fetchParams,
+        String groupId,
+        String memberId,
+        CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+        Map<TopicIdPartition, Integer> partitionMaxBytes
+    ) {
+        this.fetchParams = fetchParams;
+        this.groupId = groupId;
+        this.memberId = memberId;
+        this.future = future;
+        this.partitionMaxBytes = partitionMaxBytes;
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public String memberId() {
+        return memberId;
+    }
+
+    public CompletableFuture<Map<TopicIdPartition, PartitionData>> future() {
+        return future;
+    }
+
+    public Map<TopicIdPartition, Integer> partitionMaxBytes() {
+        return partitionMaxBytes;
+    }
+
+    public FetchParams fetchParams() {
+        return fetchParams;
+    }
+}

Reply via email to