jeffkbkim commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1343258721


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3071,33 @@ private void removeCurrentMemberFromGenericGroup(
         group.remove(member.memberId());
     }
 
+    /**
+     * Handles a DeleteGroups request.
+     * Populates the record list passed in with record to update the state machine.
+     * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)} by
+     * calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
+     *
+     * @param groupId The ID of the group to be deleted. It has been checked in {@link GroupMetadataManager#validateDeleteGroup}.
+     * @param records The record list to populate.
+     */
+    public void deleteGroup(
+        String groupId,
+        List<Record> records
+    ) {
+        // In this method, we only populate records with tombstone records, so we don't expect an exception to be thrown here.

Review Comment:
   "At this point, we have already validated the group id so we know that the 
group exists and that no exception will be thrown."
   
   how's this?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -341,6 +384,22 @@ public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave(
         return groupMetadataManager.genericGroupLeave(context, request);
     }
 
+    /**
+     * Handles a OffsetDelete request.

Review Comment:
   nit: "an OffsetDelete request"



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +349,94 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =

Review Comment:
   what's the benefit of using final variables here?



##########
clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DeleteGroupsRequestData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.apache.kafka.common.requests.DeleteGroupsRequest.getErrorResultCollection;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DeleteGroupsRequestTest {
+
+    protected static String groupId1 = "group-id-1";
+    protected static String groupId2 = "group-id-2";

Review Comment:
   we can move these into the test as well



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3071,33 @@ private void removeCurrentMemberFromGenericGroup(
         group.remove(member.memberId());
     }
 
+    /**
+     * Handles a DeleteGroups request.
+     * Populates the record list passed in with record to update the state machine.
+     * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)} by
+     * calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
+     *
+     * @param groupId The ID of the group to be deleted. It has been checked in {@link GroupMetadataManager#validateDeleteGroup}.

Review Comment:
   nit: can we change all usages of "ID" to "id"?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +938,253 @@ public void 
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
 
         assertEquals(expectedResponse, future.get());
     }
+
+    @Test
+    public void testDeleteOffsets() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new 
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+            .setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList(
+                new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+            ).iterator());
+        OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
+                new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+            ).iterator());
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setTopics(responseTopicCollection);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsInvalidGroupId() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new 
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    private static Stream<Arguments> testDeleteOffsetsWithExceptionSource() {
+        return testConsumerGroupHeartbeatWithExceptionSource();
+    }
+
+    @ParameterizedTest
+    @MethodSource("testDeleteOffsetsWithExceptionSource")
+    public void testDeleteOffsetsWithException(
+        Throwable exception,
+        short expectedErrorCode
+    ) throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new 
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+            .setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(expectedErrorCode);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(exception));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-1");
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-2");
+        resultCollection2.add(result2);
+
+        DeleteGroupsResponseData.DeletableGroupResult result3 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-3")
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(Arrays.asList(
+            new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()),
+            result2.duplicate(),
+            result3.duplicate(),
+            result1.duplicate()
+        ));
+
+        when(runtime.partitions()).thenReturn(Sets.newSet(
+            new TopicPartition("__consumer_offsets", 0),
+            new TopicPartition("__consumer_offsets", 1),
+            new TopicPartition("__consumer_offsets", 2)
+        ));
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        CompletableFuture<Object> resultCollectionFuture = new 
CompletableFuture<>();
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(resultCollectionFuture);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+            ArgumentMatchers.any()
+        
)).thenReturn(FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()));
+
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", 
"group-id-3", null);
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        assertFalse(future.isDone());
+        resultCollectionFuture.complete(resultCollection2);
+

Review Comment:
   can we assert true that the future is done?
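   
   A minimal sketch of why that assertion should hold, using only
   `java.util.concurrent` rather than the real service. The class name
   `AllOfCompletionSketch` and the helper `combine` are hypothetical and only
   mimic the `allOf(...).thenApply(...)` shape quoted from
   GroupCoordinatorService; the combined future is expected to be done right
   after the last pending input completes, since the non-async dependent runs
   on the completing thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AllOfCompletionSketch {
    // Hypothetical stand-in for the service code: wait for every input
    // future, then merge the individual results into one value.
    static CompletableFuture<Integer> combine(List<CompletableFuture<Integer>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> futures.stream().mapToInt(CompletableFuture::join).sum());
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        CompletableFuture<Integer> combined = combine(Arrays.asList(
            CompletableFuture.completedFuture(1),
            pending
        ));

        // One input is still pending, so the combined future is not done yet.
        System.out.println(combined.isDone());

        // Completing the last input also completes the combined future,
        // synchronously on this thread.
        pending.complete(2);
        System.out.println(combined.isDone());
        System.out.println(combined.join());
    }
}
```

   In the test this would amount to an `assertTrue(future.isDone())` right
   after `resultCollectionFuture.complete(resultCollection2)`.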



##########
clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DeleteGroupsRequestData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.apache.kafka.common.requests.DeleteGroupsRequest.getErrorResultCollection;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DeleteGroupsRequestTest {
+
+    protected static String groupId1 = "group-id-1";
+    protected static String groupId2 = "group-id-2";
+
+    private static DeleteGroupsRequestData data;
+
+    @BeforeEach
+    public void setUp() {
+        data = new DeleteGroupsRequestData()
+            .setGroupsNames(Arrays.asList(groupId1, groupId2));
+    }

Review Comment:
   was this addressed?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +938,253 @@ public void 
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
 
         assertEquals(expectedResponse, future.get());
     }
+
+    @Test
+    public void testDeleteOffsets() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new 
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+            .setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList(
+                new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+            ).iterator());
+        OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
+                new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+            ).iterator());
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setTopics(responseTopicCollection);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsInvalidGroupId() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new 
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    private static Stream<Arguments> testDeleteOffsetsWithExceptionSource() {
+        return testConsumerGroupHeartbeatWithExceptionSource();
+    }
+
+    @ParameterizedTest
+    @MethodSource("testDeleteOffsetsWithExceptionSource")
+    public void testDeleteOffsetsWithException(
+        Throwable exception,
+        short expectedErrorCode
+    ) throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new 
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+            .setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(expectedErrorCode);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(exception));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-1");
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-2");
+        resultCollection2.add(result2);
+
+        DeleteGroupsResponseData.DeletableGroupResult result3 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-3")
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(Arrays.asList(
+            new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()),
+            result2.duplicate(),
+            result3.duplicate(),
+            result1.duplicate()
+        ));
+
+        when(runtime.partitions()).thenReturn(Sets.newSet(
+            new TopicPartition("__consumer_offsets", 0),
+            new TopicPartition("__consumer_offsets", 1),
+            new TopicPartition("__consumer_offsets", 2)
+        ));
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        CompletableFuture<Object> resultCollectionFuture = new 
CompletableFuture<>();
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(resultCollectionFuture);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+            ArgumentMatchers.any()
+        
)).thenReturn(FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()));
+
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", 
"group-id-3", null);
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        assertFalse(future.isDone());
+        resultCollectionFuture.complete(resultCollection2);
+
+        assertEquals(expectedResultCollection, future.get());
+    }
+
+    private static Stream<Arguments> testDeleteGroupsWithExceptionSource() {
+        return testConsumerGroupHeartbeatWithExceptionSource();
+    }
+
+    @ParameterizedTest
+    @MethodSource("testDeleteGroupsWithExceptionSource")

Review Comment:
   can we use
   ```
       @MethodSource("testConsumerGroupHeartbeatWithExceptionSource")
   ```
   
   and remove the helper?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +507,50 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures =
+            new ArrayList<>(groupIds.size());
+
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
+        groupIds.forEach(groupId -> {
+            // For backwards compatibility, we support DeleteGroups for the empty group id.
+            if (groupId == null) {
+                futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(
+                    Collections.singletonList(null),
+                    Errors.INVALID_GROUP_ID
+                )));
+            } else {
+                final TopicPartition topicPartition = topicPartitionFor(groupId);
+                groupsByTopicPartition
+                    .computeIfAbsent(topicPartition, __ -> new ArrayList<>())
+                    .add(groupId);
+            }
+        });
+
+        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
+                runtime.scheduleWriteOperation(
+                    "delete-groups",
+                    topicPartition,
+                    coordinator -> coordinator.deleteGroups(context, groupList)
+                ).exceptionally(exception ->
+                    DeleteGroupsRequest.getErrorResultCollection(groupList, getErrorsForException(exception))
+                );
+
+            futures.add(future);
+        });
+
+        final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+        return allFutures.thenApply(v -> {

Review Comment:
   we can remove the "v"
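   
   One possible reading of this suggestion, sketched under assumptions: a Java
   lambda passed to `thenApply` must still declare its parameter, so the unused
   `Void` argument cannot be dropped outright, but it can be renamed to `__`,
   the placeholder already used in the `computeIfAbsent` call above. The class
   `UnusedLambdaArgSketch` and the method `afterAll` are hypothetical names for
   illustration only.

```java
import java.util.concurrent.CompletableFuture;

public class UnusedLambdaArgSketch {
    // allOf yields CompletableFuture<Void>, so the argument handed to
    // thenApply is always null and carries no information. Naming it "__"
    // signals that it is intentionally ignored.
    static CompletableFuture<String> afterAll(CompletableFuture<?>... futures) {
        return CompletableFuture.allOf(futures).thenApply(__ -> "done");
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = afterAll(
            CompletableFuture.completedFuture(1),
            CompletableFuture.completedFuture("two")
        );
        System.out.println(result.join());
    }
}
```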



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -90,4 +92,28 @@ void validateOffsetFetch(
         int memberEpoch,
         long lastCommittedOffset
     ) throws KafkaException;
+
+    /**
+     * Validates the OffsetDelete request.
+     */
+    void validateOffsetDelete() throws KafkaException;
+
+    /**
+     * Validates the DeleteGroups request.
+     */
+    void validateDeleteGroup() throws KafkaException;
+
+    /**
+     * Returns true if the group is actively subscribed to the topic.
+     *
+     * @param topic The topic name.
+     * @return Whether the group is subscribed to the topic.
+     */
+    boolean isSubscribedToTopic(String topic);
+    /**

Review Comment:
   nit: add a blank line before this Javadoc



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +526,84 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final 
List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>>
 futures =
+            new ArrayList<>(groupIds.size());
+
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new 
HashMap<>();
+        groupIds.forEach(groupId -> {
+            // For backwards compatibility, we support DeleteGroups for the 
empty group id.
+            if (groupId == null) {
+                
futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(
+                    Collections.singletonList(null),
+                    Errors.INVALID_GROUP_ID
+                )));
+            } else {
+                final TopicPartition topicPartition = 
topicPartitionFor(groupId);
+                groupsByTopicPartition
+                    .computeIfAbsent(topicPartition, __ -> new ArrayList<>())
+                    .add(groupId);
+            }
+        });
+
+        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+            
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+                runtime.scheduleWriteOperation(
+                    "delete-groups",
+                    topicPartition,
+                    coordinator -> coordinator.deleteGroups(context, groupList)
+                ).exceptionally(exception -> {
+                    if (exception instanceof UnknownTopicOrPartitionException ||
+                        exception instanceof NotEnoughReplicasException) {
+                        return DeleteGroupsRequest.getErrorResultCollection(
+                            groupList,
+                            Errors.COORDINATOR_NOT_AVAILABLE
+                        );
+                    }
+
+                    if (exception instanceof NotLeaderOrFollowerException ||
+                        exception instanceof KafkaStorageException) {
+                        return DeleteGroupsRequest.getErrorResultCollection(
+                            groupList,
+                            Errors.NOT_COORDINATOR
+                        );
+                    }
+
+                    if (exception instanceof RecordTooLargeException ||
+                        exception instanceof RecordBatchTooLargeException ||
+                        exception instanceof InvalidFetchSizeException) {
+                        return DeleteGroupsRequest.getErrorResultCollection(
+                            groupList,
+                            Errors.UNKNOWN_SERVER_ERROR
+                        );
+                    }
+
+                    return DeleteGroupsRequest.getErrorResultCollection(
+                        groupList,
+                        Errors.forException(exception)
+                    );
+                });
+
+            futures.add(future);
+        });
+
+        final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+        final CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> resFuture = allFutures.thenApply(v -> {

Review Comment:
   also, we can remove the `v`
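
   One reading of this remark, sketched under assumptions: `CompletableFuture.allOf` returns a `CompletableFuture<Void>`, so the value passed to `thenApply` is always `null` and the results have to be read from the individual futures. Java still requires a lambda parameter, so the most one can do is give it a name that signals it is unused (the hunk above already uses `__` for that in `computeIfAbsent`). The class and method names below are hypothetical, and `String` stands in for the real result type.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   // Hypothetical sketch, not the PR's code: merge per-partition results
   // once every future has completed.
   final class AllOfMergeSketch {
       static CompletableFuture<List<String>> mergeAll(
           List<CompletableFuture<List<String>>> futures
       ) {
           CompletableFuture<Void> allFutures =
               CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

           // The argument is always null for allOf, so it is named "ignored".
           return allFutures.thenApply(ignored -> {
               List<String> merged = new ArrayList<>();
               // join() does not block here: every future is already complete.
               futures.forEach(future -> merged.addAll(future.join()));
               return merged;
           });
       }
   }
   ```

   The merged list follows the order of `futures`, not the order in which the futures complete.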



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +349,94 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic =
+            offsetsByGroup.get(request.groupId());
+
+        request.topics().forEach(topic -> {
+            final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection =
+                new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+
+            if (group.isSubscribedToTopic(topic.name())) {
+                topic.partitions().forEach(partition ->
+                    responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code())
+                    )
+                );
+            } else {
+                final TimelineHashMap<Integer, OffsetAndMetadata> offsetsByPartition = offsetsByTopic == null ?
+                    null : offsetsByTopic.get(topic.name());
+                if (offsetsByPartition != null) {
+                    topic.partitions().forEach(partition -> {
+                        if (offsetsByPartition.containsKey(partition.partitionIndex())) {
+                            responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                                .setPartitionIndex(partition.partitionIndex())
+                            );
+                            records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+                                request.groupId(),
+                                topic.name(),
+                                partition.partitionIndex()
+                            ));
+                        }
+                    });
+                }
+            }
+
+            responseTopicCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+                .setName(topic.name())
+                .setPartitions(responsePartitionCollection)
+            );
+        });
+
+        return new CoordinatorResult<>(
+            records,
+            new OffsetDeleteResponseData().setTopics(responseTopicCollection)
+        );
+    }
+
+    /**
+     * Deletes offsets as part of a DeleteGroups request.
+     * Populates the record list passed in with records to update the state machine.
+     * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)}
+     *
+     * @param groupId The ID of the given group.
+     * @param records The record list to populate.
+     *
+     * @return The number of offsets to be deleted.
+     */
+    public int deleteAllOffsets(
+        String groupId,
+        List<Record> records
+    ) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId);
+        AtomicInteger numDeletedOffsets = new AtomicInteger();
+
+        if (offsetsByTopic != null) {
+            offsetsByTopic.forEach((topic, offsetsByPartition) ->

Review Comment:
   we can use
   ```
                   offsetsByPartition.keySet().forEach(partition ->
   ```
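
   A minimal sketch of the `keySet()` iteration, under assumptions: plain `Map`s stand in for `TimelineHashMap`, a `String` stands in for the tombstone `Record`, and the class name is hypothetical. It derives the count from how much the record list grew instead of using an `AtomicInteger`; that is a choice made for this sketch, not something the PR does.

   ```java
   import java.util.List;
   import java.util.Map;

   // Hypothetical sketch, not the PR's code.
   final class OffsetTombstoneSketch {
       static int deleteAllOffsets(
           Map<String, Map<Integer, Long>> offsetsByTopic,
           String groupId,
           List<String> records
       ) {
           if (offsetsByTopic == null) {
               return 0;
           }

           int sizeBefore = records.size();
           offsetsByTopic.forEach((topic, offsetsByPartition) ->
               // Only the partition ids are needed, so iterate over the key set.
               offsetsByPartition.keySet().forEach(partition ->
                   records.add(groupId + "/" + topic + "/" + partition)
               )
           );
           // Number of tombstones appended by this call.
           return records.size() - sizeBefore;
       }
   }
   ```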



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -592,6 +607,33 @@ public void validateOffsetFetch(
         validateMemberEpoch(memberEpoch, member.memberEpoch());
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() {}
+
+    /**
+     * Validates the DeleteGroups request.
+     */
+    @Override
+    public void validateDeleteGroup() throws ApiException {
+        if (state() != ConsumerGroupState.EMPTY) {
+            throw Errors.NON_EMPTY_GROUP.exception();
+        }
+    }
+
+    /**
+     * Populates the list of records with tombstone(s) for deleting the group.
+     *
+     * @param records The list of records.
+     */
+    public void createGroupTombstoneRecords(List<Record> records) {

Review Comment:
   @Override?
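
   For context, a sketch of what the annotation would buy, assuming `createGroupTombstoneRecords` is declared on the `Group` interface (this hunk does not show that). All names below are hypothetical stand-ins, and a `String` replaces the real `Record` type.

   ```java
   import java.util.List;

   // Hypothetical, trimmed-down stand-in for the Group interface.
   interface GroupSketch {
       void createGroupTombstoneRecords(List<String> records);
   }

   final class ConsumerGroupSketch implements GroupSketch {
       private final String groupId;

       ConsumerGroupSketch(String groupId) {
           this.groupId = groupId;
       }

       // With @Override, compilation fails if this signature ever stops
       // matching a method declared on the interface.
       @Override
       public void createGroupTombstoneRecords(List<String> records) {
           records.add("tombstone:" + groupId);
       }
   }
   ```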



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,11 +119,126 @@ public void testCommitOffset() {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroups() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+        DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        List<Record> expectedRecords = new ArrayList<>();
+        for (String groupId : groupIds) {
+            expectedResultCollection.add(new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId));
+            expectedRecords.addAll(Arrays.asList(
+                RecordHelpers.newOffsetCommitTombstoneRecord(groupId, "topic-name", 0),
+                RecordHelpers.newGroupMetadataTombstoneRecord(groupId)
+            ));
+        }
+
+        CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());

Review Comment:
   can we change all of the `doX(...).when(mock).method(...)` stubs to the `when(mock.method(...)).thenX(...)` form?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +938,253 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
 
         assertEquals(expectedResponse, future.get());
     }
+
+    @Test
+    public void testDeleteOffsets() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+            .setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList(
+                new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+            ).iterator());
+        OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
+                new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+            ).iterator());
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setTopics(responseTopicCollection);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsInvalidGroupId() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(
+                        new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                    ))
+            ).iterator());
+        OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    private static Stream<Arguments> testDeleteOffsetsWithExceptionSource() {
+        return testConsumerGroupHeartbeatWithExceptionSource();
+    }
+
+    @ParameterizedTest
+    @MethodSource("testDeleteOffsetsWithExceptionSource")

Review Comment:
   can we use
   ```
       @MethodSource("testConsumerGroupHeartbeatWithExceptionSource")
   ```
   and remove the helper method?


