This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a12f9f97c98 KAFKA-14506: Implement DeleteGroups API and OffsetDelete 
API (#14408)
a12f9f97c98 is described below

commit a12f9f97c98f31d8d957ff9b816c4f73b84e0d9d
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Wed Oct 4 05:30:45 2023 -0400

    KAFKA-14506: Implement DeleteGroups API and OffsetDelete API (#14408)
    
    This patch implements DeleteGroups and OffsetDelete API in the new group 
coordinator.
    
    Reviewers: yangy0000, Ritika Reddy <[email protected]>, Jeff Kim 
<[email protected]>, David Jacot <[email protected]>
---
 checkstyle/suppressions.xml                        |   4 +-
 .../kafka/common/requests/DeleteGroupsRequest.java |  32 +--
 .../common/requests/DeleteGroupsRequestTest.java   |  49 ++++
 .../org/apache/kafka/coordinator/group/Group.java  |  27 +++
 .../coordinator/group/GroupCoordinatorService.java | 125 +++++++----
 .../coordinator/group/GroupCoordinatorShard.java   |  78 ++++++-
 .../coordinator/group/GroupMetadataManager.java    |  27 +++
 .../coordinator/group/OffsetMetadataManager.java   | 104 +++++++++
 .../coordinator/group/consumer/ConsumerGroup.java  |  43 ++++
 .../coordinator/group/generic/GenericGroup.java    |  53 ++++-
 .../group/GroupCoordinatorServiceTest.java         | 246 ++++++++++++++++++++-
 .../group/GroupCoordinatorShardTest.java           | 147 ++++++++++++
 .../group/GroupMetadataManagerTest.java            |  30 ++-
 .../group/OffsetMetadataManagerTest.java           | 206 ++++++++++++++++-
 .../group/consumer/ConsumerGroupTest.java          |  30 +++
 .../group/generic/GenericGroupTest.java            |  61 ++++-
 16 files changed, 1201 insertions(+), 61 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 29ef2ff0ad8..c6dfae8cc44 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -326,11 +326,11 @@
     <suppress checks="(NPathComplexity|MethodLength)"
               
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
     <suppress checks="ClassFanOutComplexity"
-              
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
+              
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorService|GroupCoordinatorServiceTest).java"/>
     <suppress checks="ParameterNumber"
               files="(ConsumerGroupMember|GroupMetadataManager).java"/>
     <suppress checks="ClassDataAbstractionCouplingCheck"
-              
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
+              
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest).java"/>
     <suppress checks="JavaNCSS"
               files="GroupMetadataManagerTest.java"/>
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
index 87d6deedc12..6bee4eb8937 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
@@ -18,13 +18,12 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.message.DeleteGroupsRequestData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
-import 
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
-import 
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.nio.ByteBuffer;
+import java.util.List;
 
 public class DeleteGroupsRequest extends AbstractRequest {
     public static class Builder extends 
AbstractRequest.Builder<DeleteGroupsRequest> {
@@ -55,18 +54,9 @@ public class DeleteGroupsRequest extends AbstractRequest {
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        Errors error = Errors.forException(e);
-        DeletableGroupResultCollection groupResults = new 
DeletableGroupResultCollection();
-        for (String groupId : data.groupsNames()) {
-            groupResults.add(new DeletableGroupResult()
-                                 .setGroupId(groupId)
-                                 .setErrorCode(error.code()));
-        }
-
-        return new DeleteGroupsResponse(
-            new DeleteGroupsResponseData()
-                .setResults(groupResults)
-                .setThrottleTimeMs(throttleTimeMs)
+        return new DeleteGroupsResponse(new DeleteGroupsResponseData()
+            .setResults(getErrorResultCollection(data.groupsNames(), 
Errors.forException(e)))
+            .setThrottleTimeMs(throttleTimeMs)
         );
     }
 
@@ -78,4 +68,18 @@ public class DeleteGroupsRequest extends AbstractRequest {
     public DeleteGroupsRequestData data() {
         return data;
     }
+
+    public static DeleteGroupsResponseData.DeletableGroupResultCollection 
getErrorResultCollection(
+        List<String> groupIds,
+        Errors error
+    ) {
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        groupIds.forEach(groupId -> resultCollection.add(
+            new DeleteGroupsResponseData.DeletableGroupResult()
+                .setGroupId(groupId)
+                .setErrorCode(error.code())
+        ));
+        return resultCollection;
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java
new file mode 100644
index 00000000000..9745af8d0b7
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DeleteGroupsRequestData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static 
org.apache.kafka.common.requests.DeleteGroupsRequest.getErrorResultCollection;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DeleteGroupsRequestTest {
+
+    @Test
+    public void testGetErrorResultCollection() {
+        String groupId1 = "group-id-1";
+        String groupId2 = "group-id-2";
+        DeleteGroupsRequestData data = new DeleteGroupsRequestData()
+            .setGroupsNames(Arrays.asList(groupId1, groupId2));
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new 
DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList(
+                new DeleteGroupsResponseData.DeletableGroupResult()
+                    .setGroupId(groupId1)
+                    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()),
+                new DeleteGroupsResponseData.DeletableGroupResult()
+                    .setGroupId(groupId2)
+                    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+            ).iterator());
+
+        assertEquals(expectedResultCollection, 
getErrorResultCollection(data.groupsNames(), 
Errors.COORDINATOR_LOAD_IN_PROGRESS));
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index cbd25cb0700..29a252e47bf 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -19,6 +19,8 @@ package org.apache.kafka.coordinator.group;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 
+import java.util.List;
+
 /**
  * Interface common for all groups.
  */
@@ -90,4 +92,29 @@ public interface Group {
         int memberEpoch,
         long lastCommittedOffset
     ) throws KafkaException;
+
+    /**
+     * Validates the OffsetDelete request.
+     */
+    void validateOffsetDelete() throws KafkaException;
+
+    /**
+     * Validates the DeleteGroups request.
+     */
+    void validateDeleteGroup() throws KafkaException;
+
+    /**
+     * Returns true if the group is actively subscribed to the topic.
+     *
+     * @param topic The topic name.
+     * @return Whether the group is subscribed to the topic.
+     */
+    boolean isSubscribedToTopic(String topic);
+    
+    /**
+     * Populates the list of records with tombstone(s) for deleting the group.
+     *
+     * @param records The list of records.
+     */
+    void createGroupTombstoneRecords(List<Record> records);
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 9e11cfa63e7..1c71f6dee37 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DeleteGroupsRequest;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.requests.TransactionResult;
@@ -77,6 +78,8 @@ import org.slf4j.Logger;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.OptionalInt;
 import java.util.Properties;
 import java.util.Set;
@@ -523,9 +526,50 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final 
List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>>
 futures =
+            new ArrayList<>(groupIds.size());
+
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new 
HashMap<>();
+        groupIds.forEach(groupId -> {
+            // For backwards compatibility, we support DeleteGroups for the 
empty group id.
+            if (groupId == null) {
+                
futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(
+                    Collections.singletonList(null),
+                    Errors.INVALID_GROUP_ID
+                )));
+            } else {
+                final TopicPartition topicPartition = 
topicPartitionFor(groupId);
+                groupsByTopicPartition
+                    .computeIfAbsent(topicPartition, __ -> new ArrayList<>())
+                    .add(groupId);
+            }
+        });
+
+        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+            
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+                runtime.scheduleWriteOperation(
+                    "delete-groups",
+                    topicPartition,
+                    coordinator -> coordinator.deleteGroups(context, groupList)
+                ).exceptionally(exception ->
+                    DeleteGroupsRequest.getErrorResultCollection(groupList, 
normalizeException(exception))
+                );
+
+            futures.add(future);
+        });
+
+        final CompletableFuture<Void> allFutures = 
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+        return allFutures.thenApply(__ -> {
+            final DeleteGroupsResponseData.DeletableGroupResultCollection res 
= new DeleteGroupsResponseData.DeletableGroupResultCollection();
+            futures.forEach(future ->
+                // We don't use res.addAll(future.join()) because 
DeletableGroupResultCollection is an ImplicitLinkedHashMultiCollection,
+                // which has requirements for adding elements (see 
ImplicitLinkedHashCollection.java#add).
+                future.join().forEach(result ->
+                    res.add(result.duplicate())
+                )
+            );
+            return res;
+        });
     }
 
     /**
@@ -641,37 +685,9 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             "commit-offset",
             topicPartitionFor(request.groupId()),
             coordinator -> coordinator.commitOffset(context, request)
-        ).exceptionally(exception -> {
-            if (exception instanceof UnknownTopicOrPartitionException ||
-                exception instanceof NotEnoughReplicasException) {
-                return OffsetCommitRequest.getErrorResponse(
-                    request,
-                    Errors.COORDINATOR_NOT_AVAILABLE
-                );
-            }
-
-            if (exception instanceof NotLeaderOrFollowerException ||
-                exception instanceof KafkaStorageException) {
-                return OffsetCommitRequest.getErrorResponse(
-                    request,
-                    Errors.NOT_COORDINATOR
-                );
-            }
-
-            if (exception instanceof RecordTooLargeException ||
-                exception instanceof RecordBatchTooLargeException ||
-                exception instanceof InvalidFetchSizeException) {
-                return OffsetCommitRequest.getErrorResponse(
-                    request,
-                    Errors.INVALID_COMMIT_OFFSET_SIZE
-                );
-            }
-
-            return OffsetCommitRequest.getErrorResponse(
-                request,
-                Errors.forException(exception)
-            );
-        });
+        ).exceptionally(exception ->
+            OffsetCommitRequest.getErrorResponse(request, 
normalizeException(exception))
+        );
     }
 
     /**
@@ -705,9 +721,20 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        if (!isGroupIdNotEmpty(request.groupId())) {
+            return CompletableFuture.completedFuture(new 
OffsetDeleteResponseData()
+                .setErrorCode(Errors.INVALID_GROUP_ID.code())
+            );
+        }
+
+        return runtime.scheduleWriteOperation(
+            "delete-offsets",
+            topicPartitionFor(request.groupId()),
+            coordinator -> coordinator.deleteOffsets(context, request)
+        ).exceptionally(exception ->
+            new OffsetDeleteResponseData()
+                .setErrorCode(normalizeException(exception).code())
+        );
     }
 
     /**
@@ -827,4 +854,28 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
     private static boolean isGroupIdNotEmpty(String groupId) {
         return groupId != null && !groupId.isEmpty();
     }
+
+    /**
+     * Handles the exception in the scheduleWriteOperation.
+     * @return The Errors instance associated with the given exception.
+     */
+    private static Errors normalizeException(Throwable exception) {
+        if (exception instanceof UnknownTopicOrPartitionException ||
+            exception instanceof NotEnoughReplicasException) {
+            return Errors.COORDINATOR_NOT_AVAILABLE;
+        }
+
+        if (exception instanceof NotLeaderOrFollowerException ||
+            exception instanceof KafkaStorageException) {
+            return Errors.NOT_COORDINATOR;
+        }
+
+        if (exception instanceof RecordTooLargeException ||
+            exception instanceof RecordBatchTooLargeException ||
+            exception instanceof InvalidFetchSizeException) {
+            return Errors.UNKNOWN_SERVER_ERROR;
+        }
+
+        return Errors.forException(exception);
+    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 97327c9722f..a094d7c6e41 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -27,12 +28,15 @@ import 
org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
@@ -60,7 +64,9 @@ import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
@@ -156,12 +162,18 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
                 .build();
 
             return new GroupCoordinatorShard(
+                logContext,
                 groupMetadataManager,
                 offsetMetadataManager
             );
         }
     }
 
+    /**
+     * The logger.
+     */
+    private final Logger log;
+
     /**
      * The group metadata manager.
      */
@@ -175,13 +187,16 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
     /**
      * Constructor.
      *
+     * @param logContext            The log context.
      * @param groupMetadataManager  The group metadata manager.
      * @param offsetMetadataManager The offset metadata manager.
      */
     GroupCoordinatorShard(
+        LogContext logContext,
         GroupMetadataManager groupMetadataManager,
         OffsetMetadataManager offsetMetadataManager
     ) {
+        this.log = logContext.logger(GroupCoordinatorShard.class);
         this.groupMetadataManager = groupMetadataManager;
         this.offsetMetadataManager = offsetMetadataManager;
     }
@@ -262,6 +277,51 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
         );
     }
 
+    /**
+     * Handles a DeleteGroups request.
+     *
+     * @param context   The request context.
+     * @param groupIds  The groupIds of the groups to be deleted
+     * @return A Result containing the 
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+     *         a list of records to update the state machine.
+     */
+    public 
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> deleteGroups(
+        RequestContext context,
+        List<String> groupIds
+    ) throws ApiException {
+        final DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection =
+            new 
DeleteGroupsResponseData.DeletableGroupResultCollection(groupIds.size());
+        final List<Record> records = new ArrayList<>();
+        int numDeletedOffsets = 0;
+        final List<String> deletedGroups = new ArrayList<>();
+
+        for (String groupId : groupIds) {
+            try {
+                groupMetadataManager.validateDeleteGroup(groupId);
+                numDeletedOffsets += 
offsetMetadataManager.deleteAllOffsets(groupId, records);
+                groupMetadataManager.deleteGroup(groupId, records);
+                deletedGroups.add(groupId);
+
+                resultCollection.add(
+                    new DeleteGroupsResponseData.DeletableGroupResult()
+                        .setGroupId(groupId)
+                );
+            } catch (ApiException exception) {
+                resultCollection.add(
+                    new DeleteGroupsResponseData.DeletableGroupResult()
+                        .setGroupId(groupId)
+                        .setErrorCode(Errors.forException(exception).code())
+                );
+            }
+        }
+
+        log.info("The following groups were deleted: {}. A total of {} offsets 
were removed.",
+            String.join(", ", deletedGroups),
+            numDeletedOffsets
+        );
+        return new CoordinatorResult<>(records, resultCollection);
+    }
+
     /**
      * Fetch offsets for a given set of partitions and a given group.
      *
@@ -295,7 +355,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
     }
 
     /**
-     * Handles a OffsetCommit request.
+     * Handles an OffsetCommit request.
      *
      * @param context The request context.
      * @param request The actual OffsetCommit request.
@@ -341,6 +401,22 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
         return groupMetadataManager.genericGroupLeave(context, request);
     }
 
+    /**
+     * Handles a OffsetDelete request.
+     *
+     * @param context The request context.
+     * @param request The actual OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponse response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        RequestContext context,
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        return offsetMetadataManager.deleteOffsets(request);
+    }
+
     /**
      * The coordinator has been loaded. This is used to apply any
      * post loading operations (e.g. registering timers).
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 66742312a64..7588c598dcc 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3060,6 +3060,33 @@ public class GroupMetadataManager {
         group.remove(member.memberId());
     }
 
+    /**
+     * Handles a DeleteGroups request.
+     * Populates the record list passed in with record to update the state 
machine.
+     * Validations are done in {@link 
GroupCoordinatorShard#deleteGroups(RequestContext, List)} by
+     * calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
+     *
+     * @param groupId The id of the group to be deleted. It has been checked 
in {@link GroupMetadataManager#validateDeleteGroup}.
+     * @param records The record list to populate.
+     */
+    public void deleteGroup(
+        String groupId,
+        List<Record> records
+    ) {
+        // At this point, we have already validated the group id, so we know 
that the group exists and that no exception will be thrown.
+        group(groupId).createGroupTombstoneRecords(records);
+    }
+
+    /**
+     * Validates the DeleteGroups request.
+     *
+     * @param groupId The id of the group to be deleted.
+     */
+    void validateDeleteGroup(String groupId) throws ApiException {
+        Group group = group(groupId);
+        group.validateDeleteGroup();
+    }
+
     /**
      * Checks whether the given protocol type or name in the request is 
inconsistent with the group's.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index c8bf388d714..9744e492f49 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -23,8 +23,10 @@ import 
org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
 import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.RequestContext;
@@ -45,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.OptionalLong;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
 
@@ -242,6 +245,19 @@ public class OffsetMetadataManager {
         );
     }
 
+    /**
+     * Validates an OffsetDelete request.
+     *
+     * @param request The actual request.
+     */
+    private Group validateOffsetDelete(
+        OffsetDeleteRequestData request
+    ) throws GroupIdNotFoundException {
+        Group group = groupMetadataManager.group(request.groupId());
+        group.validateOffsetDelete();
+        return group;
+    }
+
     /**
      * Computes the expiration timestamp based on the retention time provided 
in the OffsetCommit
      * request.
@@ -333,6 +349,94 @@ public class OffsetMetadataManager {
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        final TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> offsetsByTopic =
+            offsetsByGroup.get(request.groupId());
+
+        request.topics().forEach(topic -> {
+            final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+                new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+
+            if (group.isSubscribedToTopic(topic.name())) {
+                topic.partitions().forEach(partition ->
+                    responsePartitionCollection.add(new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code())
+                    )
+                );
+            } else {
+                final TimelineHashMap<Integer, OffsetAndMetadata> 
offsetsByPartition = offsetsByTopic == null ?
+                    null : offsetsByTopic.get(topic.name());
+                if (offsetsByPartition != null) {
+                    topic.partitions().forEach(partition -> {
+                        if 
(offsetsByPartition.containsKey(partition.partitionIndex())) {
+                            responsePartitionCollection.add(new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                                .setPartitionIndex(partition.partitionIndex())
+                            );
+                            
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+                                request.groupId(),
+                                topic.name(),
+                                partition.partitionIndex()
+                            ));
+                        }
+                    });
+                }
+            }
+
+            responseTopicCollection.add(new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+                .setName(topic.name())
+                .setPartitions(responsePartitionCollection)
+            );
+        });
+
+        return new CoordinatorResult<>(
+            records,
+            new OffsetDeleteResponseData().setTopics(responseTopicCollection)
+        );
+    }
+
+    /**
+     * Deletes offsets as part of a DeleteGroups request.
+     * Populates the record list passed in with records to update the state 
machine.
+     * Validations are done in {@link 
GroupCoordinatorShard#deleteGroups(RequestContext, List)}
+     *
+     * @param groupId The id of the given group.
+     * @param records The record list to populate.
+     *
+     * @return The number of offsets to be deleted.
+     */
+    public int deleteAllOffsets(
+        String groupId,
+        List<Record> records
+    ) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
offsetsByTopic = offsetsByGroup.get(groupId);
+        AtomicInteger numDeletedOffsets = new AtomicInteger();
+
+        if (offsetsByTopic != null) {
+            offsetsByTopic.forEach((topic, offsetsByPartition) ->
+                offsetsByPartition.keySet().forEach(partition -> {
+                    
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, 
partition));
+                    numDeletedOffsets.getAndIncrement();
+                })
+            );
+        }
+        return numDeletedOffsets.get();
+    }
+
     /**
      * Fetch offsets for a given Group.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 10222c3a3cc..21f6f8124cd 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -18,10 +18,14 @@ package org.apache.kafka.coordinator.group.consumer;
 
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.StaleMemberEpochException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.RecordHelpers;
 import org.apache.kafka.image.ClusterImage;
 import org.apache.kafka.image.TopicImage;
 import org.apache.kafka.image.TopicsImage;
@@ -33,6 +37,7 @@ import org.apache.kafka.timeline.TimelineObject;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -341,6 +346,16 @@ public class ConsumerGroup implements Group {
         return Collections.unmodifiableSet(subscribedTopicNames.keySet());
     }
 
+    /**
+     * Returns true if the consumer group is actively subscribed to the topic.
+     *
+     * @param topic The topic name.
+     * @return whether the group is subscribed to the topic.
+     */
+    public boolean isSubscribedToTopic(String topic) {
+        return subscribedTopicNames.containsKey(topic);
+    }
+
     /**
      * Returns the target assignment of the member.
      *
@@ -592,6 +607,34 @@ public class ConsumerGroup implements Group {
         validateMemberEpoch(memberEpoch, member.memberEpoch());
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() {}
+
+    /**
+     * Validates the DeleteGroups request.
+     */
+    @Override
+    public void validateDeleteGroup() throws ApiException {
+        if (state() != ConsumerGroupState.EMPTY) {
+            throw Errors.NON_EMPTY_GROUP.exception();
+        }
+    }
+
+    /**
+     * Populates the list of records with tombstone(s) for deleting the group.
+     *
+     * @param records The list of records.
+     */
+    @Override
+    public void createGroupTombstoneRecords(List<Record> records) {
+        
records.add(RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId()));
+        
records.add(RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId()));
+        records.add(RecordHelpers.newGroupEpochTombstoneRecord(groupId()));
+    }
+
     /**
      * Throws a StaleMemberEpochException if the received member epoch does 
not match
      * the expected member epoch.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
index 18f96f2f78e..88ec52727fa 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group.generic;
 
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
@@ -32,6 +33,8 @@ import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.RecordHelpers;
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
@@ -54,6 +57,7 @@ import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPL
 import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
 import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
 import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE;
 
 /**
  * This class holds metadata for a generic group where the
@@ -849,6 +853,51 @@ public class GenericGroup implements Group {
         }
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() throws ApiException {
+        switch (currentState()) {
+            case DEAD:
+                throw new GroupIdNotFoundException(String.format("Group %s is 
in dead state.", groupId));
+            case STABLE:
+            case PREPARING_REBALANCE:
+            case COMPLETING_REBALANCE:
+                if (!usesConsumerGroupProtocol()) {
+                    throw Errors.NON_EMPTY_GROUP.exception();
+                }
+                break;
+            default:
+        }
+    }
+
+    /**
+     * Validates the DeleteGroups request.
+     */
+    @Override
+    public void validateDeleteGroup() throws ApiException {
+        switch (currentState()) {
+            case DEAD:
+                throw new GroupIdNotFoundException(String.format("Group %s is 
in dead state.", groupId));
+            case STABLE:
+            case PREPARING_REBALANCE:
+            case COMPLETING_REBALANCE:
+                throw Errors.NON_EMPTY_GROUP.exception();
+            default:
+        }
+    }
+
+    /**
+     * Populates the list of records with tombstone(s) for deleting the group.
+     *
+     * @param records The list of records.
+     */
+    @Override
+    public void createGroupTombstoneRecords(List<Record> records) {
+        records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
+    }
+
     /**
      * Verify the member id is up to date for static members. Return true if 
both conditions met:
      *   1. given member is a known static member to group
@@ -1015,10 +1064,10 @@ public class GenericGroup implements Group {
 
     /**
      * Returns true if the consumer group is actively subscribed to the topic. 
When the consumer
-     * group does not know, because the information is not available yet or 
because the it has
+     * group does not know, because the information is not available yet or 
because it has
      * failed to parse the Consumer Protocol, it returns true to be safe.
      *
-     * @param topic the topic name.
+     * @param topic The topic name.
      * @return whether the group is subscribed to the topic.
      */
     public boolean isSubscribedToTopic(String topic) {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 8f39f524484..755bd2aa56a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -34,12 +34,15 @@ import 
org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.ListGroupsRequestData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -499,7 +502,6 @@ public class GroupCoordinatorServiceTest {
             .setGroupId(null)
             .setMemberId(UNKNOWN_MEMBER_ID);
 
-
         CompletableFuture<SyncGroupResponseData> response = service.syncGroup(
             requestContext(ApiKeys.SYNC_GROUP),
             request,
@@ -936,4 +938,246 @@ public class GroupCoordinatorServiceTest {
 
         assertEquals(expectedResponse, future.get());
     }
+
+    @Test
+    public void testDeleteOffsets() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new 
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+            .setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList(
+                new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+            ).iterator());
+        OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
+                new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+            ).iterator());
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setTopics(responseTopicCollection);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsInvalidGroupId() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new 
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @ParameterizedTest
+    @MethodSource("testConsumerGroupHeartbeatWithExceptionSource")
+    public void testDeleteOffsetsWithException(
+        Throwable exception,
+        short expectedErrorCode
+    ) throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new 
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+            .setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(expectedErrorCode);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(exception));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-1");
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-2");
+        resultCollection2.add(result2);
+
+        DeleteGroupsResponseData.DeletableGroupResult result3 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-3")
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(Arrays.asList(
+            new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()),
+            result2.duplicate(),
+            result3.duplicate(),
+            result1.duplicate()
+        ));
+
+        when(runtime.partitions()).thenReturn(Sets.newSet(
+            new TopicPartition("__consumer_offsets", 0),
+            new TopicPartition("__consumer_offsets", 1),
+            new TopicPartition("__consumer_offsets", 2)
+        ));
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        CompletableFuture<Object> resultCollectionFuture = new 
CompletableFuture<>();
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(resultCollectionFuture);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+            ArgumentMatchers.any()
+        
)).thenReturn(FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()));
+
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", 
"group-id-3", null);
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        assertFalse(future.isDone());
+        resultCollectionFuture.complete(resultCollection2);
+
+        assertTrue(future.isDone());
+        assertEquals(expectedResultCollection, future.get());
+    }
+
+    @ParameterizedTest
+    @MethodSource("testConsumerGroupHeartbeatWithExceptionSource")
+    public void testDeleteGroupsWithException(
+        Throwable exception,
+        short expectedErrorCode
+    ) throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(exception));
+
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(
+                requestContext(ApiKeys.DELETE_GROUPS),
+                Collections.singletonList("group-id"),
+                BufferSupplier.NO_CACHING
+            );
+
+        assertEquals(
+            new 
DeleteGroupsResponseData.DeletableGroupResultCollection(Collections.singletonList(
+                new DeleteGroupsResponseData.DeletableGroupResult()
+                    .setGroupId("group-id")
+                    .setErrorCode(expectedErrorCode)
+            ).iterator()),
+            future.get()
+        );
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index a663147e7e3..1ed931c029d 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -18,10 +18,13 @@ package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
@@ -42,14 +45,22 @@ import 
org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import static org.apache.kafka.coordinator.group.TestUtil.requestContext;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -62,6 +73,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -86,6 +98,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -105,11 +118,127 @@ public class GroupCoordinatorShardTest {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroups() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+        List<Record> expectedRecords = new ArrayList<>();
+        for (String groupId : groupIds) {
+            expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId));
+            expectedRecords.addAll(Arrays.asList(
+                RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0),
+                RecordHelpers.newGroupMetadataTombstoneRecord(groupId)
+            ));
+        }
+
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        when(offsetMetadataManager.deleteAllOffsets(anyString(), 
anyList())).thenAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0));
+            return 1;
+        });
+        // Mockito#when only stubs method returning non-void value, so we use 
Mockito#doAnswer instead.
+        doAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+            return null;
+        }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> coordinatorResult =
+            coordinator.deleteGroups(context, groupIds);
+
+        for (String groupId : groupIds) {
+            verify(groupMetadataManager, 
times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
+            verify(groupMetadataManager, 
times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList());
+            verify(offsetMetadataManager, 
times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
+        }
+        assertEquals(expectedResult, coordinatorResult);
+    }
+
+    @Test
+    public void testDeleteGroupsInvalidGroupId() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", 
"group-id-3");
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new 
DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList(
+                new DeleteGroupsResponseData.DeletableGroupResult()
+                    .setGroupId("group-id-1"),
+                new DeleteGroupsResponseData.DeletableGroupResult()
+                    .setGroupId("group-id-2")
+                    .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+                new DeleteGroupsResponseData.DeletableGroupResult()
+                    .setGroupId("group-id-3")
+            ).iterator());
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-3", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-3")
+        );
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        // Mockito#when only stubs method returning non-void value, so we use 
Mockito#doAnswer and Mockito#doThrow instead.
+        doThrow(Errors.INVALID_GROUP_ID.exception())
+            
.when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.eq("group-id-2"));
+        doAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0));
+            return null;
+        }).when(offsetMetadataManager).deleteAllOffsets(anyString(), 
anyList());
+        doAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+            return null;
+        }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> coordinatorResult =
+            coordinator.deleteGroups(context, groupIds);
+
+        for (String groupId : groupIds) {
+            verify(groupMetadataManager, 
times(1)).validateDeleteGroup(eq(groupId));
+            if (!groupId.equals("group-id-2")) {
+                verify(groupMetadataManager, 
times(1)).deleteGroup(eq(groupId), anyList());
+                verify(offsetMetadataManager, 
times(1)).deleteAllOffsets(eq(groupId), anyList());
+            }
+        }
+        assertEquals(expectedResult, coordinatorResult);
+    }
+
     @Test
     public void testReplayOffsetCommit() {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -135,6 +264,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -159,6 +289,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -179,6 +310,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -198,6 +330,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -218,6 +351,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -237,6 +371,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -257,6 +392,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -276,6 +412,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -296,6 +433,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -315,6 +453,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -335,6 +474,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -354,6 +494,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -374,6 +515,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -393,6 +535,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -405,6 +548,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -424,6 +568,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -443,6 +588,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
@@ -463,6 +609,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
             groupMetadataManager,
             offsetMetadataManager
         );
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 8159ffd3a94..6bf08f10934 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -738,7 +738,7 @@ public class GroupMetadataManagerTest {
                 request,
                 responseFuture
             );
-            
+
             return new SyncResult(responseFuture, coordinatorResult);
         }
 
@@ -9355,6 +9355,34 @@ public class GroupMetadataManagerTest {
         assertEquals(expectedResponse, leaveResult.response());
     }
 
+    @Test
+    public void testGenericGroupDelete() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        List<Record> expectedRecords = 
Collections.singletonList(RecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
+        List<Record> records = new ArrayList<>();
+        context.groupMetadataManager.deleteGroup("group-id", records);
+        assertEquals(expectedRecords, records);
+    }
+
+    @Test
+    public void testConsumerGroupDelete() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id", true);
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id"),
+            
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord("group-id"),
+            RecordHelpers.newGroupEpochTombstoneRecord("group-id")
+        );
+        List<Record> records = new ArrayList<>();
+        context.groupMetadataManager.deleteGroup("group-id", records);
+        assertEquals(expectedRecords, records);
+    }
+
     private static void assertNoOrEmptyResult(List<ExpiredTimeout<Void, 
Record>> timeouts) {
         assertTrue(timeouts.size() <= 1);
         timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, 
timeout.result));
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 7f355da76aa..5598a74e50e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
@@ -26,6 +27,8 @@ import 
org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.network.ClientInformation;
@@ -55,8 +58,10 @@ import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -70,6 +75,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class OffsetMetadataManagerTest {
     static class OffsetMetadataManagerTestContext {
@@ -177,6 +183,26 @@ public class OffsetMetadataManagerTest {
             return result;
         }
 
+        public CoordinatorResult<OffsetDeleteResponseData, Record> 
deleteOffsets(
+            OffsetDeleteRequestData request
+        ) {
+            CoordinatorResult<OffsetDeleteResponseData, Record> result = 
offsetMetadataManager.deleteOffsets(request);
+            result.records().forEach(this::replay);
+            return result;
+        }
+
+        public int deleteAllOffsets(
+            String groupId,
+            List<Record> records
+        ) {
+            List<Record> addedRecords = new ArrayList<>();
+            int numDeletedOffsets = 
offsetMetadataManager.deleteAllOffsets(groupId, addedRecords);
+            addedRecords.forEach(this::replay);
+
+            records.addAll(addedRecords);
+            return numDeletedOffsets;
+        }
+
         public List<OffsetFetchResponseData.OffsetFetchResponseTopics> 
fetchOffsets(
             String groupId,
             List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
@@ -257,8 +283,6 @@ public class OffsetMetadataManagerTest {
             long offset,
             int leaderEpoch
         ) {
-            snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
-
             replay(RecordHelpers.newOffsetCommitRecord(
                 groupId,
                 topic,
@@ -274,6 +298,18 @@ public class OffsetMetadataManagerTest {
             ));
         }
 
+        public void deleteOffset(
+            String groupId,
+            String topic,
+            int partition
+        ) {
+            replay(RecordHelpers.newOffsetCommitTombstoneRecord(
+                groupId,
+                topic,
+                partition
+            ));
+        }
+
         private ApiMessage messageOrNull(ApiMessageAndVersion 
apiMessageAndVersion) {
             if (apiMessageAndVersion == null) {
                 return null;
@@ -285,6 +321,8 @@ public class OffsetMetadataManagerTest {
         private void replay(
             Record record
         ) {
+            snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+
             ApiMessageAndVersion key = record.key();
             ApiMessageAndVersion value = record.value();
 
@@ -307,6 +345,63 @@ public class OffsetMetadataManagerTest {
 
             lastWrittenOffset++;
         }
+
+        public void testOffsetDeleteWith(
+            String groupId,
+            String topic,
+            int partition,
+            Errors expectedError
+        ) {
+            final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+                new 
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                    new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                        .setName(topic)
+                        .setPartitions(Collections.singletonList(
+                            new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition)
+                        ))
+                ).iterator());
+
+            final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
expectedResponsePartitionCollection =
+                new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+            if (hasOffset(groupId, topic, partition)) {
+                expectedResponsePartitionCollection.add(
+                    new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setErrorCode(expectedError.code())
+                );
+            }
+
+            final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
expectedResponseTopicCollection =
+                new 
OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
+                    new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+                        .setName(topic)
+                        .setPartitions(expectedResponsePartitionCollection)
+                ).iterator());
+
+            List<Record> expectedRecords = Collections.emptyList();
+            if (hasOffset(groupId, topic, partition) && expectedError == 
Errors.NONE) {
+                expectedRecords = Collections.singletonList(
+                    RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
topic, partition)
+                );
+            }
+
+            final CoordinatorResult<OffsetDeleteResponseData, Record> 
coordinatorResult = deleteOffsets(
+                new OffsetDeleteRequestData()
+                    .setGroupId(groupId)
+                    .setTopics(requestTopicCollection)
+            );
+
+            assertEquals(new 
OffsetDeleteResponseData().setTopics(expectedResponseTopicCollection), 
coordinatorResult.response());
+            assertEquals(expectedRecords, coordinatorResult.records());
+        }
+
+        public boolean hasOffset(
+            String groupId,
+            String topic,
+            int partition
+        ) {
+            return offsetMetadataManager.offset(groupId, topic, partition) != 
null;
+        }
     }
 
     @ParameterizedTest
@@ -1561,6 +1656,113 @@ public class OffsetMetadataManagerTest {
             () -> context.fetchAllOffsets("group", "member", 10, 
Long.MAX_VALUE));
     }
 
+    @Test
+    public void testGenericGroupOffsetDelete() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "foo",
+            true
+        );
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+        group.setSubscribedTopics(Optional.of(Collections.emptySet()));
+        context.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE);
+    }
+
+    @Test
+    public void testGenericGroupOffsetDeleteWithErrors() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "foo",
+            true
+        );
+        group.setSubscribedTopics(Optional.of(Collections.singleton("bar")));
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+
+        // Delete the offset whose topic partition doesn't exist.
+        context.testOffsetDeleteWith("foo", "bar1", 0, Errors.NONE);
+        // Delete the offset from the topic that the group is subscribed to.
+        context.testOffsetDeleteWith("foo", "bar", 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+    }
+
+    @Test
+    public void testConsumerGroupOffsetDelete() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+            "foo",
+            true
+        );
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+        assertFalse(group.isSubscribedToTopic("bar"));
+        context.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE);
+    }
+
+    @Test
+    public void testConsumerGroupOffsetDeleteWithErrors() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+            "foo",
+            true
+        );
+        MetadataImage image = new 
GroupMetadataManagerTest.MetadataImageBuilder()
+            .addTopic(Uuid.randomUuid(), "foo", 1)
+            .addRacks()
+            .build();
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder("member1")
+            .setSubscribedTopicNames(Collections.singletonList("bar"))
+            .build();
+        group.computeSubscriptionMetadata(
+            null,
+            member1,
+            image.topics(),
+            image.cluster()
+        );
+        group.updateMember(member1);
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+        assertTrue(group.isSubscribedToTopic("bar"));
+
+        // Delete the offset whose topic partition doesn't exist.
+        context.testOffsetDeleteWith("foo", "bar1", 0, Errors.NONE);
+        // Delete the offset from the topic that the group is subscribed to.
+        context.testOffsetDeleteWith("foo", "bar", 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+    }
+
+    @ParameterizedTest
+    @EnumSource(Group.GroupType.class)
+    public void testDeleteGroupAllOffsets(Group.GroupType groupType) {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        switch (groupType) {
+            case GENERIC:
+                context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+                    "foo",
+                    true
+                );
+                break;
+            case CONSUMER:
+                context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+                    "foo",
+                    true
+                );
+                break;
+            default:
+                throw new IllegalArgumentException("Invalid group type: " + 
groupType);
+        }
+        context.commitOffset("foo", "bar-0", 0, 100L, 0);
+        context.commitOffset("foo", "bar-0", 1, 100L, 0);
+        context.commitOffset("foo", "bar-1", 0, 100L, 0);
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-1", 0),
+            RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 0),
+            RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 1)
+        );
+
+        List<Record> records = new ArrayList<>();
+        int numDeleteOffsets = context.deleteAllOffsets("foo", records);
+
+        assertEquals(expectedRecords, records);
+        assertEquals(3, numDeleteOffsets);
+    }
+
     static private OffsetFetchResponseData.OffsetFetchResponsePartitions 
mkOffsetPartitionResponse(
         int partition,
         long offset,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index 9c9421958c5..210aa8eb901 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group.consumer;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.StaleMemberEpochException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.utils.LogContext;
@@ -35,6 +36,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
 import static 
org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -673,4 +675,32 @@ public class ConsumerGroupTest {
         // This should succeed.
         group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE);
     }
+
+    @Test
+    public void testValidateDeleteGroup() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+        assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, 
consumerGroup.state());
+        assertDoesNotThrow(consumerGroup::validateDeleteGroup);
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder("member1")
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(1)
+            .build();
+        consumerGroup.updateMember(member1);
+
+        assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, 
consumerGroup.state());
+        assertThrows(GroupNotEmptyException.class, 
consumerGroup::validateDeleteGroup);
+
+        consumerGroup.setGroupEpoch(1);
+
+        assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, 
consumerGroup.state());
+        assertThrows(GroupNotEmptyException.class, 
consumerGroup::validateDeleteGroup);
+
+        consumerGroup.setTargetAssignmentEpoch(1);
+
+        assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, 
consumerGroup.state());
+        assertThrows(GroupNotEmptyException.class, 
consumerGroup::validateDeleteGroup);
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
index ba4b177f676..05afdd26edf 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
@@ -20,6 +20,8 @@ import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -45,6 +47,7 @@ import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
 import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
 import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
 import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -776,7 +779,7 @@ public class GenericGroupTest {
             protocolType,
             protocols
         );
-        
+
         group.add(member);
         assertTrue(group.hasMemberId(memberId));
         assertTrue(group.hasStaticMember(groupInstanceId));
@@ -1026,6 +1029,62 @@ public class GenericGroupTest {
             () -> group.validateOffsetCommit("member-id", "new-instance-id", 
1));
     }
 
+    @Test
+    public void testValidateOffsetDelete() {
+        assertFalse(group.usesConsumerGroupProtocol());
+        group.transitionTo(PREPARING_REBALANCE);
+        assertThrows(GroupNotEmptyException.class, 
group::validateOffsetDelete);
+        group.transitionTo(COMPLETING_REBALANCE);
+        assertThrows(GroupNotEmptyException.class, 
group::validateOffsetDelete);
+        group.transitionTo(STABLE);
+        assertThrows(GroupNotEmptyException.class, 
group::validateOffsetDelete);
+
+        JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection();
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("roundrobin")
+            .setMetadata(new byte[0]));
+        GenericGroupMember member = new GenericGroupMember(
+            memberId,
+            Optional.of(groupInstanceId),
+            clientId,
+            clientHost,
+            rebalanceTimeoutMs,
+            sessionTimeoutMs,
+            protocolType,
+            protocols
+        );
+        group.add(member);
+
+        assertTrue(group.usesConsumerGroupProtocol());
+        group.transitionTo(PREPARING_REBALANCE);
+        assertDoesNotThrow(group::validateOffsetDelete);
+        group.transitionTo(COMPLETING_REBALANCE);
+        assertDoesNotThrow(group::validateOffsetDelete);
+        group.transitionTo(STABLE);
+        assertDoesNotThrow(group::validateOffsetDelete);
+
+        group.transitionTo(PREPARING_REBALANCE);
+        group.transitionTo(EMPTY);
+        assertDoesNotThrow(group::validateOffsetDelete);
+        group.transitionTo(DEAD);
+        assertThrows(GroupIdNotFoundException.class, 
group::validateOffsetDelete);
+    }
+
+    @Test
+    public void testValidateDeleteGroup() {
+        group.transitionTo(PREPARING_REBALANCE);
+        assertThrows(GroupNotEmptyException.class, group::validateDeleteGroup);
+        group.transitionTo(COMPLETING_REBALANCE);
+        assertThrows(GroupNotEmptyException.class, group::validateDeleteGroup);
+        group.transitionTo(STABLE);
+        assertThrows(GroupNotEmptyException.class, group::validateDeleteGroup);
+        group.transitionTo(PREPARING_REBALANCE);
+        group.transitionTo(EMPTY);
+        assertDoesNotThrow(group::validateDeleteGroup);
+        group.transitionTo(DEAD);
+        assertThrows(GroupIdNotFoundException.class, 
group::validateDeleteGroup);
+    }
+
     private void assertState(GenericGroup group, GenericGroupState 
targetState) {
         Set<GenericGroupState> otherStates = new HashSet<>();
         otherStates.add(STABLE);


Reply via email to