This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f32932cc257 KAFKA-18629: Delete share group state impl [1/N] (#18712)
f32932cc257 is described below

commit f32932cc257be48bec4dc6aa9bf30b666f0a5a60
Author: Sushant Mahajan <[email protected]>
AuthorDate: Tue Jan 28 17:13:01 2025 +0530

    KAFKA-18629: Delete share group state impl [1/N] (#18712)
    
    Reviewers: Christo Lolov <[email protected]>, Andrew Schofield 
<[email protected]>
---
 .../requests/DeleteShareGroupStateResponse.java    |  45 +++
 core/src/main/scala/kafka/server/KafkaApis.scala   |  19 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 119 +++++++
 .../kafka/coordinator/share/ShareCoordinator.java  |  10 +
 .../share/ShareCoordinatorRecordHelpers.java       |  10 +
 .../coordinator/share/ShareCoordinatorService.java | 130 ++++++++
 .../coordinator/share/ShareCoordinatorShard.java   | 125 ++++++-
 .../share/ShareCoordinatorRecordHelpersTest.java   |  21 ++
 .../share/ShareCoordinatorServiceTest.java         | 360 +++++++++++++++++----
 .../share/ShareCoordinatorShardTest.java           | 216 +++++++++++++
 10 files changed, 981 insertions(+), 74 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateResponse.java
index 78c87a63987..69087ebc0c8 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateResponse.java
@@ -17,13 +17,16 @@
 
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class DeleteShareGroupStateResponse extends AbstractResponse {
@@ -65,4 +68,46 @@ public class DeleteShareGroupStateResponse extends 
AbstractResponse {
                 new DeleteShareGroupStateResponseData(new 
ByteBufferAccessor(buffer), version)
         );
     }
+
+    public static DeleteShareGroupStateResponseData toResponseData(Uuid 
topicId, int partitionId) {
+        return new DeleteShareGroupStateResponseData()
+            .setResults(List.of(
+                new DeleteShareGroupStateResponseData.DeleteStateResult()
+                    .setTopicId(topicId)
+                    .setPartitions(List.of(
+                        new DeleteShareGroupStateResponseData.PartitionResult()
+                            .setPartition(partitionId)))));
+    }
+
+    public static DeleteShareGroupStateResponseData.PartitionResult 
toErrorResponsePartitionResult(
+        int partitionId,
+        Errors error,
+        String errorMessage
+    ) {
+        return new DeleteShareGroupStateResponseData.PartitionResult()
+            .setPartition(partitionId)
+            .setErrorCode(error.code())
+            .setErrorMessage(errorMessage);
+    }
+
+    public static DeleteShareGroupStateResponseData.DeleteStateResult 
toResponseDeleteStateResult(Uuid topicId, 
List<DeleteShareGroupStateResponseData.PartitionResult> partitionResults) {
+        return new DeleteShareGroupStateResponseData.DeleteStateResult()
+            .setTopicId(topicId)
+            .setPartitions(partitionResults);
+    }
+
+    public static DeleteShareGroupStateResponseData.PartitionResult 
toResponsePartitionResult(int partitionId) {
+        return new DeleteShareGroupStateResponseData.PartitionResult()
+            .setPartition(partitionId);
+    }
+
+    public static DeleteShareGroupStateResponseData toErrorResponseData(Uuid 
topicId, int partitionId, Errors error, String errorMessage) {
+        return new DeleteShareGroupStateResponseData().setResults(
+            Collections.singletonList(new 
DeleteShareGroupStateResponseData.DeleteStateResult()
+                .setTopicId(topicId)
+                .setPartitions(List.of(new 
DeleteShareGroupStateResponseData.PartitionResult()
+                    .setPartition(partitionId)
+                    .setErrorCode(error.code())
+                    .setErrorMessage(errorMessage)))));
+    }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 11623b02a3d..82132d868bf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3148,9 +3148,22 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleDeleteShareGroupStateRequest(request: RequestChannel.Request): 
Unit = {
     val deleteShareGroupStateRequest = 
request.body[DeleteShareGroupStateRequest]
-    // TODO: Implement the DeleteShareGroupStateRequest handling
-    requestHelper.sendMaybeThrottle(request, 
deleteShareGroupStateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+
+    shareCoordinator match {
+      case None => requestHelper.sendResponseMaybeThrottle(request, 
requestThrottleMs =>
+        deleteShareGroupStateRequest.getErrorResponse(requestThrottleMs,
+          new ApiException("Share coordinator is not enabled.")))
+
+      case Some(coordinator) => coordinator.deleteState(request.context, 
deleteShareGroupStateRequest.data)
+        .handle[Unit] { (response, exception) =>
+          if (exception != null) {
+            requestHelper.sendMaybeThrottle(request, 
deleteShareGroupStateRequest.getErrorResponse(exception))
+          } else {
+            requestHelper.sendMaybeThrottle(request, new 
DeleteShareGroupStateResponse(response))
+          }
+        }
+    }
   }
 
   def handleReadShareGroupStateSummaryRequest(request: 
RequestChannel.Request): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 18930be5ee6..bcf9ec587d5 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -10547,6 +10547,96 @@ class KafkaApisTest extends Logging {
     })
   }
 
+  @Test
+  def testDeleteShareGroupStateSuccess(): Unit = {
+    val topicId = Uuid.randomUuid();
+    val deleteRequestData = new DeleteShareGroupStateRequestData()
+      .setGroupId("group1")
+      .setTopics(List(
+        new DeleteShareGroupStateRequestData.DeleteStateData()
+          .setTopicId(topicId)
+          .setPartitions(List(
+            new DeleteShareGroupStateRequestData.PartitionData()
+              .setPartition(1)
+          ).asJava)
+      ).asJava)
+
+    val deleteStateResultData: 
util.List[DeleteShareGroupStateResponseData.DeleteStateResult] = List(
+      new DeleteShareGroupStateResponseData.DeleteStateResult()
+        .setTopicId(topicId)
+        .setPartitions(List(
+          new DeleteShareGroupStateResponseData.PartitionResult()
+            .setPartition(1)
+            .setErrorCode(Errors.NONE.code())
+            .setErrorMessage(null)
+        ).asJava)
+    ).asJava
+
+    val config = Map(
+      ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
+    )
+
+    val response = getDeleteShareGroupResponse(
+      deleteRequestData,
+      config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
+      verifyNoErr = true,
+      null,
+      deleteStateResultData
+    )
+
+    assertNotNull(response.data)
+    assertEquals(1, response.data.results.size)
+  }
+
+  @Test
+  def testDeleteShareGroupStateAuthorizationFailed(): Unit = {
+    val topicId = Uuid.randomUuid();
+    val deleteRequestData = new DeleteShareGroupStateRequestData()
+      .setGroupId("group1")
+      .setTopics(List(
+        new DeleteShareGroupStateRequestData.DeleteStateData()
+          .setTopicId(topicId)
+          .setPartitions(List(
+            new DeleteShareGroupStateRequestData.PartitionData()
+              .setPartition(1)
+          ).asJava)
+      ).asJava)
+
+    val deleteStateResultData: 
util.List[DeleteShareGroupStateResponseData.DeleteStateResult] = List(
+      new DeleteShareGroupStateResponseData.DeleteStateResult()
+        .setTopicId(topicId)
+        .setPartitions(List(
+          new DeleteShareGroupStateResponseData.PartitionResult()
+            .setPartition(1)
+            .setErrorCode(Errors.NONE.code())
+            .setErrorMessage(null)
+        ).asJava)
+    ).asJava
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava, 
Seq(AuthorizationResult.ALLOWED).asJava)
+
+    val config = Map(
+      ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
+    )
+
+    val response = getDeleteShareGroupResponse(
+      deleteRequestData,
+      config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
+      verifyNoErr = false,
+      authorizer,
+      deleteStateResultData
+    )
+
+    assertNotNull(response.data)
+    assertEquals(1, response.data.results.size)
+    response.data.results.forEach(deleteResult => {
+      assertEquals(1, deleteResult.partitions.size)
+      assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED.code(), 
deleteResult.partitions.get(0).errorCode())
+    })
+  }
+
   def getShareGroupDescribeResponse(groupIds: util.List[String], 
configOverrides: Map[String, String] = Map.empty,
                                     verifyNoErr: Boolean = true, authorizer: 
Authorizer = null,
                                     describedGroups: 
util.List[ShareGroupDescribeResponseData.DescribedGroup]): 
ShareGroupDescribeResponse = {
@@ -10663,4 +10753,33 @@ class KafkaApisTest extends Logging {
     }
     response
   }
+
+  def getDeleteShareGroupResponse(requestData: 
DeleteShareGroupStateRequestData, configOverrides: Map[String, String] = 
Map.empty,
+                                  verifyNoErr: Boolean = true, authorizer: 
Authorizer = null,
+                                  deleteStateResult: 
util.List[DeleteShareGroupStateResponseData.DeleteStateResult]): 
DeleteShareGroupStateResponse = {
+    val requestChannelRequest = buildRequest(new 
DeleteShareGroupStateRequest.Builder(requestData, true).build())
+
+    val future = new CompletableFuture[DeleteShareGroupStateResponseData]()
+    when(shareCoordinator.deleteState(
+      any[RequestContext],
+      any[DeleteShareGroupStateRequestData]
+    )).thenReturn(future)
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    kafkaApis = createKafkaApis(
+      overrideProperties = configOverrides,
+      authorizer = Option(authorizer),
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching())
+
+    future.complete(new DeleteShareGroupStateResponseData()
+      .setResults(deleteStateResult))
+
+    val response = 
verifyNoThrottling[DeleteShareGroupStateResponse](requestChannelRequest)
+    if (verifyNoErr) {
+      val expectedDeleteShareGroupStateResponseData = new 
DeleteShareGroupStateResponseData()
+        .setResults(deleteStateResult)
+      assertEquals(expectedDeleteShareGroupStateResponseData, response.data)
+    }
+    response
+  }
 }
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
index 72427ac8705..61e96e37f07 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.coordinator.share;
 
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
 import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -86,6 +88,14 @@ public interface ShareCoordinator {
      */
     CompletableFuture<ReadShareGroupStateSummaryResponseData> 
readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData 
request);
 
+    /**
+     * Handle delete share group state call
+     * @param context - represents the incoming delete share group request 
context
+     * @param request - actual RPC request object
+     * @return completable future representing delete share group RPC response 
data
+     */
+    CompletableFuture<DeleteShareGroupStateResponseData> 
deleteState(RequestContext context, DeleteShareGroupStateRequestData request);
+
     /**
      * Called when new coordinator is elected
      * @param partitionIndex - The partition index (internal topic)
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java
index e1e9e9a11f9..17c869c4499 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java
@@ -71,4 +71,14 @@ public class ShareCoordinatorRecordHelpers {
             )
         );
     }
+
+    public static CoordinatorRecord newShareStateTombstoneRecord(String 
groupId, Uuid topicId, int partitionId) {
+        // Always generate share snapshot type record for tombstone.
+        return CoordinatorRecord.tombstone(
+            new ShareSnapshotKey()
+                .setGroupId(groupId)
+                .setTopicId(topicId)
+                .setPartition(partitionId)
+        );
+    }
 }
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 05fcffb43c6..3139e2ef07d 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
 import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -29,6 +31,7 @@ import 
org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
 import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
 import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
 import org.apache.kafka.common.requests.RequestContext;
@@ -694,6 +697,116 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
         });
     }
 
+    @Override
+    public CompletableFuture<DeleteShareGroupStateResponseData> 
deleteState(RequestContext context, DeleteShareGroupStateRequestData request) {
+        // Send an error response if the coordinator is not active.
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(
+                generateErrorDeleteStateResponse(
+                    request,
+                    Errors.COORDINATOR_NOT_AVAILABLE,
+                    "Share coordinator is not available."
+                )
+            );
+        }
+
+        String groupId = request.groupId();
+        // Send an empty response if groupId is invalid.
+        if (isGroupIdEmpty(groupId)) {
+            log.error("Group id must be specified and non-empty: {}", request);
+            return CompletableFuture.completedFuture(
+                new DeleteShareGroupStateResponseData()
+            );
+        }
+
+        // Send an empty response if topic data is empty.
+        if (isEmpty(request.topics())) {
+            log.error("Topic Data is empty: {}", request);
+            return CompletableFuture.completedFuture(
+                new DeleteShareGroupStateResponseData()
+            );
+        }
+
+        // Send an empty response if partition data is empty for any topic.
+        for (DeleteShareGroupStateRequestData.DeleteStateData topicData : 
request.topics()) {
+            if (isEmpty(topicData.partitions())) {
+                log.error("Partition Data for topic {} is empty: {}", 
topicData.topicId(), request);
+                return CompletableFuture.completedFuture(
+                    new DeleteShareGroupStateResponseData()
+                );
+            }
+        }
+
+        // A map to store the futures for each topicId and partition.
+        Map<Uuid, Map<Integer, 
CompletableFuture<DeleteShareGroupStateResponseData>>> futureMap = new 
HashMap<>();
+
+        // The request received here could have multiple keys of structure 
group:topic:partition. However,
+        // the deleteState method in ShareCoordinatorShard expects a single 
key in the request. Hence, we will
+        // be looping over the keys below and constructing new 
DeleteShareGroupStateRequestData objects to pass
+        // onto the shard method.
+
+        for (DeleteShareGroupStateRequestData.DeleteStateData topicData : 
request.topics()) {
+            Uuid topicId = topicData.topicId();
+            for (DeleteShareGroupStateRequestData.PartitionData partitionData 
: topicData.partitions()) {
+                SharePartitionKey coordinatorKey = 
SharePartitionKey.getInstance(request.groupId(), topicId, 
partitionData.partition());
+
+                DeleteShareGroupStateRequestData requestForCurrentPartition = 
new DeleteShareGroupStateRequestData()
+                    .setGroupId(groupId)
+                    .setTopics(List.of(new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(partitionData))));
+
+                CompletableFuture<DeleteShareGroupStateResponseData> 
deleteFuture = runtime.scheduleWriteOperation(
+                    "delete-share-group-state",
+                    topicPartitionFor(coordinatorKey),
+                    Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+                    coordinator -> 
coordinator.deleteState(requestForCurrentPartition)
+                ).exceptionally(deleteException ->
+                    handleOperationException(
+                        "delete-share-group-state",
+                        request,
+                        deleteException,
+                        (error, message) -> 
DeleteShareGroupStateResponse.toErrorResponseData(
+                            topicData.topicId(),
+                            partitionData.partition(),
+                            error,
+                            "Unable to delete share group state: " + 
deleteException.getMessage()
+                        ),
+                        log
+                    ));
+
+                futureMap.computeIfAbsent(topicId, k -> new HashMap<>())
+                    .put(partitionData.partition(), deleteFuture);
+            }
+        }
+
+        // Combine all futures into a single CompletableFuture<Void>.
+        CompletableFuture<Void> combinedFuture = 
CompletableFuture.allOf(futureMap.values().stream()
+            .flatMap(map -> 
map.values().stream()).toArray(CompletableFuture[]::new));
+
+        // Transform the combined CompletableFuture<Void> into 
CompletableFuture<DeleteShareGroupStateResponseData>.
+        return combinedFuture.thenApply(v -> {
+            List<DeleteShareGroupStateResponseData.DeleteStateResult> 
deleteStateResult = new ArrayList<>(futureMap.size());
+            futureMap.forEach(
+                (topicId, topicEntry) -> {
+                    List<DeleteShareGroupStateResponseData.PartitionResult> 
partitionResults = new ArrayList<>(topicEntry.size());
+                    topicEntry.forEach(
+                        (partitionId, responseFuture) -> {
+                            // responseFuture would already be completed by now 
since we have used
+                            // CompletableFuture::allOf to create a combined 
future from the future map.
+                            partitionResults.add(
+                                
responseFuture.getNow(null).results().get(0).partitions().get(0)
+                            );
+                        }
+                    );
+                    
deleteStateResult.add(DeleteShareGroupStateResponse.toResponseDeleteStateResult(topicId,
 partitionResults));
+                }
+            );
+            return new DeleteShareGroupStateResponseData()
+                .setResults(deleteStateResult);
+        });
+    }
+
     private ReadShareGroupStateResponseData generateErrorReadStateResponse(
         ReadShareGroupStateRequestData request,
         Errors error,
@@ -746,6 +859,23 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
                 }).collect(Collectors.toList()));
     }
 
+    private DeleteShareGroupStateResponseData generateErrorDeleteStateResponse(
+        DeleteShareGroupStateRequestData request,
+        Errors error,
+        String errorMessage
+    ) {
+        return new 
DeleteShareGroupStateResponseData().setResults(request.topics().stream()
+            .map(topicData -> {
+                DeleteShareGroupStateResponseData.DeleteStateResult resultData 
= new DeleteShareGroupStateResponseData.DeleteStateResult();
+                resultData.setTopicId(topicData.topicId());
+                resultData.setPartitions(topicData.partitions().stream()
+                    .map(partitionData -> 
DeleteShareGroupStateResponse.toErrorResponsePartitionResult(
+                        partitionData.partition(), error, errorMessage
+                    )).collect(Collectors.toList()));
+                return resultData;
+            }).collect(Collectors.toList()));
+    }
+
     private static boolean isGroupIdEmpty(String groupId) {
         return groupId == null || groupId.isEmpty();
     }
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index 53a64174f38..a71a911907a 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -20,6 +20,8 @@ package org.apache.kafka.coordinator.share;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
 import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -28,6 +30,7 @@ import 
org.apache.kafka.common.message.WriteShareGroupStateRequestData;
 import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
 import org.apache.kafka.common.requests.TransactionResult;
@@ -228,16 +231,25 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
 
     private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue 
value, long offset) {
         SharePartitionKey mapKey = 
SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
-        maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
-        maybeUpdateStateEpochMap(mapKey, value.stateEpoch());
-
-        ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
-        // this record is the complete snapshot
-        shareStateMap.put(mapKey, offsetRecord);
-        // if number of share updates is exceeded, then reset it
-        if (snapshotUpdateCount.containsKey(mapKey)) {
-            if (snapshotUpdateCount.get(mapKey) >= 
config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
-                snapshotUpdateCount.put(mapKey, 0);
+        if (value == null) {
+            log.debug("Tombstone records received for share partition key: 
{}", mapKey);
+            // Consider this a tombstone.
+            shareStateMap.remove(mapKey);
+            leaderEpochMap.remove(mapKey);
+            stateEpochMap.remove(mapKey);
+            snapshotUpdateCount.remove(mapKey);
+        } else {
+            maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
+            maybeUpdateStateEpochMap(mapKey, value.stateEpoch());
+
+            ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
+            // This record is the complete snapshot.
+            shareStateMap.put(mapKey, offsetRecord);
+            // If number of share updates is exceeded, then reset it.
+            if (snapshotUpdateCount.containsKey(mapKey)) {
+                if (snapshotUpdateCount.get(mapKey) >= 
config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
+                    snapshotUpdateCount.put(mapKey, 0);
+                }
             }
         }
 
@@ -475,6 +487,49 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         );
     }
 
+    /**
+     * This method writes tombstone records corresponding to the requested 
topic partitions.
+     * <p>
+     * This method, as called by the ShareCoordinatorService, will be provided 
with
+     * the request data which covers only one key, i.e. group1:topic1:partition1. 
The implementation
+     * below was written with this in mind.
+     *
+     * @param request - DeleteShareGroupStateRequestData for a single key
+     * @return CoordinatorResult(records, response)
+     */
+
+    public CoordinatorResult<DeleteShareGroupStateResponseData, 
CoordinatorRecord> deleteState(
+        DeleteShareGroupStateRequestData request
+    ) {
+        // Record to write (a tombstone keyed by the share snapshot key), 
response to caller.
+        // Only one key will be there in the request by design.
+        Optional<CoordinatorResult<DeleteShareGroupStateResponseData, 
CoordinatorRecord>> error = maybeGetDeleteStateError(request);
+        if (error.isPresent()) {
+            return error.get();
+        }
+
+        DeleteShareGroupStateRequestData.DeleteStateData topicData = 
request.topics().get(0);
+        DeleteShareGroupStateRequestData.PartitionData partitionData = 
topicData.partitions().get(0);
+        SharePartitionKey key = 
SharePartitionKey.getInstance(request.groupId(), topicData.topicId(), 
partitionData.partition());
+
+        CoordinatorRecord record = generateTombstoneRecord(key);
+        // build successful response if record is correctly created
+        DeleteShareGroupStateResponseData responseData = new 
DeleteShareGroupStateResponseData()
+            .setResults(
+                List.of(
+                    
DeleteShareGroupStateResponse.toResponseDeleteStateResult(key.topicId(),
+                        List.of(
+                            
DeleteShareGroupStateResponse.toResponsePartitionResult(
+                                key.partition()
+                            )
+                        )
+                    )
+                )
+            );
+
+        return new CoordinatorResult<>(Collections.singletonList(record), 
responseData);
+    }
+
     /**
      * Util method to generate a ShareSnapshot or ShareUpdate type record for 
a key, based on various conditions.
      * <p>
@@ -537,6 +592,14 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         }
     }
 
+    private CoordinatorRecord generateTombstoneRecord(SharePartitionKey key) {
+        return ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
+            key.groupId(),
+            key.topicId(),
+            key.partition()
+        );
+    }
+
     private List<PersisterStateBatch> mergeBatches(
         List<PersisterStateBatch> soFar,
         WriteShareGroupStateRequestData.PartitionData partitionData) {
@@ -670,6 +733,37 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         return Optional.empty();
     }
 
+    private Optional<CoordinatorResult<DeleteShareGroupStateResponseData, 
CoordinatorRecord>> maybeGetDeleteStateError(
+        DeleteShareGroupStateRequestData request
+    ) {
+        DeleteShareGroupStateRequestData.DeleteStateData topicData = 
request.topics().get(0);
+        DeleteShareGroupStateRequestData.PartitionData partitionData = 
topicData.partitions().get(0);
+
+        Uuid topicId = topicData.topicId();
+        int partitionId = partitionData.partition();
+
+        if (topicId == null) {
+            return Optional.of(getDeleteErrorResponse(Errors.INVALID_REQUEST, 
NULL_TOPIC_ID, null, partitionId));
+        }
+
+        if (partitionId < 0) {
+            return Optional.of(getDeleteErrorResponse(Errors.INVALID_REQUEST, 
NEGATIVE_PARTITION_ID, topicId, partitionId));
+        }
+
+        if (metadataImage == null) {
+            log.error("Metadata image is null");
+            return 
Optional.of(getDeleteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, 
topicId, partitionId));
+        }
+
+        if (metadataImage.topics().getTopic(topicId) == null ||
+            metadataImage.topics().getPartition(topicId, partitionId) == null) 
{
+            log.error("Topic/TopicPartition not found in metadata image.");
+            return 
Optional.of(getDeleteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, 
topicId, partitionId));
+        }
+
+        return Optional.empty();
+    }
+
     private CoordinatorResult<WriteShareGroupStateResponseData, 
CoordinatorRecord> getWriteErrorResponse(
         Errors error,
         Exception exception,
@@ -681,6 +775,17 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         return new CoordinatorResult<>(Collections.emptyList(), responseData);
     }
 
+    private CoordinatorResult<DeleteShareGroupStateResponseData, 
CoordinatorRecord> getDeleteErrorResponse(
+        Errors error,
+        Exception exception,
+        Uuid topicId,
+        int partitionId
+    ) { // Builds a result holding only error response data for the given topic/partition, paired with an empty list.
+        String message = exception == null ? error.message() : 
exception.getMessage(); // with no exception supplied, the text comes from error.message()
+        DeleteShareGroupStateResponseData responseData = 
DeleteShareGroupStateResponse.toErrorResponseData(topicId, partitionId, error, 
message);
+        return new CoordinatorResult<>(Collections.emptyList(), responseData); // first argument is an empty list - presumably the records to write; verify against CoordinatorResult
+    }
+
     // Visible for testing
     Integer getLeaderMapValue(SharePartitionKey key) {
         return this.leaderEpochMap.get(key);
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
index 37d9f9b5127..9de59e499d1 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
@@ -112,4 +112,25 @@ public class ShareCoordinatorRecordHelpersTest {
 
         assertEquals(expectedRecord, record);
     }
+
+    @Test
+    public void testNewShareStateTombstoneRecord() { // newShareStateTombstoneRecord must equal a tombstone built from the matching ShareSnapshotKey.
+        String groupId = "test-group";
+        Uuid topicId = Uuid.randomUuid();
+        int partitionId = 1;
+        CoordinatorRecord record = 
ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
+            groupId,
+            topicId,
+            partitionId
+        );
+
+        CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+            new ShareSnapshotKey() // key carries the same group id, topic id and partition passed to the helper
+                .setGroupId(groupId)
+                .setTopicId(topicId)
+                .setPartition(partitionId)
+        );
+
+        assertEquals(expectedRecord, record);
+    }
 }
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 7f7776cd93e..331595119b6 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
 import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -49,7 +51,6 @@ import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -79,7 +80,7 @@ class ShareCoordinatorServiceTest {
     private CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> 
mockRuntime() {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mock(CoordinatorRuntime.class);
         when(runtime.activeTopicPartitions())
-            .thenReturn(Collections.singletonList(new 
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)));
+            .thenReturn(List.of(new 
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)));
         return runtime;
     }
 
@@ -133,13 +134,13 @@ class ShareCoordinatorServiceTest {
             .setTopics(Arrays.asList(
                     new WriteShareGroupStateRequestData.WriteStateData()
                         .setTopicId(topicId1)
-                        .setPartitions(Collections.singletonList(
+                        .setPartitions(List.of(
                             new WriteShareGroupStateRequestData.PartitionData()
                                 .setPartition(partition1)
                                 .setStartOffset(0)
                                 .setStateEpoch(1)
                                 .setLeaderEpoch(1)
-                                .setStateBatches(Collections.singletonList(new 
WriteShareGroupStateRequestData.StateBatch()
+                                .setStateBatches(List.of(new 
WriteShareGroupStateRequestData.StateBatch()
                                     .setFirstOffset(0)
                                     .setLastOffset(10)
                                     .setDeliveryCount((short) 1)
@@ -148,13 +149,13 @@ class ShareCoordinatorServiceTest {
                         )),
                     new WriteShareGroupStateRequestData.WriteStateData()
                         .setTopicId(topicId2)
-                        .setPartitions(Collections.singletonList(
+                        .setPartitions(List.of(
                             new WriteShareGroupStateRequestData.PartitionData()
                                 .setPartition(partition2)
                                 .setStartOffset(0)
                                 .setStateEpoch(1)
                                 .setLeaderEpoch(1)
-                                .setStateBatches(Collections.singletonList(new 
WriteShareGroupStateRequestData.StateBatch()
+                                .setStateBatches(List.of(new 
WriteShareGroupStateRequestData.StateBatch()
                                     .setFirstOffset(0)
                                     .setLastOffset(10)
                                     .setDeliveryCount((short) 1)
@@ -165,18 +166,18 @@ class ShareCoordinatorServiceTest {
             );
 
         WriteShareGroupStateResponseData response1 = new 
WriteShareGroupStateResponseData()
-            .setResults(Collections.singletonList(
+            .setResults(List.of(
                 new WriteShareGroupStateResponseData.WriteStateResult()
                     .setTopicId(topicId1)
-                    .setPartitions(Collections.singletonList(new 
WriteShareGroupStateResponseData.PartitionResult()
+                    .setPartitions(List.of(new 
WriteShareGroupStateResponseData.PartitionResult()
                         .setPartition(partition1)))
             ));
 
         WriteShareGroupStateResponseData response2 = new 
WriteShareGroupStateResponseData()
-            .setResults(Collections.singletonList(
+            .setResults(List.of(
                 new WriteShareGroupStateResponseData.WriteStateResult()
                     .setTopicId(topicId2)
-                    .setPartitions(Collections.singletonList(new 
WriteShareGroupStateResponseData.PartitionResult()
+                    .setPartitions(List.of(new 
WriteShareGroupStateResponseData.PartitionResult()
                         .setPartition(partition2)))
             ));
 
@@ -199,11 +200,11 @@ class ShareCoordinatorServiceTest {
         HashSet<WriteShareGroupStateResponseData.WriteStateResult> 
expectedResult = new HashSet<>(Arrays.asList(
             new WriteShareGroupStateResponseData.WriteStateResult()
                 .setTopicId(topicId2)
-                .setPartitions(Collections.singletonList(new 
WriteShareGroupStateResponseData.PartitionResult()
+                .setPartitions(List.of(new 
WriteShareGroupStateResponseData.PartitionResult()
                     .setPartition(partition2))),
             new WriteShareGroupStateResponseData.WriteStateResult()
                 .setTopicId(topicId1)
-                .setPartitions(Collections.singletonList(new 
WriteShareGroupStateResponseData.PartitionResult()
+                .setPartitions(List.of(new 
WriteShareGroupStateResponseData.PartitionResult()
                     .setPartition(partition1)))));
         assertEquals(expectedResult, result);
         verify(time, times(2)).hiResClockMs();
@@ -243,14 +244,14 @@ class ShareCoordinatorServiceTest {
             .setTopics(Arrays.asList(
                     new ReadShareGroupStateRequestData.ReadStateData()
                         .setTopicId(topicId1)
-                        .setPartitions(Collections.singletonList(
+                        .setPartitions(List.of(
                             new ReadShareGroupStateRequestData.PartitionData()
                                 .setPartition(partition1)
                                 .setLeaderEpoch(1)
                         )),
                     new ReadShareGroupStateRequestData.ReadStateData()
                         .setTopicId(topicId2)
-                        .setPartitions(Collections.singletonList(
+                        .setPartitions(List.of(
                             new ReadShareGroupStateRequestData.PartitionData()
                                 .setPartition(partition2)
                                 .setLeaderEpoch(1)
@@ -260,12 +261,12 @@ class ShareCoordinatorServiceTest {
 
         ReadShareGroupStateResponseData.ReadStateResult topicData1 = new 
ReadShareGroupStateResponseData.ReadStateResult()
             .setTopicId(topicId1)
-            .setPartitions(Collections.singletonList(new 
ReadShareGroupStateResponseData.PartitionResult()
+            .setPartitions(List.of(new 
ReadShareGroupStateResponseData.PartitionResult()
                 .setPartition(partition1)
                 .setErrorCode(Errors.NONE.code())
                 .setStateEpoch(1)
                 .setStartOffset(0)
-                .setStateBatches(Collections.singletonList(new 
ReadShareGroupStateResponseData.StateBatch()
+                .setStateBatches(List.of(new 
ReadShareGroupStateResponseData.StateBatch()
                     .setFirstOffset(0)
                     .setLastOffset(10)
                     .setDeliveryCount((short) 1)
@@ -275,7 +276,7 @@ class ShareCoordinatorServiceTest {
 
         ReadShareGroupStateResponseData.ReadStateResult topicData2 = new 
ReadShareGroupStateResponseData.ReadStateResult()
             .setTopicId(topicId2)
-            .setPartitions(Collections.singletonList(new 
ReadShareGroupStateResponseData.PartitionResult()
+            .setPartitions(List.of(new 
ReadShareGroupStateResponseData.PartitionResult()
                 .setPartition(partition2)
                 .setErrorCode(Errors.NONE.code())
                 .setStateEpoch(1)
@@ -301,9 +302,9 @@ class ShareCoordinatorServiceTest {
             any()
         ))
             .thenReturn(CompletableFuture.completedFuture(new 
ReadShareGroupStateResponseData()
-                .setResults(Collections.singletonList(topicData1))))
+                .setResults(List.of(topicData1))))
             .thenReturn(CompletableFuture.completedFuture(new 
ReadShareGroupStateResponseData()
-                .setResults(Collections.singletonList(topicData2))));
+                .setResults(List.of(topicData2))));
 
         CompletableFuture<ReadShareGroupStateResponseData> future = 
service.readState(
             requestContext(ApiKeys.READ_SHARE_GROUP_STATE),
@@ -345,14 +346,14 @@ class ShareCoordinatorServiceTest {
             .setTopics(Arrays.asList(
                     new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
                         .setTopicId(topicId1)
-                        .setPartitions(Collections.singletonList(
+                        .setPartitions(List.of(
                             new 
ReadShareGroupStateSummaryRequestData.PartitionData()
                                 .setPartition(partition1)
                                 .setLeaderEpoch(1)
                         )),
                     new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
                         .setTopicId(topicId2)
-                        .setPartitions(Collections.singletonList(
+                        .setPartitions(List.of(
                             new 
ReadShareGroupStateSummaryRequestData.PartitionData()
                                 .setPartition(partition2)
                                 .setLeaderEpoch(1)
@@ -362,7 +363,7 @@ class ShareCoordinatorServiceTest {
 
         ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult 
topicData1 = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
             .setTopicId(topicId1)
-            .setPartitions(Collections.singletonList(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
+            .setPartitions(List.of(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
                 .setPartition(partition1)
                 .setErrorCode(Errors.NONE.code())
                 .setStateEpoch(1)
@@ -371,7 +372,7 @@ class ShareCoordinatorServiceTest {
 
         ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult 
topicData2 = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
             .setTopicId(topicId2)
-            .setPartitions(Collections.singletonList(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
+            .setPartitions(List.of(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
                 .setPartition(partition2)
                 .setErrorCode(Errors.NONE.code())
                 .setStateEpoch(1)
@@ -385,9 +386,9 @@ class ShareCoordinatorServiceTest {
             any()
         ))
             .thenReturn(CompletableFuture.completedFuture(new 
ReadShareGroupStateSummaryResponseData()
-                .setResults(Collections.singletonList(topicData1))))
+                .setResults(List.of(topicData1))))
             .thenReturn(CompletableFuture.completedFuture(new 
ReadShareGroupStateSummaryResponseData()
-                .setResults(Collections.singletonList(topicData2))));
+                .setResults(List.of(topicData2))));
 
         CompletableFuture<ReadShareGroupStateSummaryResponseData> future = 
service.readStateSummary(
             requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
@@ -402,6 +403,93 @@ class ShareCoordinatorServiceTest {
         assertEquals(expectedResult, result);
     }
 
+    @Test
+    public void testDeleteStateSuccess() throws ExecutionException, 
InterruptedException, TimeoutException { // deleteState on a two-topic request must return both per-topic results.
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Metrics metrics = new Metrics();
+        ShareCoordinatorMetrics coordinatorMetrics = new 
ShareCoordinatorMetrics(metrics);
+        Time time = mock(Time.class);
+        ShareCoordinatorService service = new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.testConfig(),
+            runtime,
+            coordinatorMetrics,
+            time,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
+        );
+
+        service.startup(() -> 1);
+
+        String groupId = "group1";
+        Uuid topicId1 = Uuid.randomUuid();
+        int partition1 = 0;
+
+        Uuid topicId2 = Uuid.randomUuid();
+        int partition2 = 1;
+
+        DeleteShareGroupStateRequestData request = new 
DeleteShareGroupStateRequestData()
+            .setGroupId(groupId)
+            .setTopics(Arrays.asList(
+                    new DeleteShareGroupStateRequestData.DeleteStateData()
+                        .setTopicId(topicId1)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateRequestData.PartitionData()
+                                .setPartition(partition1)
+                        )),
+                    new DeleteShareGroupStateRequestData.DeleteStateData()
+                        .setTopicId(topicId2)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateRequestData.PartitionData()
+                                .setPartition(partition2)
+                        ))
+                )
+            ); // request: two topics, one partition each
+
+        DeleteShareGroupStateResponseData response1 = new 
DeleteShareGroupStateResponseData()
+            .setResults(List.of(
+                new DeleteShareGroupStateResponseData.DeleteStateResult()
+                    .setTopicId(topicId1)
+                    .setPartitions(List.of(new 
DeleteShareGroupStateResponseData.PartitionResult()
+                        .setPartition(partition1)))
+            ));
+
+        DeleteShareGroupStateResponseData response2 = new 
DeleteShareGroupStateResponseData()
+            .setResults(List.of(
+                new DeleteShareGroupStateResponseData.DeleteStateResult()
+                    .setTopicId(topicId2)
+                    .setPartitions(List.of(new 
DeleteShareGroupStateResponseData.PartitionResult()
+                        .setPartition(partition2)))
+            ));
+
+        when(runtime.scheduleWriteOperation(
+            eq("delete-share-group-state"),
+            eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
+            eq(Duration.ofMillis(5000)),
+            any()
+        ))
+            .thenReturn(CompletableFuture.completedFuture(response1))
+            .thenReturn(CompletableFuture.completedFuture(response2)); // consecutive stubbing: first matching call yields response1, the next yields response2
+
+        CompletableFuture<DeleteShareGroupStateResponseData> future = 
service.deleteState(
+            requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
+            request
+        );
+
+        HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> result = 
new HashSet<>(future.get(5, TimeUnit.SECONDS).results()); // collected into a set so the order of results does not matter
+
+        HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> 
expectedResult = new HashSet<>(Arrays.asList(
+            new DeleteShareGroupStateResponseData.DeleteStateResult()
+                .setTopicId(topicId2)
+                .setPartitions(List.of(new 
DeleteShareGroupStateResponseData.PartitionResult()
+                    .setPartition(partition2))),
+            new DeleteShareGroupStateResponseData.DeleteStateResult()
+                .setTopicId(topicId1)
+                .setPartitions(List.of(new 
DeleteShareGroupStateResponseData.PartitionResult()
+                    .setPartition(partition1)))));
+        assertEquals(expectedResult, result);
+    }
+
     @Test
     public void testWriteStateValidationsError() throws ExecutionException, 
InterruptedException, TimeoutException {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
@@ -433,7 +521,7 @@ class ShareCoordinatorServiceTest {
         assertEquals(new WriteShareGroupStateResponseData(),
             service.writeState(
                 requestContext(ApiKeys.WRITE_SHARE_GROUP_STATE),
-                new 
WriteShareGroupStateRequestData().setGroupId(groupId).setTopics(Collections.singletonList(
+                new 
WriteShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(
                     new 
WriteShareGroupStateRequestData.WriteStateData().setTopicId(topicId)))
             ).get(5, TimeUnit.SECONDS)
         );
@@ -442,8 +530,8 @@ class ShareCoordinatorServiceTest {
         assertEquals(new WriteShareGroupStateResponseData(),
             service.writeState(
                 requestContext(ApiKeys.WRITE_SHARE_GROUP_STATE),
-                new 
WriteShareGroupStateRequestData().setGroupId(null).setTopics(Collections.singletonList(
-                    new 
WriteShareGroupStateRequestData.WriteStateData().setTopicId(topicId).setPartitions(Collections.singletonList(
+                new 
WriteShareGroupStateRequestData().setGroupId(null).setTopics(List.of(
+                    new 
WriteShareGroupStateRequestData.WriteStateData().setTopicId(topicId).setPartitions(List.of(
                         new 
WriteShareGroupStateRequestData.PartitionData().setPartition(partition)))))
             ).get(5, TimeUnit.SECONDS)
         );
@@ -480,7 +568,7 @@ class ShareCoordinatorServiceTest {
         assertEquals(new ReadShareGroupStateResponseData(),
             service.readState(
                 requestContext(ApiKeys.READ_SHARE_GROUP_STATE),
-                new 
ReadShareGroupStateRequestData().setGroupId(groupId).setTopics(Collections.singletonList(
+                new 
ReadShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(
                     new 
ReadShareGroupStateRequestData.ReadStateData().setTopicId(topicId)))
             ).get(5, TimeUnit.SECONDS)
         );
@@ -489,8 +577,8 @@ class ShareCoordinatorServiceTest {
         assertEquals(new ReadShareGroupStateResponseData(),
             service.readState(
                 requestContext(ApiKeys.READ_SHARE_GROUP_STATE),
-                new 
ReadShareGroupStateRequestData().setGroupId(null).setTopics(Collections.singletonList(
-                    new 
ReadShareGroupStateRequestData.ReadStateData().setTopicId(topicId).setPartitions(Collections.singletonList(
+                new 
ReadShareGroupStateRequestData().setGroupId(null).setTopics(List.of(
+                    new 
ReadShareGroupStateRequestData.ReadStateData().setTopicId(topicId).setPartitions(List.of(
                         new 
ReadShareGroupStateRequestData.PartitionData().setPartition(partition)))))
             ).get(5, TimeUnit.SECONDS)
         );
@@ -527,7 +615,7 @@ class ShareCoordinatorServiceTest {
         assertEquals(new ReadShareGroupStateSummaryResponseData(),
             service.readStateSummary(
                 requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
-                new 
ReadShareGroupStateSummaryRequestData().setGroupId(groupId).setTopics(Collections.singletonList(
+                new 
ReadShareGroupStateSummaryRequestData().setGroupId(groupId).setTopics(List.of(
                     new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId)))
             ).get(5, TimeUnit.SECONDS)
         );
@@ -536,13 +624,60 @@ class ShareCoordinatorServiceTest {
         assertEquals(new ReadShareGroupStateSummaryResponseData(),
             service.readStateSummary(
                 requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
-                new 
ReadShareGroupStateSummaryRequestData().setGroupId(null).setTopics(Collections.singletonList(
-                    new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId).setPartitions(Collections.singletonList(
+                new 
ReadShareGroupStateSummaryRequestData().setGroupId(null).setTopics(List.of(
+                    new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId).setPartitions(List.of(
                         new 
ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partition)))))
             ).get(5, TimeUnit.SECONDS)
         );
     }
 
+    @Test
+    public void testDeleteStateValidationsError() throws ExecutionException, 
InterruptedException, TimeoutException { // Each malformed request below must come back as an empty DeleteShareGroupStateResponseData.
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        ShareCoordinatorService service = new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.testConfig(),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
+        );
+
+        service.startup(() -> 1);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 0;
+
+        // 1. Empty topicsData
+        assertEquals(new DeleteShareGroupStateResponseData(),
+            service.deleteState(
+                requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
+                new DeleteShareGroupStateRequestData().setGroupId(groupId) // setTopics is never called on this request
+            ).get(5, TimeUnit.SECONDS)
+        );
+
+        // 2. Empty partitionsData
+        assertEquals(new DeleteShareGroupStateResponseData(),
+            service.deleteState(
+                requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
+                new 
DeleteShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(
+                    new 
DeleteShareGroupStateRequestData.DeleteStateData().setTopicId(topicId))) // setPartitions is never called on this topic
+            ).get(5, TimeUnit.SECONDS)
+        );
+
+        // 3. Invalid groupId
+        assertEquals(new DeleteShareGroupStateResponseData(),
+            service.deleteState(
+                requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
+                new 
DeleteShareGroupStateRequestData().setGroupId(null).setTopics(List.of( // group id is explicitly null here
+                    new 
DeleteShareGroupStateRequestData.DeleteStateData().setTopicId(topicId).setPartitions(List.of(
+                        new 
DeleteShareGroupStateRequestData.PartitionData().setPartition(partition)))))
+            ).get(5, TimeUnit.SECONDS)
+        );
+    }
+
     @Test
     public void testWriteStateWhenNotStarted() throws ExecutionException, 
InterruptedException, TimeoutException {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
@@ -568,13 +703,13 @@ class ShareCoordinatorServiceTest {
             .setTopics(Arrays.asList(
                     new WriteShareGroupStateRequestData.WriteStateData()
                         .setTopicId(topicId1)
-                        .setPartitions(Collections.singletonList(
+                        .setPartitions(List.of(
                             new WriteShareGroupStateRequestData.PartitionData()
                                 .setPartition(partition1)
                                 .setStartOffset(0)
                                 .setStateEpoch(1)
                                 .setLeaderEpoch(1)
-                                .setStateBatches(Collections.singletonList(new 
WriteShareGroupStateRequestData.StateBatch()
+                                .setStateBatches(List.of(new 
WriteShareGroupStateRequestData.StateBatch()
                                     .setFirstOffset(0)
                                     .setLastOffset(10)
                                     .setDeliveryCount((short) 1)
@@ -583,13 +718,13 @@ class ShareCoordinatorServiceTest {
                         )),
                     new WriteShareGroupStateRequestData.WriteStateData()
                         .setTopicId(topicId2)
-                        .setPartitions(Collections.singletonList(
+                        .setPartitions(List.of(
                             new WriteShareGroupStateRequestData.PartitionData()
                                 .setPartition(partition2)
                                 .setStartOffset(0)
                                 .setStateEpoch(1)
                                 .setLeaderEpoch(1)
-                                .setStateBatches(Collections.singletonList(new 
WriteShareGroupStateRequestData.StateBatch()
+                                .setStateBatches(List.of(new 
WriteShareGroupStateRequestData.StateBatch()
                                     .setFirstOffset(0)
                                     .setLastOffset(10)
                                     .setDeliveryCount((short) 1)
@@ -609,13 +744,13 @@ class ShareCoordinatorServiceTest {
         HashSet<WriteShareGroupStateResponseData.WriteStateResult> 
expectedResult = new HashSet<>(Arrays.asList(
             new WriteShareGroupStateResponseData.WriteStateResult()
                 .setTopicId(topicId2)
-                .setPartitions(Collections.singletonList(new 
WriteShareGroupStateResponseData.PartitionResult()
+                .setPartitions(List.of(new 
WriteShareGroupStateResponseData.PartitionResult()
                     .setPartition(partition2)
                     .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
                     .setErrorMessage("Share coordinator is not available."))),
             new WriteShareGroupStateResponseData.WriteStateResult()
                 .setTopicId(topicId1)
-                .setPartitions(Collections.singletonList(new 
WriteShareGroupStateResponseData.PartitionResult()
+                .setPartitions(List.of(new 
WriteShareGroupStateResponseData.PartitionResult()
                     .setPartition(partition1)
                     .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
                     .setErrorMessage("Share coordinator is not 
available.")))));
@@ -647,14 +782,14 @@ class ShareCoordinatorServiceTest {
             .setTopics(Arrays.asList(
                     new ReadShareGroupStateRequestData.ReadStateData()
                         .setTopicId(topicId1)
-                        .setPartitions(Collections.singletonList(
+                        .setPartitions(List.of(
                             new ReadShareGroupStateRequestData.PartitionData()
                                 .setPartition(partition1)
                                 .setLeaderEpoch(1)
                         )),
                     new ReadShareGroupStateRequestData.ReadStateData()
                         .setTopicId(topicId2)
-                        .setPartitions(Collections.singletonList(
+                        .setPartitions(List.of(
                             new ReadShareGroupStateRequestData.PartitionData()
                                 .setPartition(partition2)
                                 .setLeaderEpoch(1)
@@ -672,13 +807,13 @@ class ShareCoordinatorServiceTest {
         HashSet<ReadShareGroupStateResponseData.ReadStateResult> 
expectedResult = new HashSet<>(Arrays.asList(
             new ReadShareGroupStateResponseData.ReadStateResult()
                 .setTopicId(topicId2)
-                .setPartitions(Collections.singletonList(new 
ReadShareGroupStateResponseData.PartitionResult()
+                .setPartitions(List.of(new 
ReadShareGroupStateResponseData.PartitionResult()
                     .setPartition(partition2)
                     .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
                     .setErrorMessage("Share coordinator is not available."))),
             new ReadShareGroupStateResponseData.ReadStateResult()
                 .setTopicId(topicId1)
-                .setPartitions(Collections.singletonList(new 
ReadShareGroupStateResponseData.PartitionResult()
+                .setPartitions(List.of(new 
ReadShareGroupStateResponseData.PartitionResult()
                     .setPartition(partition1)
                     .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
                     .setErrorMessage("Share coordinator is not 
available.")))));
@@ -710,14 +845,14 @@ class ShareCoordinatorServiceTest {
             .setTopics(Arrays.asList(
                     new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
                         .setTopicId(topicId1)
-                        .setPartitions(Collections.singletonList(
+                        .setPartitions(List.of(
                             new 
ReadShareGroupStateSummaryRequestData.PartitionData()
                                 .setPartition(partition1)
                                 .setLeaderEpoch(1)
                         )),
                     new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
                         .setTopicId(topicId2)
-                        .setPartitions(Collections.singletonList(
+                        .setPartitions(List.of(
                             new 
ReadShareGroupStateSummaryRequestData.PartitionData()
                                 .setPartition(partition2)
                                 .setLeaderEpoch(1)
@@ -735,13 +870,74 @@ class ShareCoordinatorServiceTest {
         HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> 
expectedResult = new HashSet<>(Arrays.asList(
             new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
                 .setTopicId(topicId2)
-                .setPartitions(Collections.singletonList(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
+                .setPartitions(List.of(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
                     .setPartition(partition2)
                     .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
                     .setErrorMessage("Share coordinator is not available."))),
             new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
                 .setTopicId(topicId1)
-                .setPartitions(Collections.singletonList(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
+                .setPartitions(List.of(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
+                    .setPartition(partition1)
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                    .setErrorMessage("Share coordinator is not 
available.")))));
+        assertEquals(expectedResult, result);
+    }
+
+    @Test
+    public void testDeleteStateWhenNotStarted() throws ExecutionException, 
InterruptedException, TimeoutException {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        ShareCoordinatorService service = new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.testConfig(),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
+        );
+
+        String groupId = "group1";
+        Uuid topicId1 = Uuid.randomUuid();
+        int partition1 = 0;
+
+        Uuid topicId2 = Uuid.randomUuid();
+        int partition2 = 1;
+
+        DeleteShareGroupStateRequestData request = new 
DeleteShareGroupStateRequestData()
+            .setGroupId(groupId)
+            .setTopics(Arrays.asList(
+                    new DeleteShareGroupStateRequestData.DeleteStateData()
+                        .setTopicId(topicId1)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateRequestData.PartitionData()
+                                .setPartition(partition1)
+                        )),
+                    new DeleteShareGroupStateRequestData.DeleteStateData()
+                        .setTopicId(topicId2)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateRequestData.PartitionData()
+                                .setPartition(partition2)
+                        ))
+                )
+            );
+
+        CompletableFuture<DeleteShareGroupStateResponseData> future = 
service.deleteState(
+            requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
+            request
+        );
+
+        HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> result = 
new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
+
+        HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> 
expectedResult = new HashSet<>(Arrays.asList(
+            new DeleteShareGroupStateResponseData.DeleteStateResult()
+                .setTopicId(topicId2)
+                .setPartitions(List.of(new 
DeleteShareGroupStateResponseData.PartitionResult()
+                    .setPartition(partition2)
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                    .setErrorMessage("Share coordinator is not available."))),
+            new DeleteShareGroupStateResponseData.DeleteStateResult()
+                .setTopicId(topicId1)
+                .setPartitions(List.of(new 
DeleteShareGroupStateResponseData.PartitionResult()
                     .setPartition(partition1)
                     .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
                     .setErrorMessage("Share coordinator is not 
available.")))));
@@ -771,23 +967,23 @@ class ShareCoordinatorServiceTest {
             
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
 
         assertEquals(new WriteShareGroupStateResponseData()
-                .setResults(Collections.singletonList(new 
WriteShareGroupStateResponseData.WriteStateResult()
+                .setResults(List.of(new 
WriteShareGroupStateResponseData.WriteStateResult()
                     .setTopicId(topicId)
-                    .setPartitions(Collections.singletonList(new 
WriteShareGroupStateResponseData.PartitionResult()
+                    .setPartitions(List.of(new 
WriteShareGroupStateResponseData.PartitionResult()
                         .setPartition(partition)
                         .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
                         .setErrorMessage("Unable to write share group state: 
This server does not host this topic-partition."))))),
             service.writeState(
                 requestContext(ApiKeys.WRITE_SHARE_GROUP_STATE),
                 new WriteShareGroupStateRequestData().setGroupId(groupId)
-                    .setTopics(Collections.singletonList(new 
WriteShareGroupStateRequestData.WriteStateData()
+                    .setTopics(List.of(new 
WriteShareGroupStateRequestData.WriteStateData()
                         .setTopicId(topicId)
-                        .setPartitions(Collections.singletonList(new 
WriteShareGroupStateRequestData.PartitionData()
+                        .setPartitions(List.of(new 
WriteShareGroupStateRequestData.PartitionData()
                             .setPartition(partition)
                             .setLeaderEpoch(1)
                             .setStartOffset(1)
                             .setStateEpoch(1)
-                            .setStateBatches(Collections.singletonList(new 
WriteShareGroupStateRequestData.StateBatch()
+                            .setStateBatches(List.of(new 
WriteShareGroupStateRequestData.StateBatch()
                                 .setFirstOffset(2)
                                 .setLastOffset(10)
                                 .setDeliveryCount((short) 1)
@@ -821,18 +1017,18 @@ class ShareCoordinatorServiceTest {
             
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
 
         assertEquals(new ReadShareGroupStateResponseData()
-                .setResults(Collections.singletonList(new 
ReadShareGroupStateResponseData.ReadStateResult()
+                .setResults(List.of(new 
ReadShareGroupStateResponseData.ReadStateResult()
                     .setTopicId(topicId)
-                    .setPartitions(Collections.singletonList(new 
ReadShareGroupStateResponseData.PartitionResult()
+                    .setPartitions(List.of(new 
ReadShareGroupStateResponseData.PartitionResult()
                         .setPartition(partition)
                         .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
                         .setErrorMessage("Unable to read share group state: 
The server experienced an unexpected error when processing the request."))))),
             service.readState(
                 requestContext(ApiKeys.READ_SHARE_GROUP_STATE),
                 new ReadShareGroupStateRequestData().setGroupId(groupId)
-                    .setTopics(Collections.singletonList(new 
ReadShareGroupStateRequestData.ReadStateData()
+                    .setTopics(List.of(new 
ReadShareGroupStateRequestData.ReadStateData()
                         .setTopicId(topicId)
-                        .setPartitions(Collections.singletonList(new 
ReadShareGroupStateRequestData.PartitionData()
+                        .setPartitions(List.of(new 
ReadShareGroupStateRequestData.PartitionData()
                             .setPartition(partition)
                             .setLeaderEpoch(1)
                         ))
@@ -864,18 +1060,18 @@ class ShareCoordinatorServiceTest {
             
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
 
         assertEquals(new ReadShareGroupStateSummaryResponseData()
-                .setResults(Collections.singletonList(new 
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                .setResults(List.of(new 
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
                     .setTopicId(topicId)
-                    .setPartitions(Collections.singletonList(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
+                    .setPartitions(List.of(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
                         .setPartition(partition)
                         .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
                         .setErrorMessage("Unable to read share group state 
summary: The server experienced an unexpected error when processing the 
request."))))),
             service.readStateSummary(
                 requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
                 new ReadShareGroupStateSummaryRequestData().setGroupId(groupId)
-                    .setTopics(Collections.singletonList(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                    .setTopics(List.of(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
                         .setTopicId(topicId)
-                        .setPartitions(Collections.singletonList(new 
ReadShareGroupStateSummaryRequestData.PartitionData()
+                        .setPartitions(List.of(new 
ReadShareGroupStateSummaryRequestData.PartitionData()
                             .setPartition(partition)
                             .setLeaderEpoch(1)
                         ))
@@ -884,6 +1080,48 @@ class ShareCoordinatorServiceTest {
         );
     }
 
+    @Test
+    public void testDeleteFutureReturnsError() throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> coordinatorRuntime = mockRuntime();
+        ShareCoordinatorService coordinatorService = new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.testConfig(),
+            coordinatorRuntime,
+            new ShareCoordinatorMetrics(),
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
+        );
+        coordinatorService.startup(() -> 1);
+
+        String group = "group1";
+        Uuid topic = Uuid.randomUuid();
+        int partitionId = 0;
+
+        // Every scheduled write operation completes exceptionally with UNKNOWN_TOPIC_OR_PARTITION.
+        when(coordinatorRuntime.scheduleWriteOperation(any(), any(), any(), any()))
+            .thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
+
+        // The response is expected to carry COORDINATOR_NOT_AVAILABLE together with the failure's message.
+        DeleteShareGroupStateResponseData.PartitionResult expectedPartition = new DeleteShareGroupStateResponseData.PartitionResult()
+            .setPartition(partitionId)
+            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+            .setErrorMessage("Unable to delete share group state: This server does not host this topic-partition.");
+        DeleteShareGroupStateResponseData expectedResponse = new DeleteShareGroupStateResponseData()
+            .setResults(List.of(new DeleteShareGroupStateResponseData.DeleteStateResult()
+                .setTopicId(topic)
+                .setPartitions(List.of(expectedPartition))));
+
+        DeleteShareGroupStateRequestData deleteRequest = new DeleteShareGroupStateRequestData()
+            .setGroupId(group)
+            .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(topic)
+                .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
+                    .setPartition(partitionId)))));
+
+        DeleteShareGroupStateResponseData actualResponse = coordinatorService
+            .deleteState(requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE), deleteRequest)
+            .get(5, TimeUnit.SECONDS);
+
+        assertEquals(expectedResponse, actualResponse);
+    }
+
     @Test
     public void testTopicPartitionFor() {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index fcd6933f1e3..7ebb5ce3954 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.coordinator.share;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
 import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -26,6 +28,7 @@ import 
org.apache.kafka.common.message.WriteShareGroupStateRequestData;
 import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
 import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
@@ -40,6 +43,7 @@ import 
org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
 import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.share.SharePartitionKey;
@@ -58,10 +62,12 @@ import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -957,6 +963,216 @@ class ShareCoordinatorShardTest {
         
verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
     }
 
+    @Test
+    public void testDeleteStateSuccess() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+
+        SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+        // List.of is used here for consistency with the other delete-state tests in this class.
+        DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)))));
+
+        CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request);
+
+        // Replay a snapshot record first so that the tombstone has state to remove.
+        CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID,
+            TOPIC_ID,
+            PARTITION,
+            new ShareGroupOffset.Builder()
+                .setSnapshotEpoch(0)
+                .setStateEpoch(0)
+                .setLeaderEpoch(0)
+                .setStateBatches(List.of(
+                        new PersisterStateBatch(
+                            0,
+                            10,
+                            (byte) 0,
+                            (short) 1
+                        )
+                    )
+                )
+                .build()
+        );
+        shard.replay(0L, 0L, (short) 0, record);
+        assertNotNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNotNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+
+        // Replay the tombstone produced by deleteState.
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        List<CoordinatorRecord> expectedRecords = List.of(
+            ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
+                GROUP_ID, TOPIC_ID, PARTITION)
+        );
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+
+        // After the tombstone is replayed, none of the three lookups returns a value for the key.
+        assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+    }
+
+    @Test
+    public void testDeleteStateFirstRecordDeleteSuccess() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+
+        SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+        // List.of is used here for consistency with the other delete-state tests in this class.
+        DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)))));
+
+        CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request);
+
+        // No record has been replayed yet, so the shard holds nothing for this key.
+        assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+
+        // Replay the tombstone as the very first record for this key.
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        List<CoordinatorRecord> expectedRecords = List.of(
+            ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
+                GROUP_ID, TOPIC_ID, PARTITION)
+        );
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+
+        // The lookups still return null after the tombstone has been replayed.
+        assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+    }
+
+    @Test
+    public void testDeleteStateInvalidRequestData() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+
+        // invalid partition
+        int partition = -1;
+
+        DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
+                    .setPartition(partition)))));
+
+        CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request);
+
+        // The request is expected to be rejected with INVALID_REQUEST and to produce no records.
+        DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toErrorResponseData(
+            TOPIC_ID, partition, Errors.INVALID_REQUEST, ShareCoordinatorShard.NEGATIVE_PARTITION_ID.getMessage());
+        List<CoordinatorRecord> expectedRecords = List.of();
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testDeleteNullMetadataImage() {
+        // Hand the shard a null metadata image before issuing the delete.
+        ShareCoordinatorShard coordinatorShard = new ShareCoordinatorShardBuilder().build();
+        coordinatorShard.onNewMetadataImage(null, null);
+
+        DeleteShareGroupStateRequestData.PartitionData partitionData = new DeleteShareGroupStateRequestData.PartitionData()
+            .setPartition(0);
+        DeleteShareGroupStateRequestData.DeleteStateData topicData = new DeleteShareGroupStateRequestData.DeleteStateData()
+            .setTopicId(TOPIC_ID)
+            .setPartitions(List.of(partitionData));
+        DeleteShareGroupStateRequestData deleteRequest = new DeleteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(topicData));
+
+        CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> outcome = coordinatorShard.deleteState(deleteRequest);
+
+        // Expect an UNKNOWN_TOPIC_OR_PARTITION error response and an empty record list.
+        DeleteShareGroupStateResponseData expectedResponse = DeleteShareGroupStateResponse.toErrorResponseData(
+            TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
+        List<CoordinatorRecord> noRecords = List.of();
+
+        assertEquals(expectedResponse, outcome.response());
+        assertEquals(noRecords, outcome.records());
+    }
+
+    @Test
+    public void testDeleteTopicIdNonExistentInMetadataImage() {
+        ShareCoordinatorShard coordinatorShard = new ShareCoordinatorShardBuilder().build();
+        MetadataImage metadataImage = mock(MetadataImage.class);
+        coordinatorShard.onNewMetadataImage(metadataImage, null);
+
+        DeleteShareGroupStateRequestData.PartitionData partitionData = new DeleteShareGroupStateRequestData.PartitionData()
+            .setPartition(0);
+        DeleteShareGroupStateRequestData.DeleteStateData topicData = new DeleteShareGroupStateRequestData.DeleteStateData()
+            .setTopicId(TOPIC_ID)
+            .setPartitions(List.of(partitionData));
+        DeleteShareGroupStateRequestData deleteRequest = new DeleteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(topicData));
+
+        // The topics image returns null when asked for TOPIC_ID.
+        TopicsImage topics = mock(TopicsImage.class);
+        when(topics.getTopic(eq(TOPIC_ID))).thenReturn(null);
+        when(metadataImage.topics()).thenReturn(topics);
+
+        CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> outcome = coordinatorShard.deleteState(deleteRequest);
+
+        // Expect an UNKNOWN_TOPIC_OR_PARTITION error response and an empty record list.
+        DeleteShareGroupStateResponseData expectedResponse = DeleteShareGroupStateResponse.toErrorResponseData(
+            TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
+        List<CoordinatorRecord> noRecords = List.of();
+
+        assertEquals(expectedResponse, outcome.response());
+        assertEquals(noRecords, outcome.records());
+        // The topic must have been looked up exactly once.
+        verify(topics, times(1)).getTopic(eq(TOPIC_ID));
+    }
+
+    @Test
+    public void testDeletePartitionIdNonExistentInMetadataImage() {
+        ShareCoordinatorShard coordinatorShard = new ShareCoordinatorShardBuilder().build();
+        MetadataImage metadataImage = mock(MetadataImage.class);
+        coordinatorShard.onNewMetadataImage(metadataImage, null);
+
+        DeleteShareGroupStateRequestData.PartitionData partitionData = new DeleteShareGroupStateRequestData.PartitionData()
+            .setPartition(0);
+        DeleteShareGroupStateRequestData.DeleteStateData topicData = new DeleteShareGroupStateRequestData.DeleteStateData()
+            .setTopicId(TOPIC_ID)
+            .setPartitions(List.of(partitionData));
+        DeleteShareGroupStateRequestData deleteRequest = new DeleteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(topicData));
+
+        // The topic lookup succeeds...
+        TopicsImage topics = mock(TopicsImage.class);
+        when(topics.getTopic(eq(TOPIC_ID))).thenReturn(mock(TopicImage.class));
+        when(metadataImage.topics()).thenReturn(topics);
+
+        // ...but the partition lookup for partition 0 returns null.
+        when(topics.getPartition(eq(TOPIC_ID), eq(0))).thenReturn(null);
+
+        CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> outcome = coordinatorShard.deleteState(deleteRequest);
+
+        // Expect an UNKNOWN_TOPIC_OR_PARTITION error response and an empty record list.
+        DeleteShareGroupStateResponseData expectedResponse = DeleteShareGroupStateResponse.toErrorResponseData(
+            TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
+        List<CoordinatorRecord> noRecords = List.of();
+
+        assertEquals(expectedResponse, outcome.response());
+        assertEquals(noRecords, outcome.records());
+        // Both the topic and the partition must have been looked up exactly once.
+        verify(topics, times(1)).getTopic(eq(TOPIC_ID));
+        verify(topics, times(1)).getPartition(eq(TOPIC_ID), eq(0));
+    }
+
     private static ShareGroupOffset groupOffset(ApiMessage record) {
         if (record instanceof ShareSnapshotValue) {
             return ShareGroupOffset.fromRecord((ShareSnapshotValue) record);

Reply via email to