This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4f28973bd19 KAFKA-18827: Initialize share state, share coordinator impl. [1/N] (#18968)
4f28973bd19 is described below

commit 4f28973bd19cb94986415765f55cf5ab5bfae3e6
Author: Sushant Mahajan <[email protected]>
AuthorDate: Sat Feb 22 21:42:08 2025 +0530

    KAFKA-18827: Initialize share state, share coordinator impl. [1/N] (#18968)
    
    In this PR, we have added the share coordinator and KafkaApis-side
    implementation of the initialize share group state RPC.
    ref:
    https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka#KIP932:QueuesforKafka-InitializeShareGroupStateAPI
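
    As a minimal sketch of the request shape the new handler consumes
    (setter names are taken from the generated message classes in the
    patch below; the literal values are placeholders):

        InitializeShareGroupStateRequestData request = new InitializeShareGroupStateRequestData()
            .setGroupId("group1")
            .setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData()
                .setTopicId(Uuid.randomUuid())
                .setPartitions(List.of(new InitializeShareGroupStateRequestData.PartitionData()
                    .setPartition(0)
                    .setStateEpoch(0)))));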
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../requests/InitializeShareGroupStateRequest.java |  22 +-
 .../InitializeShareGroupStateResponse.java         |  70 +++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  30 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 121 +++++++
 .../kafka/coordinator/share/ShareCoordinator.java  |  10 +
 .../coordinator/share/ShareCoordinatorService.java | 146 ++++++++-
 .../coordinator/share/ShareCoordinatorShard.java   | 205 ++++++++----
 .../kafka/coordinator/share/ShareGroupOffset.java  |  21 +-
 .../share/ShareCoordinatorServiceTest.java         | 361 +++++++++++++++++----
 .../share/ShareCoordinatorShardTest.java           | 181 +++++++++++
 10 files changed, 1007 insertions(+), 160 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateRequest.java
index fc9abc71613..15d50acaa37 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateRequest.java
@@ -34,7 +34,7 @@ public class InitializeShareGroupStateRequest extends AbstractRequest {
         private final InitializeShareGroupStateRequestData data;
 
         public Builder(InitializeShareGroupStateRequestData data) {
-            this(data, false);
+            this(data, true);
         }
 
         public Builder(InitializeShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
@@ -64,15 +64,15 @@ public class InitializeShareGroupStateRequest extends AbstractRequest {
     public InitializeShareGroupStateResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         List<InitializeShareGroupStateResponseData.InitializeStateResult> results = new ArrayList<>();
         data.topics().forEach(
-                topicResult -> results.add(new InitializeShareGroupStateResponseData.InitializeStateResult()
-                        .setTopicId(topicResult.topicId())
-                        .setPartitions(topicResult.partitions().stream()
-                                .map(partitionData -> new InitializeShareGroupStateResponseData.PartitionResult()
-                                        .setPartition(partitionData.partition())
-                                        .setErrorCode(Errors.forException(e).code()))
-                                .collect(Collectors.toList()))));
+            topicResult -> results.add(new InitializeShareGroupStateResponseData.InitializeStateResult()
+                .setTopicId(topicResult.topicId())
+                .setPartitions(topicResult.partitions().stream()
+                    .map(partitionData -> new InitializeShareGroupStateResponseData.PartitionResult()
+                        .setPartition(partitionData.partition())
+                        .setErrorCode(Errors.forException(e).code()))
+                    .collect(Collectors.toList()))));
         return new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData()
-                .setResults(results));
+            .setResults(results));
     }
 
     @Override
@@ -82,8 +82,8 @@ public class InitializeShareGroupStateRequest extends AbstractRequest {
 
     public static InitializeShareGroupStateRequest parse(ByteBuffer buffer, short version) {
         return new InitializeShareGroupStateRequest(
-                new InitializeShareGroupStateRequestData(new ByteBufferAccessor(buffer), version),
-                version
+            new InitializeShareGroupStateRequestData(new ByteBufferAccessor(buffer), version),
+            version
         );
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateResponse.java
index 44880c2cb86..be88f0944d0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateResponse.java
@@ -17,13 +17,17 @@
 
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
 import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class InitializeShareGroupStateResponse extends AbstractResponse {
@@ -43,9 +47,9 @@ public class InitializeShareGroupStateResponse extends AbstractResponse {
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> counts = new HashMap<>();
         data.results().forEach(
-                result -> result.partitions().forEach(
-                        partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
-                )
+            result -> result.partitions().forEach(
+                partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
+            )
         );
         return counts;
     }
@@ -62,7 +66,65 @@ public class InitializeShareGroupStateResponse extends AbstractResponse {
 
     public static InitializeShareGroupStateResponse parse(ByteBuffer buffer, short version) {
         return new InitializeShareGroupStateResponse(
-                new InitializeShareGroupStateResponseData(new ByteBufferAccessor(buffer), version)
+            new InitializeShareGroupStateResponseData(new ByteBufferAccessor(buffer), version)
         );
     }
+
+    public static InitializeShareGroupStateResponseData toGlobalErrorResponse(InitializeShareGroupStateRequestData request, Errors error) {
+        List<InitializeShareGroupStateResponseData.InitializeStateResult> initStateResults = new ArrayList<>();
+        request.topics().forEach(topicData -> {
+            List<InitializeShareGroupStateResponseData.PartitionResult> partitionResults = new ArrayList<>();
+            topicData.partitions().forEach(partitionData -> partitionResults.add(
+                toErrorResponsePartitionResult(partitionData.partition(), error, error.message()))
+            );
+            initStateResults.add(toResponseInitializeStateResult(topicData.topicId(), partitionResults));
+        });
+        return new InitializeShareGroupStateResponseData().setResults(initStateResults);
+    }
+
+    public static InitializeShareGroupStateResponseData.PartitionResult toErrorResponsePartitionResult(
+        int partitionId,
+        Errors error,
+        String errorMessage
+    ) {
+        return new InitializeShareGroupStateResponseData.PartitionResult()
+            .setPartition(partitionId)
+            .setErrorCode(error.code())
+            .setErrorMessage(errorMessage);
+    }
+
+    public static InitializeShareGroupStateResponseData.InitializeStateResult toResponseInitializeStateResult(
+        Uuid topicId,
+        List<InitializeShareGroupStateResponseData.PartitionResult> partitionResults
+    ) {
+        return new InitializeShareGroupStateResponseData.InitializeStateResult()
+            .setTopicId(topicId)
+            .setPartitions(partitionResults);
+    }
+
+    public static InitializeShareGroupStateResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) {
+        return new InitializeShareGroupStateResponseData().setResults(List.of(
+            new InitializeShareGroupStateResponseData.InitializeStateResult()
+                .setTopicId(topicId)
+                .setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult()
+                    .setPartition(partitionId)
+                    .setErrorCode(error.code())
+                    .setErrorMessage(errorMessage)))
+        ));
+    }
+
+    public static InitializeShareGroupStateResponseData.PartitionResult toResponsePartitionResult(int partitionId) {
+        return new InitializeShareGroupStateResponseData.PartitionResult().setPartition(partitionId);
+    }
+
+    public static InitializeShareGroupStateResponseData toResponseData(Uuid topicId, int partitionId) {
+        return new InitializeShareGroupStateResponseData().setResults(List.of(
+            new InitializeShareGroupStateResponseData.InitializeStateResult()
+                .setTopicId(topicId)
+                .setPartitions(List.of(
+                    new InitializeShareGroupStateResponseData.PartitionResult()
+                        .setPartition(partitionId)
+                ))
+        ));
+    }
 }
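
As a minimal, self-contained usage sketch of the helper methods added above (editorial illustration under the assumption of a randomly generated topic id; not part of the patch):

    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;

    public class ResponseHelperSketch {
        public static void main(String[] args) {
            Uuid topicId = Uuid.randomUuid();
            // Success result for partition 0 of topicId.
            InitializeShareGroupStateResponseData ok =
                InitializeShareGroupStateResponse.toResponseData(topicId, 0);
            // Error result carrying an Errors code and message for the same partition.
            InitializeShareGroupStateResponseData failed =
                InitializeShareGroupStateResponse.toErrorResponseData(
                    topicId, 0, Errors.INVALID_REQUEST, Errors.INVALID_REQUEST.message());
            System.out.println(ok + " / " + failed);
        }
    }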
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7adf9c7c3cb..aa1f392506e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3100,11 +3100,33 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
   }
 
-  def handleInitializeShareGroupStateRequest(request: RequestChannel.Request): Unit = {
+  def handleInitializeShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val initializeShareGroupStateRequest = request.body[InitializeShareGroupStateRequest]
-    // TODO: Implement the InitializeShareGroupStateRequest handling
-    requestHelper.sendMaybeThrottle(request, initializeShareGroupStateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!authorizeClusterOperation(request, CLUSTER_ACTION)) {
+      requestHelper.sendMaybeThrottle(request, new InitializeShareGroupStateResponse(
+        InitializeShareGroupStateResponse.toGlobalErrorResponse(
+          initializeShareGroupStateRequest.data(),
+          Errors.CLUSTER_AUTHORIZATION_FAILED
+        )))
+      return CompletableFuture.completedFuture[Unit](())
+    }
+
+    shareCoordinator match {
+      case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+        initializeShareGroupStateRequest.getErrorResponse(requestThrottleMs,
+          new ApiException("Share coordinator is not enabled.")))
+        CompletableFuture.completedFuture[Unit](())
+
+      case Some(coordinator) => coordinator.initializeState(request.context, initializeShareGroupStateRequest.data)
+        .handle[Unit] { (response, exception) =>
+          if (exception != null) {
+            requestHelper.sendMaybeThrottle(request, initializeShareGroupStateRequest.getErrorResponse(exception))
+          } else {
+            requestHelper.sendMaybeThrottle(request, new InitializeShareGroupStateResponse(response))
+          }
+        }
+    }
   }
 
   def handleReadShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 0670bf0b36d..5f30cfd83e3 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -10879,6 +10879,98 @@ class KafkaApisTest extends Logging {
     })
   }
 
+  @Test
+  def testInitializeShareGroupStateSuccess(): Unit = {
+    val topicId = Uuid.randomUuid();
+    val initRequestData = new InitializeShareGroupStateRequestData()
+      .setGroupId("group1")
+      .setTopics(List(
+        new InitializeShareGroupStateRequestData.InitializeStateData()
+          .setTopicId(topicId)
+          .setPartitions(List(
+            new InitializeShareGroupStateRequestData.PartitionData()
+              .setPartition(1)
+              .setStateEpoch(0)
+          ).asJava)
+      ).asJava)
+
+    val initStateResultData: util.List[InitializeShareGroupStateResponseData.InitializeStateResult] = List(
+      new InitializeShareGroupStateResponseData.InitializeStateResult()
+        .setTopicId(topicId)
+        .setPartitions(List(
+          new InitializeShareGroupStateResponseData.PartitionResult()
+            .setPartition(1)
+            .setErrorCode(Errors.NONE.code())
+            .setErrorMessage(null)
+        ).asJava)
+    ).asJava
+
+    val config = Map(
+      ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
+    )
+
+    val response = getInitializeShareGroupResponse(
+      initRequestData,
+      config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
+      verifyNoErr = true,
+      null,
+      initStateResultData
+    )
+
+    assertNotNull(response.data)
+    assertEquals(1, response.data.results.size)
+  }
+
+  @Test
+  def testInitializeShareGroupStateAuthorizationFailed(): Unit = {
+    val topicId = Uuid.randomUuid();
+    val initRequestData = new InitializeShareGroupStateRequestData()
+      .setGroupId("group1")
+      .setTopics(List(
+        new InitializeShareGroupStateRequestData.InitializeStateData()
+          .setTopicId(topicId)
+          .setPartitions(List(
+            new InitializeShareGroupStateRequestData.PartitionData()
+              .setPartition(1)
+              .setStateEpoch(0)
+          ).asJava)
+      ).asJava)
+
+    val initStateResultData: util.List[InitializeShareGroupStateResponseData.InitializeStateResult] = List(
+      new InitializeShareGroupStateResponseData.InitializeStateResult()
+        .setTopicId(topicId)
+        .setPartitions(List(
+          new InitializeShareGroupStateResponseData.PartitionResult()
+            .setPartition(1)
+            .setErrorCode(Errors.NONE.code())
+            .setErrorMessage(null)
+        ).asJava)
+    ).asJava
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava, Seq(AuthorizationResult.ALLOWED).asJava)
+
+    val config = Map(
+      ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
+    )
+
+    val response = getInitializeShareGroupResponse(
+      initRequestData,
+      config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
+      verifyNoErr = false,
+      authorizer,
+      initStateResultData
+    )
+
+    assertNotNull(response.data)
+    assertEquals(1, response.data.results.size)
+    response.data.results.forEach(initResult => {
+      assertEquals(1, initResult.partitions.size)
+      assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED.code(), initResult.partitions.get(0).errorCode())
+    })
+  }
+
   def getShareGroupDescribeResponse(groupIds: util.List[String], configOverrides: Map[String, String] = Map.empty,
                                     verifyNoErr: Boolean = true, authorizer: Authorizer = null,
                                     describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup]): ShareGroupDescribeResponse = {
@@ -11024,4 +11116,33 @@ class KafkaApisTest extends Logging {
     }
     response
   }
+
+  def getInitializeShareGroupResponse(requestData: InitializeShareGroupStateRequestData, configOverrides: Map[String, String] = Map.empty,
+                                      verifyNoErr: Boolean = true, authorizer: Authorizer = null,
+                                      initStateResult: util.List[InitializeShareGroupStateResponseData.InitializeStateResult]): InitializeShareGroupStateResponse = {
+    val requestChannelRequest = buildRequest(new InitializeShareGroupStateRequest.Builder(requestData, true).build())
+
+    val future = new CompletableFuture[InitializeShareGroupStateResponseData]()
+    when(shareCoordinator.initializeState(
+      any[RequestContext],
+      any[InitializeShareGroupStateRequestData]
+    )).thenReturn(future)
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    kafkaApis = createKafkaApis(
+      overrideProperties = configOverrides,
+      authorizer = Option(authorizer),
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching())
+
+    future.complete(new InitializeShareGroupStateResponseData()
+      .setResults(initStateResult))
+
+    val response = verifyNoThrottling[InitializeShareGroupStateResponse](requestChannelRequest)
+    if (verifyNoErr) {
+      val expectedInitShareGroupStateResponseData = new InitializeShareGroupStateResponseData()
+        .setResults(initStateResult)
+      assertEquals(expectedInitShareGroupStateResponseData, response.data)
+    }
+    response
+  }
 }
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
index 61e96e37f07..f04cb1385f0 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
@@ -19,6 +19,8 @@ package org.apache.kafka.coordinator.share;
 
 import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
+import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
+import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
 import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -96,6 +98,14 @@ public interface ShareCoordinator {
      */
     CompletableFuture<DeleteShareGroupStateResponseData> deleteState(RequestContext context, DeleteShareGroupStateRequestData request);
 
+    /**
+     * Handle the initialize share group state call.
+     * @param context - represents the incoming initialize share group request context
+     * @param request - actual RPC request object
+     * @return completable future representing initialize share group RPC response data
+     */
+    CompletableFuture<InitializeShareGroupStateResponseData> initializeState(RequestContext context, InitializeShareGroupStateRequestData request);
+
     /**
      * Called when new coordinator is elected
      * @param partitionIndex - The partition index (internal topic)
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 87071b3f97d..764e008136e 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
+import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
+import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
 import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -32,6 +34,7 @@ import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
 import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
+import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
 import org.apache.kafka.common.requests.RequestContext;
@@ -70,10 +73,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.IntSupplier;
-import java.util.stream.Collectors;
 
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper.handleOperationException;
 
+@SuppressWarnings("ClassDataAbstractionCoupling")
 public class ShareCoordinatorService implements ShareCoordinator {
     private final ShareCoordinatorConfig config;
     private final Logger log;
@@ -682,11 +685,11 @@ public class ShareCoordinatorService implements ShareCoordinator {
                 (topicId, topicEntry) -> {
                     List<ReadShareGroupStateSummaryResponseData.PartitionResult> partitionResults = new ArrayList<>(topicEntry.size());
                     topicEntry.forEach(
-                        (partitionId, responseFuture) -> {
+                        (partitionId, responseFut) -> {
                             // ResponseFut would already be completed by now since we have used
                             // CompletableFuture::allOf to create a combined future from the future map.
                             partitionResults.add(
-                                responseFuture.getNow(null).results().get(0).partitions().get(0)
+                                responseFut.getNow(null).results().get(0).partitions().get(0)
                             );
                         }
                     );
@@ -792,11 +795,11 @@ public class ShareCoordinatorService implements ShareCoordinator {
                 (topicId, topicEntry) -> {
                     List<DeleteShareGroupStateResponseData.PartitionResult> partitionResults = new ArrayList<>(topicEntry.size());
                     topicEntry.forEach(
-                        (partitionId, responseFuture) -> {
+                        (partitionId, responseFut) -> {
                             // ResponseFut would already be completed by now since we have used
                             // CompletableFuture::allOf to create a combined future from the future map.
                             partitionResults.add(
-                                responseFuture.getNow(null).results().get(0).partitions().get(0)
+                                responseFut.getNow(null).results().get(0).partitions().get(0)
                             );
                         }
                     );
@@ -808,6 +811,106 @@ public class ShareCoordinatorService implements ShareCoordinator {
         });
     }
 
+    @Override
+    public CompletableFuture<InitializeShareGroupStateResponseData> initializeState(RequestContext context, InitializeShareGroupStateRequestData request) {
+        // Send an error response if the coordinator is not active.
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(
+                generateErrorInitStateResponse(
+                    request,
+                    Errors.COORDINATOR_NOT_AVAILABLE,
+                    "Share coordinator is not available."
+                )
+            );
+        }
+
+        String groupId = request.groupId();
+        // Send an empty response if groupId is invalid.
+        if (isGroupIdEmpty(groupId)) {
+            log.error("Group id must be specified and non-empty: {}", request);
+            return CompletableFuture.completedFuture(
+                new InitializeShareGroupStateResponseData()
+            );
+        }
+
+        // Send an empty response if topic data is empty.
+        if (isEmpty(request.topics())) {
+            log.error("Topic Data is empty: {}", request);
+            return CompletableFuture.completedFuture(
+                new InitializeShareGroupStateResponseData()
+            );
+        }
+
+        // A map to store the futures for each topicId and partition.
+        Map<Uuid, Map<Integer, CompletableFuture<InitializeShareGroupStateResponseData>>> futureMap = new HashMap<>();
+
+        // The request received here could have multiple keys of structure group:topic:partition. However,
+        // the initializeState method in ShareCoordinatorShard expects a single key in the request. Hence, we will
+        // be looping over the keys below and constructing new InitializeShareGroupStateRequestData objects to pass
+        // onto the shard method.
+
+        for (InitializeShareGroupStateRequestData.InitializeStateData topicData : request.topics()) {
+            Uuid topicId = topicData.topicId();
+            for (InitializeShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) {
+                SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition());
+
+                InitializeShareGroupStateRequestData requestForCurrentPartition = new InitializeShareGroupStateRequestData()
+                    .setGroupId(groupId)
+                    .setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(partitionData))));
+
+                CompletableFuture<InitializeShareGroupStateResponseData> initializeFuture = runtime.scheduleWriteOperation(
+                    "initialize-share-group-state",
+                    topicPartitionFor(coordinatorKey),
+                    Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+                    coordinator -> coordinator.initializeState(requestForCurrentPartition)
+                ).exceptionally(initializeException ->
+                    handleOperationException(
+                        "initialize-share-group-state",
+                        request,
+                        initializeException,
+                        (error, message) -> InitializeShareGroupStateResponse.toErrorResponseData(
+                            topicData.topicId(),
+                            partitionData.partition(),
+                            error,
+                            "Unable to initialize share group state: " + initializeException.getMessage()
+                        ),
+                        log
+                    ));
+
+                futureMap.computeIfAbsent(topicId, k -> new HashMap<>())
+                    .put(partitionData.partition(), initializeFuture);
+            }
+        }
+
+        // Combine all futures into a single CompletableFuture<Void>.
+        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futureMap.values().stream()
+            .flatMap(map -> map.values().stream()).toArray(CompletableFuture[]::new));
+
+        // Transform the combined CompletableFuture<Void> into CompletableFuture<InitializeShareGroupStateResponseData>.
+        return combinedFuture.thenApply(v -> {
+            List<InitializeShareGroupStateResponseData.InitializeStateResult> initializeStateResult = new ArrayList<>(futureMap.size());
+            futureMap.forEach(
+                (topicId, topicEntry) -> {
+                    List<InitializeShareGroupStateResponseData.PartitionResult> partitionResults = new ArrayList<>(topicEntry.size());
+                    topicEntry.forEach(
+                        (partitionId, responseFut) -> {
+                            // ResponseFut would already be completed by now since we have used
+                            // CompletableFuture::allOf to create a combined future from the future map.
+                            partitionResults.add(
+                                responseFut.getNow(null).results().get(0).partitions().get(0)
+                            );
+                        }
+                    );
+                    initializeStateResult.add(InitializeShareGroupStateResponse.toResponseInitializeStateResult(topicId, partitionResults));
+                }
+            );
+            return new InitializeShareGroupStateResponseData()
+                .setResults(initializeStateResult);
+        });
+    }
+
     private ReadShareGroupStateResponseData generateErrorReadStateResponse(
         ReadShareGroupStateRequestData request,
         Errors error,
@@ -820,9 +923,9 @@ public class ShareCoordinatorService implements ShareCoordinator {
                 resultData.setPartitions(topicData.partitions().stream()
                     .map(partitionData -> ReadShareGroupStateResponse.toErrorResponsePartitionResult(
                         partitionData.partition(), error, errorMessage
-                    )).collect(Collectors.toList()));
+                    )).toList());
                 return resultData;
-            }).collect(Collectors.toList()));
+            }).toList());
     }
 
     private ReadShareGroupStateSummaryResponseData generateErrorReadStateSummaryResponse(
@@ -837,9 +940,9 @@ public class ShareCoordinatorService implements ShareCoordinator {
                 resultData.setPartitions(topicData.partitions().stream()
                     .map(partitionData -> ReadShareGroupStateSummaryResponse.toErrorResponsePartitionResult(
                         partitionData.partition(), error, errorMessage
-                    )).collect(Collectors.toList()));
+                    )).toList());
                 return resultData;
-            }).collect(Collectors.toList()));
+            }).toList());
     }
 
     private WriteShareGroupStateResponseData generateErrorWriteStateResponse(
@@ -855,9 +958,9 @@ public class ShareCoordinatorService implements ShareCoordinator {
                     resultData.setPartitions(topicData.partitions().stream()
                         .map(partitionData -> WriteShareGroupStateResponse.toErrorResponsePartitionResult(
                             partitionData.partition(), error, errorMessage
-                        )).collect(Collectors.toList()));
+                        )).toList());
                     return resultData;
-                }).collect(Collectors.toList()));
+                }).toList());
     }
 
     private DeleteShareGroupStateResponseData generateErrorDeleteStateResponse(
@@ -872,9 +975,26 @@ public class ShareCoordinatorService implements ShareCoordinator {
                 resultData.setPartitions(topicData.partitions().stream()
                     .map(partitionData -> DeleteShareGroupStateResponse.toErrorResponsePartitionResult(
                         partitionData.partition(), error, errorMessage
-                    )).collect(Collectors.toList()));
+                    )).toList());
+                return resultData;
+            }).toList());
+    }
+
+    private InitializeShareGroupStateResponseData generateErrorInitStateResponse(
+        InitializeShareGroupStateRequestData request,
+        Errors error,
+        String errorMessage
+    ) {
+        return new InitializeShareGroupStateResponseData().setResults(request.topics().stream()
+            .map(topicData -> {
+                InitializeShareGroupStateResponseData.InitializeStateResult resultData = new InitializeShareGroupStateResponseData.InitializeStateResult();
+                resultData.setTopicId(topicData.topicId());
+                resultData.setPartitions(topicData.partitions().stream()
+                    .map(partitionData -> InitializeShareGroupStateResponse.toErrorResponsePartitionResult(
+                        partitionData.partition(), error, errorMessage
+                    )).toList());
                 return resultData;
-            }).collect(Collectors.toList()));
+            }).toList());
     }
 
     private static boolean isGroupIdEmpty(String groupId) {
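
The fan-out/fan-in pattern used by initializeState above, as a self-contained sketch (editorial illustration; the string values and FanOutSketch class are simplified placeholders, not part of the patch):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public class FanOutSketch {
        public static void main(String[] args) {
            // One future per (topic, partition) key, as in the service code.
            List<CompletableFuture<String>> futures = List.of(
                CompletableFuture.supplyAsync(() -> "partition-0"),
                CompletableFuture.supplyAsync(() -> "partition-1"));
            // allOf completes only when every per-partition future has completed.
            CompletableFuture<Void> combined =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            // After combined completes, getNow(null) can safely read each result.
            combined.thenRun(() ->
                futures.forEach(f -> System.out.println(f.getNow(null)))).join();
        }
    }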
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index a71a911907a..1022a36fb65 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
+import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
+import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
 import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -31,6 +33,7 @@ import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
+import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
 import org.apache.kafka.common.requests.TransactionResult;
@@ -66,7 +69,6 @@ import org.slf4j.Logger;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord> {
     private final Logger log;
@@ -317,16 +319,12 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
 
         CoordinatorRecord record = generateShareStateRecord(partitionData, key);
         // build successful response if record is correctly created
-        WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData()
-            .setResults(
-                Collections.singletonList(
-                    WriteShareGroupStateResponse.toResponseWriteStateResult(key.topicId(),
-                        Collections.singletonList(
-                            WriteShareGroupStateResponse.toResponsePartitionResult(
-                                key.partition()
-                            ))
-                    ))
-            );
+        WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData().setResults(
+            List.of(WriteShareGroupStateResponse.toResponseWriteStateResult(key.topicId(),
+                List.of(WriteShareGroupStateResponse.toResponsePartitionResult(
+                    key.partition()))
+            ))
+        );
 
         return new CoordinatorResult<>(Collections.singletonList(record), responseData);
     }
@@ -346,7 +344,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
         // Only one key will be there in the request by design.
         Optional<ReadShareGroupStateResponseData> error = maybeGetReadStateError(request);
         if (error.isPresent()) {
-            return new CoordinatorResult<>(Collections.emptyList(), error.get());
+            return new CoordinatorResult<>(List.of(), error.get());
         }
 
         ReadShareGroupStateRequestData.ReadStateData topicData = request.topics().get(0);
@@ -366,7 +364,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
                 partitionId,
                 PartitionFactory.UNINITIALIZED_START_OFFSET,
                 PartitionFactory.DEFAULT_STATE_EPOCH,
-                Collections.emptyList()
+                List.of()
             );
         } else {
             // Leader epoch update might be needed
@@ -379,7 +377,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
                             .setLastOffset(stateBatch.lastOffset())
                             .setDeliveryState(stateBatch.deliveryState())
                             .setDeliveryCount(stateBatch.deliveryCount())
-                    ).collect(Collectors.toList()) : Collections.emptyList();
+                    ).toList() : List.of();
 
             responseData = ReadShareGroupStateResponse.toResponseData(
                 topicId,
@@ -393,7 +391,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
         // Optimization in case leaderEpoch update is not required.
         if (leaderEpoch == -1 ||
             (leaderEpochMap.get(key) != null && leaderEpochMap.get(key) == leaderEpoch)) {
-            return new CoordinatorResult<>(Collections.emptyList(), responseData);
+            return new CoordinatorResult<>(List.of(), responseData);
         }
 
         // It is OK to info log this since reaching this codepoint should be quite infrequent.
@@ -406,7 +404,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
         WriteShareGroupStateRequestData.PartitionData writePartitionData = new WriteShareGroupStateRequestData.PartitionData()
             .setPartition(partitionId)
             .setLeaderEpoch(leaderEpoch)
-            .setStateBatches(Collections.emptyList())
+            .setStateBatches(List.of())
             .setStartOffset(responseData.results().get(0).partitions().get(0).startOffset())
             .setStateEpoch(responseData.results().get(0).partitions().get(0).stateEpoch());
 
@@ -471,7 +469,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
             }
         }
 
-        return new CoordinatorResult<>(Collections.emptyList(), responseData);
+        return new CoordinatorResult<>(List.of(), responseData);
     }
 
     /**
@@ -482,7 +480,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
      */
     public CoordinatorResult<Optional<Long>, CoordinatorRecord> lastRedundantOffset() {
         return new CoordinatorResult<>(
-            Collections.emptyList(),
+            List.of(),
             this.offsetsManager.lastRedundantOffset()
         );
     }
@@ -494,7 +492,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
      * the request data which covers only key i.e. group1:topic1:partition1. The implementation
      * below was done keeping this in mind.
      *
-     * @param request - ReadShareGroupStateSummaryRequestData for a single key
+     * @param request - DeleteShareGroupStateRequestData for a single key
      * @return CoordinatorResult(records, response)
      */
 
@@ -514,22 +512,53 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
 
         CoordinatorRecord record = generateTombstoneRecord(key);
         // build successful response if record is correctly created
-        DeleteShareGroupStateResponseData responseData = new DeleteShareGroupStateResponseData()
-            .setResults(
-                List.of(
-                    DeleteShareGroupStateResponse.toResponseDeleteStateResult(key.topicId(),
-                        List.of(
-                            DeleteShareGroupStateResponse.toResponsePartitionResult(
-                                key.partition()
-                            )
-                        )
-                    )
-                )
-            );
+        DeleteShareGroupStateResponseData responseData = new DeleteShareGroupStateResponseData().setResults(
+            List.of(DeleteShareGroupStateResponse.toResponseDeleteStateResult(key.topicId(),
+                List.of(DeleteShareGroupStateResponse.toResponsePartitionResult(
+                    key.partition()))
+            ))
+        );
 
         return new CoordinatorResult<>(Collections.singletonList(record), responseData);
     }
 
+    /**
+     * This method writes share snapshot records corresponding to the requested topic partitions.
+     * <p>
+     * This method as called by the ShareCoordinatorService will be provided with
+     * the request data which covers only one key i.e. group1:topic1:partition1. The implementation
+     * below was done keeping this in mind.
+     *
+     * @param request - InitializeShareGroupStateRequestData for a single key
+     * @return CoordinatorResult(records, response)
+     */
+
+    public CoordinatorResult<InitializeShareGroupStateResponseData, CoordinatorRecord> initializeState(
+        InitializeShareGroupStateRequestData request
+    ) {
+        // Records to write (with both key and value of snapshot type), response to caller.
+        // Only one key will be there in the request by design.
+        Optional<CoordinatorResult<InitializeShareGroupStateResponseData, CoordinatorRecord>> error = maybeGetInitializeStateError(request);
+        if (error.isPresent()) {
+            return error.get();
+        }
+
+        InitializeShareGroupStateRequestData.InitializeStateData topicData = request.topics().get(0);
+        InitializeShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0);
+        SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicData.topicId(), partitionData.partition());
+
+        CoordinatorRecord record = generateInitializeStateRecord(partitionData, key);
+        // build successful response if record is correctly created
+        InitializeShareGroupStateResponseData responseData = new InitializeShareGroupStateResponseData().setResults(
+            List.of(InitializeShareGroupStateResponse.toResponseInitializeStateResult(key.topicId(),
+                List.of(InitializeShareGroupStateResponse.toResponsePartitionResult(
+                    key.partition()))
+            ))
+        );
+
+        return new CoordinatorResult<>(List.of(record), responseData);
+    }
+
     /**
      * Util method to generate a ShareSnapshot or ShareUpdate type record for a key, based on various conditions.
      * <p>
@@ -555,7 +584,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
                     .setStartOffset(partitionData.startOffset())
                     .setLeaderEpoch(partitionData.leaderEpoch())
                     .setStateEpoch(partitionData.stateEpoch())
-                    .setStateBatches(mergeBatches(Collections.emptyList(), partitionData))
+                    .setStateBatches(mergeBatches(List.of(), partitionData))
                     .build());
         } else if (snapshotUpdateCount.getOrDefault(key, 0) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
             ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true
@@ -587,7 +616,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
                     .setSnapshotEpoch(currentState.snapshotEpoch()) // Use same snapshotEpoch as last share snapshot.
                     .setStartOffset(partitionData.startOffset())
                     .setLeaderEpoch(partitionData.leaderEpoch())
-                    .setStateBatches(mergeBatches(Collections.emptyList(), partitionData))
+                    .setStateBatches(mergeBatches(List.of(), partitionData))
                     .build());
         }
     }
@@ -600,6 +629,24 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
         );
     }
 
+    private CoordinatorRecord generateInitializeStateRecord(
+        InitializeShareGroupStateRequestData.PartitionData partitionData,
+        SharePartitionKey key
+    ) {
+        // We need to create a new share snapshot here, with
+        // appropriate state information. We will not be merging
+        // state here with previous snapshots as init state implies a fresh start.
+
+        int snapshotEpoch = shareStateMap.containsKey(key) ? shareStateMap.get(key).snapshotEpoch() + 1 : 0;
+        return ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            key.groupId(),
+            key.topicId(),
+            key.partition(),
+            ShareGroupOffset.fromRequest(partitionData, snapshotEpoch)
+        );
+    }
+
     private List<PersisterStateBatch> mergeBatches(
         List<PersisterStateBatch> soFar,
         WriteShareGroupStateRequestData.PartitionData partitionData) {
@@ -609,15 +656,13 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
     private List<PersisterStateBatch> mergeBatches(
         List<PersisterStateBatch> soFar,
         WriteShareGroupStateRequestData.PartitionData partitionData,
-        long startOffset) {
-        return new PersisterStateBatchCombiner(
-            soFar,
-            partitionData.stateBatches().stream()
-                .map(PersisterStateBatch::from)
-                .collect(Collectors.toList()),
+        long startOffset
+    ) {
+        return new PersisterStateBatchCombiner(soFar, partitionData.stateBatches().stream()
+            .map(PersisterStateBatch::from)
+            .toList(),
             startOffset
-        )
-            .combineStateBatches();
+        ).combineStateBatches();
     }
 
     private Optional<CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord>> maybeGetWriteStateError(
@@ -631,30 +676,30 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
         int partitionId = partitionData.partition();
 
         if (topicId == null) {
-            return Optional.of(getWriteErrorResponse(Errors.INVALID_REQUEST, NULL_TOPIC_ID, null, partitionId));
+            return Optional.of(getWriteErrorCoordinatorResult(Errors.INVALID_REQUEST, NULL_TOPIC_ID, null, partitionId));
         }
 
         if (partitionId < 0) {
-            return Optional.of(getWriteErrorResponse(Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID, topicId, partitionId));
+            return Optional.of(getWriteErrorCoordinatorResult(Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID, topicId, partitionId));
         }
 
         SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId, topicId, partitionId);
         if (partitionData.leaderEpoch() != -1 && leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) > partitionData.leaderEpoch()) {
             log.error("Request leader epoch smaller than last recorded.");
-            return Optional.of(getWriteErrorResponse(Errors.FENCED_LEADER_EPOCH, null, topicId, partitionId));
+            return Optional.of(getWriteErrorCoordinatorResult(Errors.FENCED_LEADER_EPOCH, null, topicId, partitionId));
         }
         if (partitionData.stateEpoch() != -1 && stateEpochMap.containsKey(mapKey) && stateEpochMap.get(mapKey) > partitionData.stateEpoch()) {
             log.error("Request state epoch smaller than last recorded.");
-            return Optional.of(getWriteErrorResponse(Errors.FENCED_STATE_EPOCH, null, topicId, partitionId));
+            return Optional.of(getWriteErrorCoordinatorResult(Errors.FENCED_STATE_EPOCH, null, topicId, partitionId));
         }
         if (metadataImage == null) {
             log.error("Metadata image is null");
-            return Optional.of(getWriteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
+            return Optional.of(getWriteErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
         }
         if (metadataImage.topics().getTopic(topicId) == null ||
             metadataImage.topics().getPartition(topicId, partitionId) == null) {
             log.error("Topic/TopicPartition not found in metadata image.");
-            return Optional.of(getWriteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
+            return Optional.of(getWriteErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
         }
 
         return Optional.empty();
@@ -743,28 +788,65 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
         int partitionId = partitionData.partition();
 
         if (topicId == null) {
-            return Optional.of(getDeleteErrorResponse(Errors.INVALID_REQUEST, NULL_TOPIC_ID, null, partitionId));
+            return Optional.of(getDeleteErrorCoordinatorResult(Errors.INVALID_REQUEST, NULL_TOPIC_ID, null, partitionId));
+        }
+
+        if (partitionId < 0) {
+            return Optional.of(getDeleteErrorCoordinatorResult(Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID, topicId, partitionId));
+        }
+
+        if (metadataImage == null) {
+            log.error("Metadata image is null");
+            return Optional.of(getDeleteErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
+        }
+
+        if (metadataImage.topics().getTopic(topicId) == null ||
+            metadataImage.topics().getPartition(topicId, partitionId) == null) {
+            log.error("Topic/TopicPartition not found in metadata image.");
+            return Optional.of(getDeleteErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
+        }
+
+        return Optional.empty();
+    }
+
+    private Optional<CoordinatorResult<InitializeShareGroupStateResponseData, CoordinatorRecord>> maybeGetInitializeStateError(
+        InitializeShareGroupStateRequestData request
+    ) {
+        InitializeShareGroupStateRequestData.InitializeStateData topicData = request.topics().get(0);
+        InitializeShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0);
+
+        Uuid topicId = topicData.topicId();
+        int partitionId = partitionData.partition();
+
+        if (topicId == null) {
+            return Optional.of(getInitializeErrorCoordinatorResult(Errors.INVALID_REQUEST, NULL_TOPIC_ID, null, partitionId));
         }
 
         if (partitionId < 0) {
-            return Optional.of(getDeleteErrorResponse(Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID, topicId, partitionId));
+            return Optional.of(getInitializeErrorCoordinatorResult(Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID, topicId, partitionId));
+        }
+
+        SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicId, partitionId);
+        if (partitionData.stateEpoch() != -1 && stateEpochMap.containsKey(key) && stateEpochMap.get(key) > partitionData.stateEpoch()) {
+            log.error("Initialize request state epoch smaller than last recorded.");
+            return Optional.of(getInitializeErrorCoordinatorResult(Errors.FENCED_STATE_EPOCH, Errors.FENCED_STATE_EPOCH.exception(), topicId, partitionId));
         }
 
         if (metadataImage == null) {
             log.error("Metadata image is null");
-            return Optional.of(getDeleteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
+            return Optional.of(getInitializeErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
         }
 
         if (metadataImage.topics().getTopic(topicId) == null ||
             metadataImage.topics().getPartition(topicId, partitionId) == null) {
             log.error("Topic/TopicPartition not found in metadata image.");
-            return Optional.of(getDeleteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
+            return Optional.of(getInitializeErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
         }
 
         return Optional.empty();
     }
 
-    private CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> getWriteErrorResponse(
+    private CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> getWriteErrorCoordinatorResult(
         Errors error,
         Exception exception,
         Uuid topicId,
@@ -772,10 +854,10 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
     ) {
         String message = exception == null ? error.message() : exception.getMessage();
         WriteShareGroupStateResponseData responseData = WriteShareGroupStateResponse.toErrorResponseData(topicId, partitionId, error, message);
-        return new CoordinatorResult<>(Collections.emptyList(), responseData);
+        return new CoordinatorResult<>(List.of(), responseData);
     }
 
-    private CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> getDeleteErrorResponse(
+    private CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> getDeleteErrorCoordinatorResult(
         Errors error,
         Exception exception,
         Uuid topicId,
@@ -783,7 +865,18 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
     ) {
         String message = exception == null ? error.message() : exception.getMessage();
         DeleteShareGroupStateResponseData responseData = DeleteShareGroupStateResponse.toErrorResponseData(topicId, partitionId, error, message);
-        return new CoordinatorResult<>(Collections.emptyList(), responseData);
+        return new CoordinatorResult<>(List.of(), responseData);
+    }
+
+    private CoordinatorResult<InitializeShareGroupStateResponseData, CoordinatorRecord> getInitializeErrorCoordinatorResult(
+        Errors error,
+        Exception exception,
+        Uuid topicId,
+        int partitionId
+    ) {
+        String message = exception == null ? error.message() : exception.getMessage();
+        InitializeShareGroupStateResponseData responseData = InitializeShareGroupStateResponse.toErrorResponseData(topicId, partitionId, error, message);
+        return new CoordinatorResult<>(List.of(), responseData);
     }
 
     // Visible for testing
@@ -820,7 +913,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
             .setLeaderEpoch(newLeaderEpoch)
             .setStateBatches(new PersisterStateBatchCombiner(currentBatches, newData.stateBatches().stream()
                 .map(ShareCoordinatorShard::toPersisterStateBatch)
-                .collect(Collectors.toList()), newStartOffset)
+                .toList(), newStartOffset)
                 .combineStateBatches())
             .build();
     }
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
index d7868f0cc63..2d659607a1d 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.coordinator.share;
 
+import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
 import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
 import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
 import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
@@ -94,13 +95,29 @@ public class ShareGroupOffset {
     }
 
     public static ShareGroupOffset fromRequest(WriteShareGroupStateRequestData.PartitionData data, int snapshotEpoch) {
-        return new ShareGroupOffset(snapshotEpoch,
+        return new ShareGroupOffset(
+            snapshotEpoch,
             data.stateEpoch(),
             data.leaderEpoch(),
             data.startOffset(),
             data.stateBatches().stream()
                 .map(PersisterStateBatch::from)
-                .collect(Collectors.toList()));
+                .toList()
+        );
+    }
+
+    public static ShareGroupOffset 
fromRequest(InitializeShareGroupStateRequestData.PartitionData data) {
+        return fromRequest(data, 0);
+    }
+
+    public static ShareGroupOffset 
fromRequest(InitializeShareGroupStateRequestData.PartitionData data, int 
snapshotEpoch) {
+        return new ShareGroupOffset(
+            snapshotEpoch,
+            data.stateEpoch(),
+            -1,
+            data.startOffset(),
+            List.of()
+        );
     }
 
     public LinkedHashSet<PersisterStateBatch> stateBatchAsSet() {
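
Note: the initialize overloads above deliberately start a share partition
from a clean slate: the leader epoch is pinned to -1 and the state batch
list is empty, so only the state epoch and start offset supplied in the
request survive. A small usage sketch under those semantics (the accessor
names in the comments are assumptions, not shown in this diff):

    // Sketch: the single-argument overload defaults the snapshot epoch to 0.
    InitializeShareGroupStateRequestData.PartitionData data =
        new InitializeShareGroupStateRequestData.PartitionData()
            .setPartition(0)
            .setStartOffset(10)
            .setStateEpoch(5);

    ShareGroupOffset offset = ShareGroupOffset.fromRequest(data);
    // State epoch 5 and start offset 10 are carried over; the leader epoch
    // is -1 and there are no persister state batches yet.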
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 8738db4dd9a..399643e32a9 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -26,6 +26,8 @@ import 
org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
+import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
+import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
 import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -51,7 +53,6 @@ import org.apache.kafka.server.util.timer.Timer;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -133,7 +134,7 @@ class ShareCoordinatorServiceTest {
 
         WriteShareGroupStateRequestData request = new 
WriteShareGroupStateRequestData()
             .setGroupId(groupId)
-            .setTopics(Arrays.asList(
+            .setTopics(List.of(
                     new WriteShareGroupStateRequestData.WriteStateData()
                         .setTopicId(topicId1)
                         .setPartitions(List.of(
@@ -199,7 +200,7 @@ class ShareCoordinatorServiceTest {
 
         HashSet<WriteShareGroupStateResponseData.WriteStateResult> result = 
new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
 
-        HashSet<WriteShareGroupStateResponseData.WriteStateResult> 
expectedResult = new HashSet<>(Arrays.asList(
+        HashSet<WriteShareGroupStateResponseData.WriteStateResult> 
expectedResult = new HashSet<>(List.of(
             new WriteShareGroupStateResponseData.WriteStateResult()
                 .setTopicId(topicId2)
                 .setPartitions(List.of(new 
WriteShareGroupStateResponseData.PartitionResult()
@@ -210,7 +211,7 @@ class ShareCoordinatorServiceTest {
                     .setPartition(partition1)))));
         assertEquals(expectedResult, result);
         verify(time, times(2)).hiResClockMs();
-        Set<MetricName> expectedMetrics = new HashSet<>(Arrays.asList(
+        Set<MetricName> expectedMetrics = new HashSet<>(List.of(
             metrics.metricName("write-latency-avg", 
ShareCoordinatorMetrics.METRICS_GROUP),
             metrics.metricName("write-latency-max", 
ShareCoordinatorMetrics.METRICS_GROUP),
             metrics.metricName("write-rate", 
ShareCoordinatorMetrics.METRICS_GROUP),
@@ -243,7 +244,7 @@ class ShareCoordinatorServiceTest {
 
         ReadShareGroupStateRequestData request = new 
ReadShareGroupStateRequestData()
             .setGroupId(groupId)
-            .setTopics(Arrays.asList(
+            .setTopics(List.of(
                     new ReadShareGroupStateRequestData.ReadStateData()
                         .setTopicId(topicId1)
                         .setPartitions(List.of(
@@ -283,7 +284,7 @@ class ShareCoordinatorServiceTest {
                 .setErrorCode(Errors.NONE.code())
                 .setStateEpoch(1)
                 .setStartOffset(0)
-                .setStateBatches(Arrays.asList(
+                .setStateBatches(List.of(
                     new ReadShareGroupStateResponseData.StateBatch()
                         .setFirstOffset(0)
                         .setLastOffset(10)
@@ -315,7 +316,7 @@ class ShareCoordinatorServiceTest {
 
         HashSet<ReadShareGroupStateResponseData.ReadStateResult> result = new 
HashSet<>(future.get(5, TimeUnit.SECONDS).results());
 
-        HashSet<ReadShareGroupStateResponseData.ReadStateResult> 
expectedResult = new HashSet<>(Arrays.asList(
+        HashSet<ReadShareGroupStateResponseData.ReadStateResult> 
expectedResult = new HashSet<>(List.of(
             topicData1,
             topicData2));
         assertEquals(expectedResult, result);
@@ -345,23 +346,20 @@ class ShareCoordinatorServiceTest {
 
         ReadShareGroupStateSummaryRequestData request = new 
ReadShareGroupStateSummaryRequestData()
             .setGroupId(groupId)
-            .setTopics(Arrays.asList(
-                    new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
-                        .setTopicId(topicId1)
-                        .setPartitions(List.of(
-                            new 
ReadShareGroupStateSummaryRequestData.PartitionData()
-                                .setPartition(partition1)
-                                .setLeaderEpoch(1)
-                        )),
-                    new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
-                        .setTopicId(topicId2)
-                        .setPartitions(List.of(
-                            new 
ReadShareGroupStateSummaryRequestData.PartitionData()
-                                .setPartition(partition2)
-                                .setLeaderEpoch(1)
-                        ))
-                )
-            );
+            .setTopics(List.of(
+                new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                    .setTopicId(topicId1)
+                    .setPartitions(List.of(
+                        new 
ReadShareGroupStateSummaryRequestData.PartitionData()
+                            .setPartition(partition1)
+                            .setLeaderEpoch(1))),
+                new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                    .setTopicId(topicId2)
+                    .setPartitions(List.of(
+                        new 
ReadShareGroupStateSummaryRequestData.PartitionData()
+                            .setPartition(partition2)
+                            .setLeaderEpoch(1)))
+            ));
 
         ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult 
topicData1 = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
             .setTopicId(topicId1)
@@ -385,12 +383,12 @@ class ShareCoordinatorServiceTest {
             eq("read-share-group-state-summary"),
             eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
             any(),
-            any()
-        ))
-            .thenReturn(CompletableFuture.completedFuture(new 
ReadShareGroupStateSummaryResponseData()
-                .setResults(List.of(topicData1))))
-            .thenReturn(CompletableFuture.completedFuture(new 
ReadShareGroupStateSummaryResponseData()
-                .setResults(List.of(topicData2))));
+            any())
+        ).thenReturn(CompletableFuture.completedFuture(new 
ReadShareGroupStateSummaryResponseData()
+            .setResults(List.of(topicData1)))
+        ).thenReturn(CompletableFuture.completedFuture(new 
ReadShareGroupStateSummaryResponseData()
+            .setResults(List.of(topicData2)))
+        );
 
         CompletableFuture<ReadShareGroupStateSummaryResponseData> future = 
service.readStateSummary(
             requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
@@ -399,7 +397,7 @@ class ShareCoordinatorServiceTest {
 
         HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> 
result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
 
-        HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> 
expectedResult = new HashSet<>(Arrays.asList(
+        HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> 
expectedResult = new HashSet<>(List.of(
             topicData1,
             topicData2));
         assertEquals(expectedResult, result);
@@ -432,21 +430,18 @@ class ShareCoordinatorServiceTest {
 
         DeleteShareGroupStateRequestData request = new 
DeleteShareGroupStateRequestData()
             .setGroupId(groupId)
-            .setTopics(Arrays.asList(
-                    new DeleteShareGroupStateRequestData.DeleteStateData()
-                        .setTopicId(topicId1)
-                        .setPartitions(List.of(
-                            new 
DeleteShareGroupStateRequestData.PartitionData()
-                                .setPartition(partition1)
-                        )),
-                    new DeleteShareGroupStateRequestData.DeleteStateData()
-                        .setTopicId(topicId2)
-                        .setPartitions(List.of(
-                            new 
DeleteShareGroupStateRequestData.PartitionData()
-                                .setPartition(partition2)
-                        ))
-                )
-            );
+            .setTopics(List.of(
+                new DeleteShareGroupStateRequestData.DeleteStateData()
+                    .setTopicId(topicId1)
+                    .setPartitions(List.of(
+                        new DeleteShareGroupStateRequestData.PartitionData()
+                            .setPartition(partition1))),
+                new DeleteShareGroupStateRequestData.DeleteStateData()
+                    .setTopicId(topicId2)
+                    .setPartitions(List.of(
+                        new DeleteShareGroupStateRequestData.PartitionData()
+                            .setPartition(partition2)))
+            ));
 
         DeleteShareGroupStateResponseData response1 = new 
DeleteShareGroupStateResponseData()
             .setResults(List.of(
@@ -469,9 +464,7 @@ class ShareCoordinatorServiceTest {
             eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
             eq(Duration.ofMillis(5000)),
             any()
-        ))
-            .thenReturn(CompletableFuture.completedFuture(response1))
-            .thenReturn(CompletableFuture.completedFuture(response2));
+        )).thenReturn(CompletableFuture.completedFuture(response1))
+            .thenReturn(CompletableFuture.completedFuture(response2));
 
         CompletableFuture<DeleteShareGroupStateResponseData> future = 
service.deleteState(
             requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
@@ -480,7 +473,7 @@ class ShareCoordinatorServiceTest {
 
         HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> result = 
new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
 
-        HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> 
expectedResult = new HashSet<>(Arrays.asList(
+        HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> 
expectedResult = new HashSet<>(List.of(
             new DeleteShareGroupStateResponseData.DeleteStateResult()
                 .setTopicId(topicId2)
                 .setPartitions(List.of(new 
DeleteShareGroupStateResponseData.PartitionResult()
@@ -492,6 +485,91 @@ class ShareCoordinatorServiceTest {
         assertEquals(expectedResult, result);
     }
 
+    @Test
+    public void testInitializeStateSuccess() throws ExecutionException, 
InterruptedException, TimeoutException {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Metrics metrics = new Metrics();
+        ShareCoordinatorMetrics coordinatorMetrics = new 
ShareCoordinatorMetrics(metrics);
+        Time time = mock(Time.class);
+        ShareCoordinatorService service = new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.testConfig(),
+            runtime,
+            coordinatorMetrics,
+            time,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
+        );
+
+        service.startup(() -> 1);
+
+        String groupId = "group1";
+        Uuid topicId1 = Uuid.randomUuid();
+        int partition1 = 0;
+
+        Uuid topicId2 = Uuid.randomUuid();
+        int partition2 = 1;
+
+        InitializeShareGroupStateRequestData request = new 
InitializeShareGroupStateRequestData()
+            .setGroupId(groupId)
+            .setTopics(List.of(
+                new InitializeShareGroupStateRequestData.InitializeStateData()
+                    .setTopicId(topicId1)
+                    .setPartitions(List.of(
+                        new 
InitializeShareGroupStateRequestData.PartitionData()
+                            .setPartition(partition1)
+                            .setStartOffset(0)
+                            .setStateEpoch(1))),
+                new InitializeShareGroupStateRequestData.InitializeStateData()
+                    .setTopicId(topicId2)
+                    .setPartitions(List.of(
+                        new 
InitializeShareGroupStateRequestData.PartitionData()
+                            .setPartition(partition2)
+                            .setStartOffset(5)
+                            .setStateEpoch(1)))
+            ));
+
+        InitializeShareGroupStateResponseData response1 = new 
InitializeShareGroupStateResponseData().setResults(List.of(
+            new InitializeShareGroupStateResponseData.InitializeStateResult()
+                .setTopicId(topicId1)
+                .setPartitions(List.of(new 
InitializeShareGroupStateResponseData.PartitionResult()
+                    .setPartition(partition1)))
+        ));
+
+        InitializeShareGroupStateResponseData response2 = new 
InitializeShareGroupStateResponseData().setResults(List.of(
+            new InitializeShareGroupStateResponseData.InitializeStateResult()
+                .setTopicId(topicId2)
+                .setPartitions(List.of(new 
InitializeShareGroupStateResponseData.PartitionResult()
+                    .setPartition(partition2)))
+        ));
+
+        when(runtime.scheduleWriteOperation(
+            eq("initialize-share-group-state"),
+            eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
+            eq(Duration.ofMillis(5000)),
+            any()
+        )).thenReturn(CompletableFuture.completedFuture(response1))
+            .thenReturn(CompletableFuture.completedFuture(response2));
+
+        CompletableFuture<InitializeShareGroupStateResponseData> future = 
service.initializeState(
+            requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
+            request
+        );
+
+        HashSet<InitializeShareGroupStateResponseData.InitializeStateResult> 
result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
+
+        HashSet<InitializeShareGroupStateResponseData.InitializeStateResult> 
expectedResult = new HashSet<>(List.of(
+            new InitializeShareGroupStateResponseData.InitializeStateResult()
+                .setTopicId(topicId2)
+                .setPartitions(List.of(new 
InitializeShareGroupStateResponseData.PartitionResult()
+                    .setPartition(partition2))),
+            new InitializeShareGroupStateResponseData.InitializeStateResult()
+                .setTopicId(topicId1)
+                .setPartitions(List.of(new 
InitializeShareGroupStateResponseData.PartitionResult()
+                    .setPartition(partition1)))
+        ));
+        assertEquals(expectedResult, result);
+    }
+
     @Test
     public void testWriteStateValidationsError() throws ExecutionException, 
InterruptedException, TimeoutException {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
@@ -680,6 +758,53 @@ class ShareCoordinatorServiceTest {
         );
     }
 
+    @Test
+    public void testInitializeStateValidationError() throws 
ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        ShareCoordinatorService service = new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.testConfig(),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
+        );
+
+        service.startup(() -> 1);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 0;
+
+        // 1. Empty topicsData
+        assertEquals(new InitializeShareGroupStateResponseData(),
+            service.initializeState(
+                requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
+                new InitializeShareGroupStateRequestData().setGroupId(groupId)
+            ).get(5, TimeUnit.SECONDS)
+        );
+
+        // 2. Empty partitionsData
+        assertEquals(new InitializeShareGroupStateResponseData(),
+            service.initializeState(
+                requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
+                new 
InitializeShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(
+                    new 
InitializeShareGroupStateRequestData.InitializeStateData().setTopicId(topicId)))
+            ).get(5, TimeUnit.SECONDS)
+        );
+
+        // 3. Invalid groupId
+        assertEquals(new InitializeShareGroupStateResponseData(),
+            service.initializeState(
+                requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
+                new 
InitializeShareGroupStateRequestData().setGroupId(null).setTopics(List.of(
+                    new 
InitializeShareGroupStateRequestData.InitializeStateData().setTopicId(topicId).setPartitions(List.of(
+                        new 
InitializeShareGroupStateRequestData.PartitionData().setPartition(partition)))))
+            ).get(5, TimeUnit.SECONDS)
+        );
+    }
+
     @Test
     public void testWriteStateWhenNotStarted() throws ExecutionException, 
InterruptedException, TimeoutException {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
@@ -702,7 +827,7 @@ class ShareCoordinatorServiceTest {
 
         WriteShareGroupStateRequestData request = new 
WriteShareGroupStateRequestData()
             .setGroupId(groupId)
-            .setTopics(Arrays.asList(
+            .setTopics(List.of(
                     new WriteShareGroupStateRequestData.WriteStateData()
                         .setTopicId(topicId1)
                         .setPartitions(List.of(
@@ -743,7 +868,7 @@ class ShareCoordinatorServiceTest {
 
         HashSet<WriteShareGroupStateResponseData.WriteStateResult> result = 
new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
 
-        HashSet<WriteShareGroupStateResponseData.WriteStateResult> 
expectedResult = new HashSet<>(Arrays.asList(
+        HashSet<WriteShareGroupStateResponseData.WriteStateResult> 
expectedResult = new HashSet<>(List.of(
             new WriteShareGroupStateResponseData.WriteStateResult()
                 .setTopicId(topicId2)
                 .setPartitions(List.of(new 
WriteShareGroupStateResponseData.PartitionResult()
@@ -781,7 +906,7 @@ class ShareCoordinatorServiceTest {
 
         ReadShareGroupStateRequestData request = new 
ReadShareGroupStateRequestData()
             .setGroupId(groupId)
-            .setTopics(Arrays.asList(
+            .setTopics(List.of(
                     new ReadShareGroupStateRequestData.ReadStateData()
                         .setTopicId(topicId1)
                         .setPartitions(List.of(
@@ -806,7 +931,7 @@ class ShareCoordinatorServiceTest {
 
         HashSet<ReadShareGroupStateResponseData.ReadStateResult> result = new 
HashSet<>(future.get(5, TimeUnit.SECONDS).results());
 
-        HashSet<ReadShareGroupStateResponseData.ReadStateResult> 
expectedResult = new HashSet<>(Arrays.asList(
+        HashSet<ReadShareGroupStateResponseData.ReadStateResult> 
expectedResult = new HashSet<>(List.of(
             new ReadShareGroupStateResponseData.ReadStateResult()
                 .setTopicId(topicId2)
                 .setPartitions(List.of(new 
ReadShareGroupStateResponseData.PartitionResult()
@@ -844,7 +969,7 @@ class ShareCoordinatorServiceTest {
 
         ReadShareGroupStateSummaryRequestData request = new 
ReadShareGroupStateSummaryRequestData()
             .setGroupId(groupId)
-            .setTopics(Arrays.asList(
+            .setTopics(List.of(
                     new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
                         .setTopicId(topicId1)
                         .setPartitions(List.of(
@@ -869,7 +994,7 @@ class ShareCoordinatorServiceTest {
 
         HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> 
result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
 
-        HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> 
expectedResult = new HashSet<>(Arrays.asList(
+        HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> 
expectedResult = new HashSet<>(List.of(
             new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
                 .setTopicId(topicId2)
                 .setPartitions(List.of(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
@@ -907,7 +1032,7 @@ class ShareCoordinatorServiceTest {
 
         DeleteShareGroupStateRequestData request = new 
DeleteShareGroupStateRequestData()
             .setGroupId(groupId)
-            .setTopics(Arrays.asList(
+            .setTopics(List.of(
                     new DeleteShareGroupStateRequestData.DeleteStateData()
                         .setTopicId(topicId1)
                         .setPartitions(List.of(
@@ -930,7 +1055,7 @@ class ShareCoordinatorServiceTest {
 
         HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> result = 
new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
 
-        HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> 
expectedResult = new HashSet<>(Arrays.asList(
+        HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> 
expectedResult = new HashSet<>(List.of(
             new DeleteShareGroupStateResponseData.DeleteStateResult()
                 .setTopicId(topicId2)
                 .setPartitions(List.of(new 
DeleteShareGroupStateResponseData.PartitionResult()
@@ -946,6 +1071,66 @@ class ShareCoordinatorServiceTest {
         assertEquals(expectedResult, result);
     }
 
+    @Test
+    public void testInitializeStateWhenNotStarted() throws ExecutionException, 
InterruptedException, TimeoutException {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        ShareCoordinatorService service = new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.testConfig(),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
+        );
+
+        String groupId = "group1";
+        Uuid topicId1 = Uuid.randomUuid();
+        int partition1 = 0;
+
+        Uuid topicId2 = Uuid.randomUuid();
+        int partition2 = 1;
+
+        InitializeShareGroupStateRequestData request = new 
InitializeShareGroupStateRequestData()
+            .setGroupId(groupId)
+            .setTopics(List.of(
+                new InitializeShareGroupStateRequestData.InitializeStateData()
+                    .setTopicId(topicId1)
+                    .setPartitions(List.of(
+                        new 
InitializeShareGroupStateRequestData.PartitionData()
+                            .setPartition(partition1)
+                    )),
+                new InitializeShareGroupStateRequestData.InitializeStateData()
+                    .setTopicId(topicId2)
+                    .setPartitions(List.of(
+                        new 
InitializeShareGroupStateRequestData.PartitionData()
+                            .setPartition(partition2)
+                    ))
+            ));
+
+        CompletableFuture<InitializeShareGroupStateResponseData> future = 
service.initializeState(
+            requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
+            request
+        );
+
+        HashSet<InitializeShareGroupStateResponseData.InitializeStateResult> 
result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
+
+        HashSet<InitializeShareGroupStateResponseData.InitializeStateResult> 
expectedResult = new HashSet<>(List.of(
+            new InitializeShareGroupStateResponseData.InitializeStateResult()
+                .setTopicId(topicId2)
+                .setPartitions(List.of(new 
InitializeShareGroupStateResponseData.PartitionResult()
+                    .setPartition(partition2)
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                    .setErrorMessage("Share coordinator is not available."))),
+            new InitializeShareGroupStateResponseData.InitializeStateResult()
+                .setTopicId(topicId1)
+                .setPartitions(List.of(new 
InitializeShareGroupStateResponseData.PartitionResult()
+                    .setPartition(partition1)
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                    .setErrorMessage("Share coordinator is not 
available.")))));
+        assertEquals(expectedResult, result);
+    }
+
     @Test
     public void testWriteFutureReturnsError() throws ExecutionException, 
InterruptedException, TimeoutException {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
@@ -1124,6 +1309,47 @@ class ShareCoordinatorServiceTest {
         );
     }
 
+    @Test
+    public void testInitializeFutureReturnsError() throws ExecutionException, 
InterruptedException, TimeoutException {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        ShareCoordinatorService service = new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.testConfig(),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
+        );
+
+        service.startup(() -> 1);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 0;
+
+        when(runtime.scheduleWriteOperation(any(), any(), any(), any()))
+            .thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
+
+        assertEquals(
+            new InitializeShareGroupStateResponseData().setResults(List.of(new 
InitializeShareGroupStateResponseData.InitializeStateResult()
+                .setTopicId(topicId)
+                .setPartitions(List.of(new 
InitializeShareGroupStateResponseData.PartitionResult()
+                    .setPartition(partition)
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                    .setErrorMessage("Unable to initialize share group state: 
This server does not host this topic-partition.")))
+            )),
+            
service.initializeState(requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
+                new InitializeShareGroupStateRequestData().setGroupId(groupId)
+                    .setTopics(List.of(new 
InitializeShareGroupStateRequestData.InitializeStateData()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(new 
InitializeShareGroupStateRequestData.PartitionData()
+                            .setPartition(partition)
+                        ))
+                    ))
+            ).get(5, TimeUnit.SECONDS)
+        );
+    }
+
     @Test
     public void testTopicPartitionFor() {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
@@ -1676,7 +1902,7 @@ class ShareCoordinatorServiceTest {
     }
 
     private void checkMetrics(Metrics metrics) {
-        Set<MetricName> usualMetrics = new HashSet<>(Arrays.asList(
+        Set<MetricName> usualMetrics = new HashSet<>(List.of(
             metrics.metricName("write-latency-avg", 
ShareCoordinatorMetrics.METRICS_GROUP),
             metrics.metricName("write-latency-max", 
ShareCoordinatorMetrics.METRICS_GROUP),
             metrics.metricName("write-rate", 
ShareCoordinatorMetrics.METRICS_GROUP),
@@ -1687,17 +1913,12 @@ class ShareCoordinatorServiceTest {
     }
 
     private void checkPruneMetric(Metrics metrics, String topic, int 
partition, boolean checkPresence) {
-        boolean isPresent = metrics.metrics().containsKey(
-            metrics.metricName(
-                "last-pruned-offset",
-                ShareCoordinatorMetrics.METRICS_GROUP,
-                "The offset at which the share-group state topic was last 
pruned.",
-                Map.of(
-                    "topic", topic,
-                    "partition", Integer.toString(partition)
-                )
-            )
-        );
+        boolean isPresent = metrics.metrics().containsKey(metrics.metricName(
+            "last-pruned-offset",
+            ShareCoordinatorMetrics.METRICS_GROUP,
+            "The offset at which the share-group state topic was last pruned.",
+            Map.of("topic", topic, "partition", Integer.toString(partition))
+        ));
         assertEquals(checkPresence, isPresent);
     }
 }
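
Note: the validation tests above pin down a contract worth calling out: an
initialize request with no topics, no partitions, or a null group id
completes with an empty InitializeShareGroupStateResponseData rather than
an error. A sketch of the kind of guard that satisfies those tests (the
method itself is hypothetical; only the request/response types come from
this commit):

    // Hypothetical guard mirroring testInitializeStateValidationError:
    // malformed requests complete successfully with an empty response.
    private static boolean isEmptyInitializeRequest(InitializeShareGroupStateRequestData request) {
        return request.groupId() == null
            || request.topics().isEmpty()
            || request.topics().stream().allMatch(t -> t.partitions().isEmpty());
    }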
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index 7ebb5ce3954..ab136f8dc31 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -20,6 +20,8 @@ package org.apache.kafka.coordinator.share;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
+import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
+import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
 import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -29,6 +31,7 @@ import 
org.apache.kafka.common.message.WriteShareGroupStateResponseData;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
+import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
 import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
@@ -1173,6 +1176,184 @@ class ShareCoordinatorShardTest {
         verify(topicsImage, times(1)).getPartition(eq(TOPIC_ID), eq(0));
     }
 
+    @Test
+    public void testInitializeStateSuccess() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+
+        SharePartitionKey shareCoordinatorKey = 
SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+        InitializeShareGroupStateRequestData request = new 
InitializeShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new 
InitializeShareGroupStateRequestData.InitializeStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new 
InitializeShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)
+                    .setStartOffset(10)
+                    .setStateEpoch(5)))
+            ));
+
+        assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+
+        CoordinatorResult<InitializeShareGroupStateResponseData, 
CoordinatorRecord> result = shard.initializeState(request);
+        result.records().forEach(record -> shard.replay(0L, 0L, (short) 0, 
record));
+
+        InitializeShareGroupStateResponseData expectedData = 
InitializeShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        List<CoordinatorRecord> expectedRecords = List.of(
+            ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+                GROUP_ID, TOPIC_ID, PARTITION, 
ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0))
+            ));
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+
+        assertNotNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+    }
+
+    @Test
+    public void testInitializeStateInvalidRequestData() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+
+        // invalid partition
+        int partition = -1;
+
+        InitializeShareGroupStateRequestData request = new 
InitializeShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new 
InitializeShareGroupStateRequestData.InitializeStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new 
InitializeShareGroupStateRequestData.PartitionData()
+                    .setPartition(partition)
+                ))
+            ));
+
+        CoordinatorResult<InitializeShareGroupStateResponseData, 
CoordinatorRecord> result = shard.initializeState(request);
+
+        InitializeShareGroupStateResponseData expectedData = 
InitializeShareGroupStateResponse.toErrorResponseData(
+            TOPIC_ID, partition, Errors.INVALID_REQUEST, 
ShareCoordinatorShard.NEGATIVE_PARTITION_ID.getMessage());
+        List<CoordinatorRecord> expectedRecords = List.of();
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+
+        // invalid state epoch
+        int stateEpoch = 1;
+        partition = 0;
+        shard.replay(0L, 0L, (short) 0, 
ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID, TOPIC_ID, partition, new ShareGroupOffset.Builder()
+                .setStateEpoch(5)
+                .setSnapshotEpoch(0)
+                .setStateBatches(List.of())
+                .build()
+        ));
+
+        request = new InitializeShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new 
InitializeShareGroupStateRequestData.InitializeStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new 
InitializeShareGroupStateRequestData.PartitionData()
+                    .setPartition(partition)
+                ))
+            ));
+
+        result = shard.initializeState(request);
+
+        expectedData = InitializeShareGroupStateResponse.toErrorResponseData(
+            TOPIC_ID, partition, Errors.FENCED_STATE_EPOCH, 
Errors.FENCED_STATE_EPOCH.exception().getMessage());
+        expectedRecords = List.of();
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testInitializeNullMetadataImage() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+        shard.onNewMetadataImage(null, null);
+
+        InitializeShareGroupStateRequestData request = new 
InitializeShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new 
InitializeShareGroupStateRequestData.InitializeStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new 
InitializeShareGroupStateRequestData.PartitionData()
+                    .setPartition(0)
+                ))
+            ));
+
+        CoordinatorResult<InitializeShareGroupStateResponseData, 
CoordinatorRecord> result = shard.initializeState(request);
+
+        InitializeShareGroupStateResponseData expectedData = 
InitializeShareGroupStateResponse.toErrorResponseData(
+            TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, 
Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
+        List<CoordinatorRecord> expectedRecords = List.of();
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testInitializeTopicIdNonExistentInMetadataImage() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+        MetadataImage image = mock(MetadataImage.class);
+        shard.onNewMetadataImage(image, null);
+
+        InitializeShareGroupStateRequestData request = new 
InitializeShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new 
InitializeShareGroupStateRequestData.InitializeStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new 
InitializeShareGroupStateRequestData.PartitionData()
+                    .setPartition(0)
+                ))
+            ));
+
+        // topic id not found in cache
+        TopicsImage topicsImage = mock(TopicsImage.class);
+        when(topicsImage.getTopic(eq(TOPIC_ID))).thenReturn(null);
+        when(image.topics()).thenReturn(topicsImage);
+        CoordinatorResult<InitializeShareGroupStateResponseData, 
CoordinatorRecord> result = shard.initializeState(request);
+
+        InitializeShareGroupStateResponseData expectedData = 
InitializeShareGroupStateResponse.toErrorResponseData(
+            TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, 
Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
+        List<CoordinatorRecord> expectedRecords = List.of();
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+        verify(topicsImage, times(1)).getTopic(eq(TOPIC_ID));
+    }
+
+    @Test
+    public void testInitializePartitionIdNonExistentInMetadataImage() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+        MetadataImage image = mock(MetadataImage.class);
+        shard.onNewMetadataImage(image, null);
+
+        InitializeShareGroupStateRequestData request = new 
InitializeShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new 
InitializeShareGroupStateRequestData.InitializeStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new 
InitializeShareGroupStateRequestData.PartitionData()
+                    .setPartition(0)
+                ))
+            ));
+
+        // topic id found in cache
+        TopicsImage topicsImage = mock(TopicsImage.class);
+        
when(topicsImage.getTopic(eq(TOPIC_ID))).thenReturn(mock(TopicImage.class));
+        when(image.topics()).thenReturn(topicsImage);
+
+        // partition id not found
+        when(topicsImage.getPartition(eq(TOPIC_ID), eq(0))).thenReturn(null);
+        CoordinatorResult<InitializeShareGroupStateResponseData, 
CoordinatorRecord> result = shard.initializeState(request);
+
+        InitializeShareGroupStateResponseData expectedData = 
InitializeShareGroupStateResponse.toErrorResponseData(
+            TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, 
Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
+        List<CoordinatorRecord> expectedRecords = List.of();
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+        verify(topicsImage, times(1)).getTopic(eq(TOPIC_ID));
+        verify(topicsImage, times(1)).getPartition(eq(TOPIC_ID), eq(0));
+    }
+
     private static ShareGroupOffset groupOffset(ApiMessage record) {
         if (record instanceof ShareSnapshotValue) {
             return ShareGroupOffset.fromRecord((ShareSnapshotValue) record);
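
Note: distilled from the shard tests above, the interaction they exercise is
two-step, and the second step matters: initializeState() only stages a
ShareSnapshot record, and it is replaying that record that populates the
in-memory share-state and state-epoch maps. A condensed sketch, taken
directly from the test flow in this commit:

    // initializeState() returns the staged record plus the response...
    CoordinatorResult<InitializeShareGroupStateResponseData, CoordinatorRecord> result =
        shard.initializeState(request);

    // ...and only replay makes the state visible to subsequent reads.
    result.records().forEach(record -> shard.replay(0L, 0L, (short) 0, record));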

