This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0bd1ff936f8 KAFKA-18629: Add persister impl and tests for 
DeleteShareGroupState RPC. [2/N] (#18748)
0bd1ff936f8 is described below

commit 0bd1ff936f81f21d4b7bf21563d4e8e7a2934527
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed Feb 5 20:21:19 2025 +0530

    KAFKA-18629: Add persister impl and tests for DeleteShareGroupState RPC. 
[2/N] (#18748)
    
    Reviewers: Andrew Schofield <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +
 .../requests/DeleteShareGroupStateRequest.java     |   2 +-
 .../share/persister/DefaultStatePersister.java     | 103 ++-
 .../share/persister/PersisterStateManager.java     | 165 +++++
 .../share/persister/DefaultStatePersisterTest.java | 311 +++++++++
 .../share/persister/PersisterStateManagerTest.java | 725 +++++++++++++++++++++
 6 files changed, 1306 insertions(+), 2 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 67c8a77022c..d4b357396f6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -347,6 +347,8 @@
               
files="(ShareCoordinatorServiceTest|DefaultStatePersisterTest|PersisterStateManagerTest).java"/>
     <suppress checks="CyclomaticComplexity"
               files="ShareCoordinatorShard.java"/>
+    <suppress checks="ClassFanOutComplexity"
+              
files="(PersisterStateManagerTest|DefaultStatePersisterTest).java"/>
 
     <!-- storage -->
     <suppress checks="CyclomaticComplexity"
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateRequest.java
index 074ace1cfea..d66785c682a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateRequest.java
@@ -34,7 +34,7 @@ public class DeleteShareGroupStateRequest extends 
AbstractRequest {
         private final DeleteShareGroupStateRequestData data;
 
         public Builder(DeleteShareGroupStateRequestData data) {
-            this(data, false);
+            this(data, true);
         }
 
         public Builder(DeleteShareGroupStateRequestData data, boolean 
enableUnstableLastVersion) {
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
index 3b6db31b657..fdd6c7cb5da 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
@@ -19,6 +19,7 @@ package org.apache.kafka.server.share.persister;
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
 import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
@@ -280,7 +281,46 @@ public class DefaultStatePersister implements Persister {
      * @return A completable future of DeleteShareGroupStateResult
      */
     public CompletableFuture<DeleteShareGroupStateResult> 
deleteState(DeleteShareGroupStateParameters request) {
-        throw new RuntimeException("not implemented");
+        try {
+            validate(request);
+        } catch (Exception e) {
+            log.error("Unable to validate delete state request", e);
+            return CompletableFuture.failedFuture(e);
+        }
+        GroupTopicPartitionData<PartitionIdData> gtp = 
request.groupTopicPartitionData();
+        String groupId = gtp.groupId();
+
+        Map<Uuid, Map<Integer, 
CompletableFuture<DeleteShareGroupStateResponse>>> futureMap = new HashMap<>();
+        List<PersisterStateManager.DeleteStateHandler> handlers = new 
ArrayList<>();
+
+        gtp.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                CompletableFuture<DeleteShareGroupStateResponse> future = 
futureMap
+                    .computeIfAbsent(topicData.topicId(), k -> new HashMap<>())
+                    .computeIfAbsent(partitionData.partition(), k -> new 
CompletableFuture<>());
+
+                handlers.add(
+                    stateManager.new DeleteStateHandler(
+                        groupId,
+                        topicData.topicId(),
+                        partitionData.partition(),
+                        future,
+                        null
+                    )
+                );
+            });
+        });
+
+        for (PersisterStateManager.PersisterStateManagerHandler handler : 
handlers) {
+            stateManager.enqueue(handler);
+        }
+
+        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+            handlers.stream()
+                .map(PersisterStateManager.DeleteStateHandler::result)
+                .toArray(CompletableFuture[]::new));
+
+        return combinedFuture.thenApply(v -> 
deleteResponsesToResult(futureMap));
     }
 
     /**
@@ -384,6 +424,55 @@ public class DefaultStatePersister implements Persister {
             .build();
     }
 
+    /**
+     * Takes in a list of COMPLETED futures and combines the results,
+     * taking care of errors if any, into a single DeleteShareGroupStateResult
+     *
+     * @param futureMap - HashMap of {topic -> {partition -> future}}
+     * @return Object representing combined result of type 
DeleteShareGroupStateResult
+     */
+    // visible for testing
+    DeleteShareGroupStateResult deleteResponsesToResult(
+        Map<Uuid, Map<Integer, 
CompletableFuture<DeleteShareGroupStateResponse>>> futureMap
+    ) {
+        List<TopicData<PartitionErrorData>> topicsData = 
futureMap.keySet().stream()
+            .map(topicId -> {
+                List<PartitionErrorData> partitionErrorData = 
futureMap.get(topicId).entrySet().stream()
+                    .map(partitionFuture -> {
+                        int partition = partitionFuture.getKey();
+                        CompletableFuture<DeleteShareGroupStateResponse> 
future = partitionFuture.getValue();
+                        try {
+                            // already completed because of allOf call in the 
caller
+                            DeleteShareGroupStateResponse partitionResponse = 
future.join();
+                            return 
partitionResponse.data().results().get(0).partitions().stream()
+                                .map(partitionResult -> 
PartitionFactory.newPartitionErrorData(
+                                        partitionResult.partition(),
+                                        partitionResult.errorCode(),
+                                        partitionResult.errorMessage()
+                                    )
+                                )
+                                .toList();
+                        } catch (Exception e) {
+                            log.error("Unexpected exception while getting data 
from share coordinator", e);
+                            return List.of(
+                                PartitionFactory.newPartitionErrorData(
+                                    partition,
+                                    Errors.UNKNOWN_SERVER_ERROR.code(),   // 
No specific public error code exists for InterruptedException / 
ExecutionException
+                                    "Error deleting state from share 
coordinator: " + e.getMessage()
+                                )
+                            );
+                        }
+                    })
+                    .flatMap(List::stream)
+                    .collect(Collectors.toList());
+                return new TopicData<>(topicId, partitionErrorData);
+            })
+            .collect(Collectors.toList());
+        return new DeleteShareGroupStateResult.Builder()
+            .setTopicsData(topicsData)
+            .build();
+    }
+
     private static void validate(WriteShareGroupStateParameters params) {
         String prefix = "Write share group parameters";
         if (params == null) {
@@ -420,6 +509,18 @@ public class DefaultStatePersister implements Persister {
         validateGroupTopicPartitionData(prefix, 
params.groupTopicPartitionData());
     }
 
+    private static void validate(DeleteShareGroupStateParameters params) {
+        String prefix = "Delete share group parameters";
+        if (params == null) {
+            throw new IllegalArgumentException(prefix + " cannot be null.");
+        }
+        if (params.groupTopicPartitionData() == null) {
+            throw new IllegalArgumentException(prefix + " data cannot be 
null.");
+        }
+
+        validateGroupTopicPartitionData(prefix, 
params.groupTopicPartitionData());
+    }
+
     private static void validateGroupTopicPartitionData(String prefix, 
GroupTopicPartitionData<? extends PartitionIdData> data) {
         String groupId = data.groupId();
         if (groupId == null || groupId.isEmpty()) {
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 5c9028b87e8..21e00be29f8 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -24,6 +24,8 @@ import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.FindCoordinatorResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
@@ -36,6 +38,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DeleteShareGroupStateRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
@@ -875,6 +879,144 @@ public class PersisterStateManager {
         }
     }
 
+    public class DeleteStateHandler extends PersisterStateManagerHandler {
+        private final CompletableFuture<DeleteShareGroupStateResponse> result;
+        private final BackoffManager deleteStateBackoff;
+
+        public DeleteStateHandler(
+            String groupId,
+            Uuid topicId,
+            int partition,
+            CompletableFuture<DeleteShareGroupStateResponse> result,
+            long backoffMs,
+            long backoffMaxMs,
+            int maxRPCRetryAttempts
+        ) {
+            super(groupId, topicId, partition, backoffMs, backoffMaxMs, 
maxRPCRetryAttempts);
+            this.result = result;
+            this.deleteStateBackoff = new BackoffManager(maxRPCRetryAttempts, 
backoffMs, backoffMaxMs);
+        }
+
+        public DeleteStateHandler(
+            String groupId,
+            Uuid topicId,
+            int partition,
+            CompletableFuture<DeleteShareGroupStateResponse> result,
+            Consumer<ClientResponse> onCompleteCallback
+        ) {
+            this(
+                groupId,
+                topicId,
+                partition,
+                result,
+                REQUEST_BACKOFF_MS,
+                REQUEST_BACKOFF_MAX_MS,
+                MAX_FIND_COORD_ATTEMPTS
+            );
+        }
+
+        @Override
+        protected String name() {
+            return "DeleteStateHandler";
+        }
+
+        @Override
+        protected AbstractRequest.Builder<? extends AbstractRequest> 
requestBuilder() {
+            throw new RuntimeException("Delete requests are batchable, hence 
individual requests not needed.");
+        }
+
+        @Override
+        protected boolean isResponseForRequest(ClientResponse response) {
+            return response.requestHeader().apiKey() == 
ApiKeys.DELETE_SHARE_GROUP_STATE;
+        }
+
+        @Override
+        protected void handleRequestResponse(ClientResponse response) {
+            log.debug("Delete state response received - {}", response);
+            deleteStateBackoff.incrementAttempt();
+
+            // response can be a combined one for large number of requests
+            // we need to deconstruct it
+            DeleteShareGroupStateResponse combinedResponse = 
(DeleteShareGroupStateResponse) response.responseBody();
+
+            for (DeleteShareGroupStateResponseData.DeleteStateResult 
deleteStateResult : combinedResponse.data().results()) {
+                if 
(deleteStateResult.topicId().equals(partitionKey().topicId())) {
+                    
Optional<DeleteShareGroupStateResponseData.PartitionResult> partitionStateData =
+                        deleteStateResult.partitions().stream()
+                            .filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
+                            .findFirst();
+
+                    if (partitionStateData.isPresent()) {
+                        Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                        switch (error) {
+                            case NONE:
+                                deleteStateBackoff.resetAttempts();
+                                
DeleteShareGroupStateResponseData.DeleteStateResult result = 
DeleteShareGroupStateResponse.toResponseDeleteStateResult(
+                                    partitionKey().topicId(),
+                                    List.of(partitionStateData.get())
+                                );
+                                this.result.complete(new 
DeleteShareGroupStateResponse(
+                                    new 
DeleteShareGroupStateResponseData().setResults(List.of(result))));
+                                return;
+
+                            // check retriable errors
+                            case COORDINATOR_NOT_AVAILABLE:
+                            case COORDINATOR_LOAD_IN_PROGRESS:
+                            case NOT_COORDINATOR:
+                                log.warn("Received retriable error in delete 
state RPC for key {}: {}", partitionKey(), error.message());
+                                if (!deleteStateBackoff.canAttempt()) {
+                                    log.error("Exhausted max retries for 
delete state RPC for key {} without success.", partitionKey());
+                                    deleteStateErrorResponse(error, new 
Exception("Exhausted max retries to complete delete state RPC without 
success."));
+                                    return;
+                                }
+                                super.resetCoordinatorNode();
+                                timer.add(new 
PersisterTimerTask(deleteStateBackoff.backOff(), this));
+                                return;
+
+                            default:
+                                log.error("Unable to perform delete state RPC 
for key {}: {}", partitionKey(), error.message());
+                                deleteStateErrorResponse(error, null);
+                                return;
+                        }
+                    }
+                }
+            }
+
+            // no response found specific topic partition
+            IllegalStateException exception = new IllegalStateException(
+                "Failed to delete state for share partition: " + partitionKey()
+            );
+            deleteStateErrorResponse(Errors.forException(exception), 
exception);
+        }
+
+        private void deleteStateErrorResponse(Errors error, Exception 
exception) {
+            this.result.complete(new DeleteShareGroupStateResponse(
+                
DeleteShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), 
partitionKey().partition(), error, "Error in delete state RPC. " +
+                    (exception == null ? error.message() : 
exception.getMessage()))));
+        }
+
+        @Override
+        protected void findCoordinatorErrorResponse(Errors error, Exception 
exception) {
+            this.result.complete(new DeleteShareGroupStateResponse(
+                
DeleteShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), 
partitionKey().partition(), error, "Error in find coordinator. " +
+                    (exception == null ? error.message() : 
exception.getMessage()))));
+        }
+
+        protected CompletableFuture<DeleteShareGroupStateResponse> result() {
+            return result;
+        }
+
+        @Override
+        protected boolean isBatchable() {
+            return true;
+        }
+
+        @Override
+        protected RPCType rpcType() {
+            return RPCType.DELETE;
+        }
+    }
+
     private class SendThread extends InterBrokerSendThread {
         private final ConcurrentLinkedQueue<PersisterStateManagerHandler> 
queue = new ConcurrentLinkedQueue<>();
         private final Random random;
@@ -1059,6 +1201,8 @@ public class PersisterStateManager {
                     return coalesceReads(groupId, handlers);
                 case SUMMARY:
                     return coalesceReadSummaries(groupId, handlers);
+                case DELETE:
+                    return coalesceDeletes(groupId, handlers);
                 default:
                     throw new RuntimeException("Unknown rpc type: " + rpcType);
             }
@@ -1140,5 +1284,26 @@ public class PersisterStateManager {
                 true
             );
         }
+
+        private static AbstractRequest.Builder<? extends AbstractRequest> 
coalesceDeletes(String groupId, List<? extends PersisterStateManagerHandler> 
handlers) {
+            Map<Uuid, List<DeleteShareGroupStateRequestData.PartitionData>> 
partitionData = new HashMap<>();
+            handlers.forEach(persHandler -> {
+                assert persHandler instanceof DeleteStateHandler;
+                DeleteStateHandler handler = (DeleteStateHandler) persHandler;
+                
partitionData.computeIfAbsent(handler.partitionKey().topicId(), topicId -> new 
LinkedList<>())
+                    .add(
+                        new DeleteShareGroupStateRequestData.PartitionData()
+                            .setPartition(handler.partitionKey().partition())
+                    );
+            });
+
+            return new DeleteShareGroupStateRequest.Builder(new 
DeleteShareGroupStateRequestData()
+                .setGroupId(groupId)
+                .setTopics(partitionData.entrySet().stream()
+                    .map(entry -> new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                        .setTopicId(entry.getKey())
+                        .setPartitions(entry.getValue()))
+                    .toList()));
+        }
     }
 }
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
index 88c83c88914..50ffcf90f81 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
@@ -22,12 +22,15 @@ import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.FindCoordinatorResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
 import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
 import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DeleteShareGroupStateRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
@@ -352,6 +355,81 @@ class DefaultStatePersisterTest {
         assertFutureThrows(result, IllegalArgumentException.class);
     }
 
+    @Test
+    public void testDeleteStateValidate() {
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 0;
+        int incorrectPartition = -1;
+
+        // Request Parameters are null
+        DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
+        CompletableFuture<DeleteShareGroupStateResult> result = 
defaultStatePersister.deleteState(null);
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
+
+        // groupTopicPartitionData is null
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.deleteState(new 
DeleteShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
+
+        // groupId is null
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.deleteState(new 
DeleteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdData>()
+                .setGroupId(null).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
+
+        // topicsData is empty
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.deleteState(new 
DeleteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdData>()
+                .setGroupId(groupId)
+                .setTopicsData(List.of()).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
+
+        // topicId is null
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.deleteState(new 
DeleteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdData>()
+                .setGroupId(groupId)
+                .setTopicsData(List.of(new TopicData<>(null,
+                    List.of(PartitionFactory.newPartitionStateBatchData(
+                        partition, 1, 0, 0, null))))).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
+
+        // partitionData is empty
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.deleteState(new 
DeleteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdData>()
+                .setGroupId(groupId)
+                .setTopicsData(List.of(new TopicData<>(topicId, 
List.of()))).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
+
+        // partition value is incorrect
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.deleteState(new 
DeleteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdData>()
+                .setGroupId(groupId)
+                .setTopicsData(List.of(new TopicData<>(topicId,
+                    List.of(PartitionFactory.newPartitionIdData(
+                        incorrectPartition))))).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
+    }
+
     @Test
     public void testWriteStateSuccess() {
 
@@ -789,6 +867,135 @@ class DefaultStatePersisterTest {
         assertEquals(expectedResultMap, resultMap);
     }
 
+    @Test
+    public void testDeleteStateSuccess() {
+        MockClient client = new MockClient(MOCK_TIME);
+
+        String groupId = "group1";
+        Uuid topicId1 = Uuid.randomUuid();
+        int partition1 = 10;
+
+        Uuid topicId2 = Uuid.randomUuid();
+        int partition2 = 8;
+
+        Node suppliedNode = new Node(0, HOST, PORT);
+        Node coordinatorNode1 = new Node(5, HOST, PORT);
+        Node coordinatorNode2 = new Node(6, HOST, PORT);
+
+        String coordinatorKey1 = SharePartitionKey.asCoordinatorKey(groupId, 
topicId1, partition1);
+        String coordinatorKey2 = SharePartitionKey.asCoordinatorKey(groupId, 
topicId2, partition2);
+
+        client.prepareResponseFrom(body -> body instanceof 
FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) 
body).data().coordinatorKeys().get(0).equals(coordinatorKey1),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(List.of(
+                        new FindCoordinatorResponseData.Coordinator()
+                            .setNodeId(coordinatorNode1.id())
+                            .setHost(HOST)
+                            .setPort(PORT)
+                            .setErrorCode(Errors.NONE.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
+        client.prepareResponseFrom(body -> body instanceof 
FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) 
body).data().coordinatorKeys().get(0).equals(coordinatorKey2),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(List.of(
+                        new FindCoordinatorResponseData.Coordinator()
+                            .setNodeId(coordinatorNode2.id())
+                            .setHost(HOST)
+                            .setPort(PORT)
+                            .setErrorCode(Errors.NONE.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
+        client.prepareResponseFrom(
+            body -> {
+                DeleteShareGroupStateRequest request = 
(DeleteShareGroupStateRequest) body;
+                String requestGroupId = request.data().groupId();
+                Uuid requestTopicId = request.data().topics().get(0).topicId();
+                int requestPartition = 
request.data().topics().get(0).partitions().get(0).partition();
+
+                return requestGroupId.equals(groupId) && requestTopicId == 
topicId1 && requestPartition == partition1;
+            },
+            new 
DeleteShareGroupStateResponse(DeleteShareGroupStateResponse.toResponseData(topicId1,
 partition1)),
+            coordinatorNode1
+        );
+
+        client.prepareResponseFrom(
+            body -> {
+                DeleteShareGroupStateRequest request = 
(DeleteShareGroupStateRequest) body;
+                String requestGroupId = request.data().groupId();
+                Uuid requestTopicId = request.data().topics().get(0).topicId();
+                int requestPartition = 
request.data().topics().get(0).partitions().get(0).partition();
+
+                return requestGroupId.equals(groupId) && requestTopicId == 
topicId2 && requestPartition == partition2;
+            },
+            new 
DeleteShareGroupStateResponse(DeleteShareGroupStateResponse.toResponseData(topicId2,
 partition2)),
+            coordinatorNode2
+        );
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = 
getDefaultCacheHelper(suppliedNode);
+
+        DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder()
+            .withKafkaClient(client)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        DeleteShareGroupStateParameters request = 
DeleteShareGroupStateParameters.from(
+            new DeleteShareGroupStateRequestData()
+                .setGroupId(groupId)
+                .setTopics(List.of(
+                    new DeleteShareGroupStateRequestData.DeleteStateData()
+                        .setTopicId(topicId1)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateRequestData.PartitionData()
+                                .setPartition(partition1)
+                        )),
+                    new DeleteShareGroupStateRequestData.DeleteStateData()
+                        .setTopicId(topicId2)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateRequestData.PartitionData()
+                                .setPartition(partition2)
+                        ))
+                ))
+        );
+
+        CompletableFuture<DeleteShareGroupStateResult> resultFuture = 
defaultStatePersister.deleteState(request);
+
+        DeleteShareGroupStateResult result = null;
+        try {
+            // adding long delay to allow for environment/GC issues
+            result = resultFuture.get(10L, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            fail("Unexpected exception", e);
+        }
+
+        HashSet<PartitionData> resultMap = new HashSet<>();
+        result.topicsData().forEach(
+            topicData -> topicData.partitions().forEach(
+                partitionData -> resultMap.add((PartitionData) partitionData)
+            )
+        );
+
+
+        HashSet<PartitionData> expectedResultMap = new HashSet<>();
+        expectedResultMap.add((PartitionData) 
PartitionFactory.newPartitionErrorData(partition1, Errors.NONE.code(), null));
+
+        expectedResultMap.add((PartitionData) 
PartitionFactory.newPartitionErrorData(partition2, Errors.NONE.code(), null));
+
+        assertEquals(2, result.topicsData().size());
+        assertEquals(expectedResultMap, resultMap);
+    }
+
     @Test
     public void testWriteStateResponseToResultPartialResults() {
         Map<Uuid, Map<Integer, 
CompletableFuture<WriteShareGroupStateResponse>>> futureMap = new HashMap<>();
@@ -1111,6 +1318,110 @@ class DefaultStatePersisterTest {
         );
     }
 
+    @Test
+    public void testDeleteStateResponseToResultPartialResults() {
+        Map<Uuid, Map<Integer, 
CompletableFuture<DeleteShareGroupStateResponse>>> futureMap = new HashMap<>();
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), 1, 
null);
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, 
null);
+
+        // one entry has valid results
+        futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>())
+            .put(tp1.partition(), CompletableFuture.completedFuture(
+                    new DeleteShareGroupStateResponse(
+                        DeleteShareGroupStateResponse.toResponseData(
+                            tp1.topicId(),
+                            tp1.partition()
+                        )
+                    )
+                )
+            );
+
+        // one entry has error
+        futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>())
+            .put(tp2.partition(), CompletableFuture.completedFuture(
+                    new DeleteShareGroupStateResponse(
+                        DeleteShareGroupStateResponse.toErrorResponseData(
+                            tp2.topicId(),
+                            tp2.partition(),
+                            Errors.UNKNOWN_TOPIC_OR_PARTITION,
+                            "unknown tp"
+                        )
+                    )
+                )
+            );
+
+        PersisterStateManager psm = mock(PersisterStateManager.class);
+        DefaultStatePersister dsp = new DefaultStatePersister(psm);
+
+        DeleteShareGroupStateResult results = 
dsp.deleteResponsesToResult(futureMap);
+
+        // results should contain partial results
+        assertEquals(2, results.topicsData().size());
+        assertTrue(
+            results.topicsData().contains(
+                new TopicData<>(
+                    tp1.topicId(),
+                    
List.of(PartitionFactory.newPartitionErrorData(tp1.partition(), 
Errors.NONE.code(), null))
+                )
+            )
+        );
+        assertTrue(
+            results.topicsData().contains(
+                new TopicData<>(
+                    tp2.topicId(),
+                    
List.of(PartitionFactory.newPartitionErrorData(tp2.partition(), 
Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp"))
+                )
+            )
+        );
+    }
+
+    @Test
+    public void testDeleteStateResponseToResultFailedFuture() {
+        Map<Uuid, Map<Integer, 
CompletableFuture<DeleteShareGroupStateResponse>>> futureMap = new HashMap<>();
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), 1, 
null);
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, 
null);
+
+        // one entry has valid results
+        futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>())
+            .put(tp1.partition(), CompletableFuture.completedFuture(
+                    new DeleteShareGroupStateResponse(
+                        DeleteShareGroupStateResponse.toResponseData(
+                            tp1.topicId(),
+                            tp1.partition()
+                        )
+                    )
+                )
+            );
+
+        // one entry has failed future
+        futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>())
+            .put(tp2.partition(), CompletableFuture.failedFuture(new 
Exception("scary stuff")));
+
+        PersisterStateManager psm = mock(PersisterStateManager.class);
+        DefaultStatePersister dsp = new DefaultStatePersister(psm);
+
+        DeleteShareGroupStateResult results = 
dsp.deleteResponsesToResult(futureMap);
+
+        // results should contain partial results
+        assertEquals(2, results.topicsData().size());
+        assertTrue(
+            results.topicsData().contains(
+                new TopicData<>(
+                    tp1.topicId(),
+                    
List.of(PartitionFactory.newPartitionErrorData(tp1.partition(), 
Errors.NONE.code(), null))
+                )
+            )
+        );
+        assertTrue(
+            results.topicsData().contains(
+                new TopicData<>(
+                    tp2.topicId(),
+                    
List.of(PartitionFactory.newPartitionErrorData(tp2.partition(), 
Errors.UNKNOWN_SERVER_ERROR.code(), "Error deleting state from share 
coordinator: java.lang.Exception: scary stuff"))
+                )
+            )
+        );
+    }
+
     @Test
     public void testDefaultPersisterClose() {
         PersisterStateManager psm = mock(PersisterStateManager.class);
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
index afa9b07aa17..61a477b1203 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
@@ -22,12 +22,15 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
 import org.apache.kafka.common.message.FindCoordinatorResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
 import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
 import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupStateRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
@@ -2843,6 +2846,728 @@ class PersisterStateManagerTest {
         }
     }
 
+    @Test
+    public void testDeleteStateRequestCoordinatorFoundSuccessfully() {
+        MockClient client = new MockClient(MOCK_TIME);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 10;
+
+        Node suppliedNode = new Node(0, HOST, PORT);
+        Node coordinatorNode = new Node(1, HOST, PORT);
+
+        String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, 
topicId, partition);
+
+        client.prepareResponseFrom(body -> body instanceof 
FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) 
body).data().coordinatorKeys().get(0).equals(coordinatorKey),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(Collections.singletonList(
+                        new FindCoordinatorResponseData.Coordinator()
+                            .setNodeId(1)
+                            .setHost(HOST)
+                            .setPort(PORT)
+                            .setErrorCode(Errors.NONE.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = 
(DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = 
request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId 
&& requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = 
getDefaultCacheHelper(suppliedNode);
+
+        PersisterStateManager stateManager = 
PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<DeleteShareGroupStateResponse> future = new 
CompletableFuture<>();
+
+        PersisterStateManager.DeleteStateHandler handler = 
spy(stateManager.new DeleteStateHandler(
+            groupId,
+            topicId,
+            partition,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            MAX_RPC_RETRY_ATTEMPTS
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<DeleteShareGroupStateResponse> resultFuture = 
handler.result();
+
+        DeleteShareGroupStateResponse result = null;
+        try {
+            result = resultFuture.get();
+        } catch (Exception e) {
+            fail("Failed to get result from future", e);
+        }
+
+        DeleteShareGroupStateResponseData.PartitionResult partitionResult = 
result.data().results().get(0).partitions().get(0);
+
+        verify(handler, times(1)).findShareCoordinatorBuilder();
+        verify(handler, times(0)).requestBuilder();
+
+        // Verifying the coordinator node was populated correctly by the 
FIND_COORDINATOR request
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned in correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.NONE.code(), partitionResult.errorCode());
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
+    @Test
+    public void 
testDeleteStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws 
InterruptedException, ExecutionException {
+        MockClient client = new MockClient(MOCK_TIME);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 10;
+
+        Node suppliedNode = new Node(0, HOST, PORT);
+        Node coordinatorNode = new Node(1, HOST, PORT);
+
+        String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, 
topicId, partition);
+
+        client.prepareResponseFrom(body -> body instanceof 
FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) 
body).data().coordinatorKeys().get(0).equals(coordinatorKey),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(Collections.singletonList(
+                        new FindCoordinatorResponseData.Coordinator()
+                            .setErrorCode(Errors.NOT_COORDINATOR.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
+        client.prepareResponseFrom(body -> body instanceof 
FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) 
body).data().coordinatorKeys().get(0).equals(coordinatorKey),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(Collections.singletonList(
+                        new FindCoordinatorResponseData.Coordinator()
+                            .setNodeId(1)
+                            .setHost(HOST)
+                            .setPort(PORT)
+                            .setErrorCode(Errors.NONE.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = 
(DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = 
request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId 
&& requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = 
getDefaultCacheHelper(suppliedNode);
+
+        PersisterStateManager stateManager = 
PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<DeleteShareGroupStateResponse> future = new 
CompletableFuture<>();
+
+        PersisterStateManager.DeleteStateHandler handler = 
spy(stateManager.new DeleteStateHandler(
+            groupId,
+            topicId,
+            partition,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            MAX_RPC_RETRY_ATTEMPTS
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<DeleteShareGroupStateResponse> resultFuture = 
handler.result();
+
+        TestUtils.waitForCondition(resultFuture::isDone, 
TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "Failed to get result from future");
+
+        DeleteShareGroupStateResponse result = resultFuture.get();
+        DeleteShareGroupStateResponseData.PartitionResult partitionResult = 
result.data().results().get(0).partitions().get(0);
+
+        verify(handler, times(2)).findShareCoordinatorBuilder();
+        verify(handler, times(0)).requestBuilder();
+
+        // Verifying the coordinator node was populated correctly by the 
FIND_COORDINATOR request
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned is correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.NONE.code(), partitionResult.errorCode());
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
+    @Test
+    public void testDeleteStateRequestCoordinatorFoundOnRetry() {
+        MockClient client = new MockClient(MOCK_TIME);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 10;
+
+        Node suppliedNode = new Node(0, HOST, PORT);
+        Node coordinatorNode = new Node(1, HOST, PORT);
+
+        String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, 
topicId, partition);
+
+        client.prepareResponseFrom(body -> body instanceof 
FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) 
body).data().coordinatorKeys().get(0).equals(coordinatorKey),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(Collections.singletonList(
+                        new FindCoordinatorResponseData.Coordinator()
+                            
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
+        client.prepareResponseFrom(body -> body instanceof 
FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) 
body).data().coordinatorKeys().get(0).equals(coordinatorKey),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(Collections.singletonList(
+                        new FindCoordinatorResponseData.Coordinator()
+                            .setNodeId(1)
+                            .setHost(HOST)
+                            .setPort(PORT)
+                            .setErrorCode(Errors.NONE.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = 
(DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = 
request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId 
&& requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = 
getDefaultCacheHelper(suppliedNode);
+
+        PersisterStateManager stateManager = 
PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<DeleteShareGroupStateResponse> future = new 
CompletableFuture<>();
+
+        PersisterStateManager.DeleteStateHandler handler = 
spy(stateManager.new DeleteStateHandler(
+            groupId,
+            topicId,
+            partition,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            MAX_RPC_RETRY_ATTEMPTS
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<DeleteShareGroupStateResponse> resultFuture = 
handler.result();
+
+        DeleteShareGroupStateResponse result = null;
+        try {
+            result = resultFuture.get();
+        } catch (Exception e) {
+            fail("Failed to get result from future", e);
+        }
+
+        DeleteShareGroupStateResponseData.PartitionResult partitionResult = 
result.data().results().get(0).partitions().get(0);
+
+        verify(handler, times(2)).findShareCoordinatorBuilder();
+        verify(handler, times(0)).requestBuilder();
+
+        // Verifying the coordinator node was populated correctly by the 
FIND_COORDINATOR request
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned in correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.NONE.code(), partitionResult.errorCode());
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
+    @Test
+    public void testDeleteStateRequestWithCoordinatorNodeLookup() {
+        MockClient client = new MockClient(MOCK_TIME);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 10;
+
+        Node coordinatorNode = new Node(1, HOST, PORT);
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = 
(DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = 
request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId 
&& requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = 
getCoordinatorCacheHelper(coordinatorNode);
+
+        PersisterStateManager stateManager = 
PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<DeleteShareGroupStateResponse> future = new 
CompletableFuture<>();
+
+        PersisterStateManager.DeleteStateHandler handler = 
spy(stateManager.new DeleteStateHandler(
+            groupId,
+            topicId,
+            partition,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            MAX_RPC_RETRY_ATTEMPTS
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<DeleteShareGroupStateResponse> resultFuture = 
handler.result();
+
+        DeleteShareGroupStateResponse result = null;
+        try {
+            result = resultFuture.get();
+        } catch (Exception e) {
+            fail("Failed to get result from future", e);
+        }
+
+        DeleteShareGroupStateResponseData.PartitionResult partitionResult = 
result.data().results().get(0).partitions().get(0);
+
+        verify(handler, times(0)).findShareCoordinatorBuilder();
+        verify(handler, times(0)).requestBuilder();
+        verify(handler, times(1)).onComplete(any());
+
+        // Verifying the coordinator node was populated correctly by the 
FIND_COORDINATOR request
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned in correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.NONE.code(), partitionResult.errorCode());
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
+    @Test
+    public void testDeleteStateRequestWithRetryAndCoordinatorNodeLookup() {
+        MockClient client = new MockClient(MOCK_TIME);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 10;
+
+        Node coordinatorNode = new Node(1, HOST, PORT);
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = 
(DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = 
request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId 
&& requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = 
(DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = 
request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId 
&& requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = 
getCoordinatorCacheHelper(coordinatorNode);
+
+        PersisterStateManager stateManager = 
PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<DeleteShareGroupStateResponse> future = new 
CompletableFuture<>();
+
+        PersisterStateManager.DeleteStateHandler handler = 
spy(stateManager.new DeleteStateHandler(
+            groupId,
+            topicId,
+            partition,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            MAX_RPC_RETRY_ATTEMPTS
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<DeleteShareGroupStateResponse> resultFuture = 
handler.result();
+
+        DeleteShareGroupStateResponse result = null;
+        try {
+            result = resultFuture.get();
+        } catch (Exception e) {
+            fail("Failed to get result from future", e);
+        }
+
+        DeleteShareGroupStateResponseData.PartitionResult partitionResult = 
result.data().results().get(0).partitions().get(0);
+
+        verify(handler, times(0)).findShareCoordinatorBuilder();
+        verify(handler, times(0)).requestBuilder();
+        verify(handler, times(2)).onComplete(any());
+
+        // Verifying the coordinator node was populated correctly by the 
FIND_COORDINATOR request
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned in correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.NONE.code(), partitionResult.errorCode());
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
+    @Test
+    public void testDeleteStateRequestFailedMaxRetriesExhausted() {
+        MockClient client = new MockClient(MOCK_TIME);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 10;
+
+        Node coordinatorNode = new Node(1, HOST, PORT);
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = 
(DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = 
request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId 
&& requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = 
(DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = 
request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId 
&& requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = 
(DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = 
request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId 
&& requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = 
getCoordinatorCacheHelper(coordinatorNode);
+
+        PersisterStateManager stateManager = 
PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<DeleteShareGroupStateResponse> future = new 
CompletableFuture<>();
+
+        PersisterStateManager.DeleteStateHandler handler = 
spy(stateManager.new DeleteStateHandler(
+            groupId,
+            topicId,
+            partition,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            2
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<DeleteShareGroupStateResponse> resultFuture = 
handler.result();
+
+        DeleteShareGroupStateResponse result = null;
+        try {
+            result = resultFuture.get();
+        } catch (Exception e) {
+            fail("Failed to get result from future", e);
+        }
+
+        DeleteShareGroupStateResponseData.PartitionResult partitionResult = 
result.data().results().get(0).partitions().get(0);
+
+        verify(handler, times(0)).findShareCoordinatorBuilder();
+        verify(handler, times(0)).requestBuilder();
+        verify(handler, times(2)).onComplete(any());
+
+        // Verifying the coordinator node was populated correctly by the 
FIND_COORDINATOR request
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned in correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), 
partitionResult.errorCode());
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
+    @Test
+    public void testDeleteStateRequestBatchingWithCoordinatorNodeLookup() 
throws Exception {
+        MockClient client = new MockClient(MOCK_TIME);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 10;
+
+        Node coordinatorNode = new Node(1, HOST, PORT);
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = 
(DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = 
request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId 
&& requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = 
getCoordinatorCacheHelper(coordinatorNode);
+
+        PersisterStateManager stateManager = 
PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        AtomicBoolean isBatchingSuccess = new AtomicBoolean(false);
+        stateManager.setGenerateCallback(() -> {
+            Map<PersisterStateManager.RPCType, Map<String, 
List<PersisterStateManager.PersisterStateManagerHandler>>> handlersPerType = 
stateManager.nodeRPCMap().get(coordinatorNode);
+            if (handlersPerType != null && 
handlersPerType.containsKey(PersisterStateManager.RPCType.DELETE) && 
handlersPerType.get(PersisterStateManager.RPCType.DELETE).containsKey(groupId)) 
{
+                if 
(handlersPerType.get(PersisterStateManager.RPCType.DELETE).get(groupId).size() 
> 2)
+                    isBatchingSuccess.set(true);
+            }
+        });
+
+        stateManager.start();
+
+        CompletableFuture<DeleteShareGroupStateResponse> future = new 
CompletableFuture<>();
+
+        List<PersisterStateManager.DeleteStateHandler> handlers = new 
ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            PersisterStateManager.DeleteStateHandler handler = 
spy(stateManager.new DeleteStateHandler(
+                groupId,
+                topicId,
+                partition,
+                future,
+                REQUEST_BACKOFF_MS,
+                REQUEST_BACKOFF_MAX_MS,
+                MAX_RPC_RETRY_ATTEMPTS
+            ));
+            handlers.add(handler);
+            stateManager.enqueue(handler);
+        }
+
+        CompletableFuture.allOf(handlers.stream()
+            
.map(PersisterStateManager.DeleteStateHandler::result).toArray(CompletableFuture[]::new)).get();
+
+        TestUtils.waitForCondition(isBatchingSuccess::get, 
TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "unable to verify batching");
+    }
+
     @Test
     public void testPersisterStateManagerClose() {
         KafkaClient client = mock(KafkaClient.class);

Reply via email to