dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1342599402


##########
clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DeleteGroupsRequestData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.apache.kafka.common.requests.DeleteGroupsRequest.getErrorResultCollection;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DeleteGroupsRequestTest {
+
+    protected static String groupId1 = "group-id-1";
+    protected static String groupId2 = "group-id-2";
+
+    private static DeleteGroupsRequestData data;
+
+    @BeforeEach
+    public void setUp() {
+        data = new DeleteGroupsRequestData()
+            .setGroupsNames(Arrays.asList(groupId1, groupId2));
+    }

Review Comment:
   nit: Given that there is only one test, I would rather move everything into that test.
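   A rough sketch of what that could look like with the setup inlined (the test name and the asserted error are illustrative, not from the PR; `getErrorResultCollection` is used with the signature shown in `GroupCoordinatorService`):

```java
    @Test
    public void testGetErrorResultCollection() {
        // Inline the fixture since there is a single test; no @BeforeEach or shared fields needed.
        DeleteGroupsRequestData data = new DeleteGroupsRequestData()
            .setGroupsNames(Arrays.asList("group-id-1", "group-id-2"));

        DeleteGroupsResponseData.DeletableGroupResultCollection expected =
            new DeleteGroupsResponseData.DeletableGroupResultCollection();
        data.groupsNames().forEach(groupId -> expected.add(
            new DeleteGroupsResponseData.DeletableGroupResult()
                .setGroupId(groupId)
                .setErrorCode(Errors.INVALID_GROUP_ID.code())
        ));

        assertEquals(expected, getErrorResultCollection(data.groupsNames(), Errors.INVALID_GROUP_ID));
    }
```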



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -307,6 +344,57 @@ private void replay(
 
             lastWrittenOffset++;
         }
+
+        public void testOffsetDeleteWith(
+            OffsetMetadataManagerTestContext context,
+            String groupId,
+            String topic,
+            int partition,
+            Errors error

Review Comment:
   nit: expectedError?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,87 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        final OffsetDeleteResponseData response = new OffsetDeleteResponseData();
+        final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic =
+            offsetsByGroup.get(request.groupId());
+
+        request.topics().forEach(topic -> {
+            final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection =
+                new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+            final TimelineHashMap<Integer, OffsetAndMetadata> offsetsByPartition = offsetsByTopic == null ?
+                null : offsetsByTopic.get(topic.name());
+
+            if (group.isSubscribedToTopic(topic.name())) {
+                topic.partitions().forEach(partition ->
+                    responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code())
+                    )
+                );
+            } else {
+                topic.partitions().forEach(partition -> {
+                    if (offsetsByPartition != null && offsetsByPartition.containsKey(partition.partitionIndex())) {
+                        responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                            .setPartitionIndex(partition.partitionIndex())
+                        );
+                        records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+                            request.groupId(),
+                            topic.name(),
+                            partition.partitionIndex()
+                        ));
+                    }
+                });
+            }
+
+            final OffsetDeleteResponseData.OffsetDeleteResponseTopic responseTopic =
+                new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+                    .setName(topic.name())
+                    .setPartitions(responsePartitionCollection);
+            responseTopicCollection.add(responseTopic);
+        });
+        response.setTopics(responseTopicCollection);
+
+        return new CoordinatorResult<>(records, response);
+    }
+
+    /**
+     * Deletes offsets as part of a DeleteGroups request.
+     * Populates the record list passed in with records to update the state machine.
+     * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)}
+     *
+     * @param groupId The ID of the given group.
+     * @param records The record list to populate.
+     */
+    public void deleteAllOffsets(
+        String groupId,
+        List<Record> records
+    ) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId);
+
+        if (offsetsByTopic != null) {

Review Comment:
   No, we don't need it here because the group is completely removed in this case.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +526,84 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures =
+            new ArrayList<>(groupIds.size());
+
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
+        groupIds.forEach(groupId -> {
+            // For backwards compatibility, we support DeleteGroups for the empty group id.
+            if (groupId == null) {
+                futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(
+                    Collections.singletonList(null),
+                    Errors.INVALID_GROUP_ID
+                )));
+            } else {
+                final TopicPartition topicPartition = topicPartitionFor(groupId);
+                groupsByTopicPartition
+                    .computeIfAbsent(topicPartition, __ -> new ArrayList<>())
+                    .add(groupId);
+            }
+        });
+
+        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
+                runtime.scheduleWriteOperation(
+                    "delete-groups",
+                    topicPartition,
+                    coordinator -> coordinator.deleteGroups(context, groupList)
+                ).exceptionally(exception -> {
+                    if (exception instanceof UnknownTopicOrPartitionException ||
+                        exception instanceof NotEnoughReplicasException) {
+                        return DeleteGroupsRequest.getErrorResultCollection(
+                            groupList,
+                            Errors.COORDINATOR_NOT_AVAILABLE
+                        );
+                    }
+
+                    if (exception instanceof NotLeaderOrFollowerException ||
+                        exception instanceof KafkaStorageException) {
+                        return DeleteGroupsRequest.getErrorResultCollection(
+                            groupList,
+                            Errors.NOT_COORDINATOR
+                        );
+                    }
+
+                    if (exception instanceof RecordTooLargeException ||
+                        exception instanceof RecordBatchTooLargeException ||
+                        exception instanceof InvalidFetchSizeException) {
+                        return DeleteGroupsRequest.getErrorResultCollection(
+                            groupList,
+                            Errors.UNKNOWN_SERVER_ERROR
+                        );
+                    }
+
+                    return DeleteGroupsRequest.getErrorResultCollection(
+                        groupList,
+                        Errors.forException(exception)
+                    );
+                });
+
+            futures.add(future);
+        });
+
+        final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+        final CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> resFuture = allFutures.thenApply(v -> {

Review Comment:
   nit: How about `return allFutures.thenApply...`?
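   That is, dropping the intermediate `resFuture` variable and returning the chain directly; a sketch based on the code above (assuming the lambda ends by returning `res`):

```java
        final CompletableFuture<Void> allFutures =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        return allFutures.thenApply(v -> {
            final DeleteGroupsResponseData.DeletableGroupResultCollection res =
                new DeleteGroupsResponseData.DeletableGroupResultCollection();
            final List<String> deletedGroups = new ArrayList<>();
            futures.forEach(future ->
                // DeletableGroupResultCollection is an ImplicitLinkedHashMultiCollection,
                // so each element is duplicated before being re-added.
                future.join().forEach(result -> {
                    res.add(result.duplicate());
                    if (result.errorCode() == Errors.NONE.code()) {
                        deletedGroups.add(result.groupId());
                    }
                })
            );
            log.info("The following groups were deleted: {}.", String.join(", ", deletedGroups));
            return res;
        });
```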



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +526,84 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures =
+            new ArrayList<>(groupIds.size());
+
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
+        groupIds.forEach(groupId -> {
+            // For backwards compatibility, we support DeleteGroups for the empty group id.
+            if (groupId == null) {
+                futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(
+                    Collections.singletonList(null),
+                    Errors.INVALID_GROUP_ID
+                )));
+            } else {
+                final TopicPartition topicPartition = topicPartitionFor(groupId);
+                groupsByTopicPartition
+                    .computeIfAbsent(topicPartition, __ -> new ArrayList<>())
+                    .add(groupId);
+            }
+        });
+
+        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
+                runtime.scheduleWriteOperation(
+                    "delete-groups",
+                    topicPartition,
+                    coordinator -> coordinator.deleteGroups(context, groupList)
+                ).exceptionally(exception -> {
+                    if (exception instanceof UnknownTopicOrPartitionException ||
+                        exception instanceof NotEnoughReplicasException) {
+                        return DeleteGroupsRequest.getErrorResultCollection(
+                            groupList,
+                            Errors.COORDINATOR_NOT_AVAILABLE
+                        );
+                    }
+
+                    if (exception instanceof NotLeaderOrFollowerException ||
+                        exception instanceof KafkaStorageException) {
+                        return DeleteGroupsRequest.getErrorResultCollection(
+                            groupList,
+                            Errors.NOT_COORDINATOR
+                        );
+                    }
+
+                    if (exception instanceof RecordTooLargeException ||
+                        exception instanceof RecordBatchTooLargeException ||
+                        exception instanceof InvalidFetchSizeException) {
+                        return DeleteGroupsRequest.getErrorResultCollection(
+                            groupList,
+                            Errors.UNKNOWN_SERVER_ERROR
+                        );
+                    }
+
+                    return DeleteGroupsRequest.getErrorResultCollection(
+                        groupList,
+                        Errors.forException(exception)
+                    );
+                });
+
+            futures.add(future);
+        });
+
+        final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+        final CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> resFuture = allFutures.thenApply(v -> {
+            final DeleteGroupsResponseData.DeletableGroupResultCollection res = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+            final List<String> deletedGroups = new ArrayList<>();
+            futures.forEach(future ->
+                // We don't use res.addAll(future.join()) because DeletableGroupResultCollection is an ImplicitLinkedHashMultiCollection,
+                // which has requirements for adding elements (see ImplicitLinkedHashCollection.java#add).
+                future.join().forEach(result -> {
+                    res.add(result.duplicate());
+                    if (result.errorCode() == Errors.NONE.code()) {
+                        deletedGroups.add(result.groupId());
+                    }
+                })
+            );
+            log.info("The following groups were deleted: {}.", String.join(", ", deletedGroups));

Review Comment:
   I wonder if we should rather log this within the shard in order to have it logged per shard (with the shard context). What do you think?
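   A rough sketch of what that could look like inside `GroupCoordinatorShard#deleteGroups`, once the result collection has been built (the `resultCollection` name and the exact placement are illustrative assumptions):

```java
        // Hypothetical placement inside GroupCoordinatorShard#deleteGroups: log with the
        // shard-scoped logger so each shard reports the groups it actually deleted.
        List<String> deletedGroups = new ArrayList<>();
        resultCollection.forEach(result -> {
            if (result.errorCode() == Errors.NONE.code()) {
                deletedGroups.add(result.groupId());
            }
        });
        log.info("The following groups were deleted: {}.", String.join(", ", deletedGroups));
```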



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,87 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        final OffsetDeleteResponseData response = new OffsetDeleteResponseData();
+        final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic =
+            offsetsByGroup.get(request.groupId());
+
+        request.topics().forEach(topic -> {
+            final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection =
+                new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+            final TimelineHashMap<Integer, OffsetAndMetadata> offsetsByPartition = offsetsByTopic == null ?
+                null : offsetsByTopic.get(topic.name());
+
+            if (group.isSubscribedToTopic(topic.name())) {
+                topic.partitions().forEach(partition ->
+                    responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code())
+                    )
+                );
+            } else {
+                topic.partitions().forEach(partition -> {
+                    if (offsetsByPartition != null && offsetsByPartition.containsKey(partition.partitionIndex())) {
+                        responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                            .setPartitionIndex(partition.partitionIndex())
+                        );
+                        records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+                            request.groupId(),
+                            topic.name(),
+                            partition.partitionIndex()
+                        ));
+                    }
+                });
+            }
+
+            final OffsetDeleteResponseData.OffsetDeleteResponseTopic responseTopic =
+                new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+                    .setName(topic.name())
+                    .setPartitions(responsePartitionCollection);
+            responseTopicCollection.add(responseTopic);
+        });
+        response.setTopics(responseTopicCollection);
+
+        return new CoordinatorResult<>(records, response);

Review Comment:
   nit: Would it make sense to use the following? We don't really need `response` except here.
   
   ```
   return new CoordinatorResult<>(
       records,
       new OffsetDeleteResponseData().setTopics(responseTopicCollection)
   );
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -307,6 +344,57 @@ private void replay(
 
             lastWrittenOffset++;
         }
+
+        public void testOffsetDeleteWith(
+            OffsetMetadataManagerTestContext context,
+            String groupId,
+            String topic,
+            int partition,
+            Errors error
+        ) {
+            final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                    new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                        .setName(topic)
+                        .setPartitions(Collections.singletonList(
+                            new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition)
+                        ))
+                ).iterator());
+
+            final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection expectedResponsePartitionCollection =
+                new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+            if (context.offsetMetadataManager.offset(groupId, topic, partition) != null) {
+                expectedResponsePartitionCollection.add(
+                    new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setErrorCode(error.code())
+                );
+            }
+
+            final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection expectedResponseTopicCollection =
+                new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
+                    new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+                        .setName(topic)
+                        .setPartitions(expectedResponsePartitionCollection)
+                ).iterator());
+
+            List<Record> expectedRecords = Collections.emptyList();
+            if (context.offsetMetadataManager.offset(groupId, topic, partition) != null &&

Review Comment:
   nit: Should we define a helper method in the context (e.g. hasOffset(groupId, topic, partition))? I would also bring `error == Errors.NONE` back on the previous line because it fits there.
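   A possible shape for that helper on the test context, reusing the same lookup the test already performs (sketch):

```java
        public boolean hasOffset(
            String groupId,
            String topic,
            int partition
        ) {
            return offsetMetadataManager.offset(groupId, topic, partition) != null;
        }
```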



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -592,6 +607,37 @@ public void validateOffsetFetch(
         validateMemberEpoch(memberEpoch, member.memberEpoch());
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() {}
+
+    /**
+     * Validates the DeleteGroups request.
+     */
+    @Override
+    public void validateDeleteGroup() throws ApiException {
+        switch (state()) {
+            case STABLE:
+            case ASSIGNING:
+            case RECONCILING:
+                throw Errors.NON_EMPTY_GROUP.exception();
+            default:
+        }

Review Comment:
   nit: Could we use `state() != Empty`? This would be more robust.
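   i.e. roughly the following, assuming the group state enum exposes an EMPTY constant alongside the STABLE/ASSIGNING/RECONCILING values used above (sketch):

```java
    @Override
    public void validateDeleteGroup() throws ApiException {
        // Assumes the state enum (here called ConsumerGroupState) has an EMPTY constant.
        if (state() != ConsumerGroupState.EMPTY) {
            throw Errors.NON_EMPTY_GROUP.exception();
        }
    }
```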



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +938,207 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
 
         assertEquals(expectedResponse, future.get());
     }
+
+    @Test
+    public void testDeleteOffsets() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+            .setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList(
+                new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+            ).iterator());
+        OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
+                new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+            ).iterator());
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setTopics(responseTopicCollection);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsInvalidGroupId() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsCoordinatorNotAvailableException() throws Exception {

Review Comment:
   I wonder if we should test all the exceptions that we re-map. I did something similar in `testConsumerGroupHeartbeatWithException`. What do you think?
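   A sketch of what a parameterized version could look like, mirroring the shape of `testConsumerGroupHeartbeatWithException` (the method names are illustrative; the exception-to-error pairs mirror the remapping in `GroupCoordinatorService#deleteGroups`, and the usual JUnit params / exception imports are assumed):

```java
    private static Stream<Arguments> testDeleteGroupsWithExceptionSource() {
        // Exception-to-error pairs taken from the remapping in GroupCoordinatorService#deleteGroups.
        return Stream.of(
            Arguments.of(new UnknownTopicOrPartitionException("error"), Errors.COORDINATOR_NOT_AVAILABLE),
            Arguments.of(new NotEnoughReplicasException("error"), Errors.COORDINATOR_NOT_AVAILABLE),
            Arguments.of(new NotLeaderOrFollowerException("error"), Errors.NOT_COORDINATOR),
            Arguments.of(new KafkaStorageException("error"), Errors.NOT_COORDINATOR),
            Arguments.of(new RecordTooLargeException("error"), Errors.UNKNOWN_SERVER_ERROR),
            Arguments.of(new RecordBatchTooLargeException("error"), Errors.UNKNOWN_SERVER_ERROR),
            Arguments.of(new InvalidFetchSizeException("error"), Errors.UNKNOWN_SERVER_ERROR)
        );
    }

    @ParameterizedTest
    @MethodSource("testDeleteGroupsWithExceptionSource")
    public void testDeleteGroupsWithException(Throwable exception, Errors expectedError) throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime
        );
        service.startup(() -> 1);

        // Fail the write operation with the exception under test.
        when(runtime.scheduleWriteOperation(
            ArgumentMatchers.eq("delete-groups"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(exception));

        CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
            service.deleteGroups(
                requestContext(ApiKeys.DELETE_GROUPS),
                Collections.singletonList("group-id"),
                BufferSupplier.NO_CACHING
            );

        assertEquals(
            DeleteGroupsRequest.getErrorResultCollection(Collections.singletonList("group-id"), expectedError),
            future.get()
        );
    }
```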



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +938,207 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
 
         assertEquals(expectedResponse, future.get());
     }
+
+    @Test
+    public void testDeleteOffsets() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+            .setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList(
+                new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+            ).iterator());
+        OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
+                new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+            ).iterator());
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setTopics(responseTopicCollection);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsInvalidGroupId() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsCoordinatorNotAvailableException() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+            .setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(
+            Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()
+        ));
+
+        CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-1");
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-2");
+        resultCollection2.add(result2);
+
+        DeleteGroupsResponseData.DeletableGroupResult result3 = new DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-3")
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(Arrays.asList(
+            new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()),
+            result1.duplicate(),
+            result2.duplicate(),
+            result3.duplicate()
+        ));
+
+        when(runtime.partitions()).thenReturn(Sets.newSet(
+            new TopicPartition("__consumer_offsets", 0),
+            new TopicPartition("__consumer_offsets", 1),
+            new TopicPartition("__consumer_offsets", 2)
+        ));
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        CompletableFuture<Object> resultCollectionFuture = new CompletableFuture<>();
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(resultCollectionFuture);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()));
+
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", "group-id-3", null);
+        CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), groupIds, BufferSupplier.NO_CACHING);
+
+        assertFalse(future.isDone());
+        resultCollectionFuture.complete(resultCollection2);
+
+        assertTrue(expectedResultCollection.containsAll(future.get()));
+        assertTrue(future.get().containsAll(expectedResultCollection));

Review Comment:
   Can't we use `assertEquals`?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +938,207 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
 
         assertEquals(expectedResponse, future.get());
     }
+
+    @Test
+    public void testDeleteOffsets() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+            .setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList(
+                new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+            ).iterator());
+        OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
+                new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+            ).iterator());
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setTopics(responseTopicCollection);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsInvalidGroupId() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsCoordinatorNotAvailableException() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+            .setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(
+            Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()
+        ));
+
+        CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteGroups() throws Exception {

Review Comment:
   ditto about the error mapping verification.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +115,107 @@ public void testCommitOffset() {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroups() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+        DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(new DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
+        expectedResultCollection.add(new DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2"));
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1", "topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-2", "topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-2")
+        );
+        CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+        doAnswer(invocation -> {

Review Comment:
   It would be better to use the other way in order to remain consistent with the other tests. Is this possible?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,87 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        final OffsetDeleteResponseData response = new OffsetDeleteResponseData();
+        final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic =
+            offsetsByGroup.get(request.groupId());
+
+        request.topics().forEach(topic -> {
+            final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection =
+                new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+            final TimelineHashMap<Integer, OffsetAndMetadata> offsetsByPartition = offsetsByTopic == null ?
+                null : offsetsByTopic.get(topic.name());
+
+            if (group.isSubscribedToTopic(topic.name())) {
+                topic.partitions().forEach(partition ->
+                    responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code())
+                    )
+                );
+            } else {
+                topic.partitions().forEach(partition -> {

Review Comment:
   I am +1 for bringing the definition of `offsetsByPartition` within the `else` clause. However, we have to keep using `topic.partitions().forEach(` to iterate over the partitions, and I don't like `if(offsetsByPartition == null) {continue;}`. How about using `offsetsByPartition != null`?
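   The else branch would then end up roughly like this (sketch of the suggestion; same behaviour as the PR, with the null check hoisted out of the per-partition loop):

```java
            } else {
                final TimelineHashMap<Integer, OffsetAndMetadata> offsetsByPartition =
                    offsetsByTopic == null ? null : offsetsByTopic.get(topic.name());
                if (offsetsByPartition != null) {
                    topic.partitions().forEach(partition -> {
                        if (offsetsByPartition.containsKey(partition.partitionIndex())) {
                            responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
                                .setPartitionIndex(partition.partitionIndex())
                            );
                            records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
                                request.groupId(),
                                topic.name(),
                                partition.partitionIndex()
                            ));
                        }
                    });
                }
            }
```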



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

