dajac commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1342599402
########## clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.DeleteGroupsRequestData; +import org.apache.kafka.common.message.DeleteGroupsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.apache.kafka.common.requests.DeleteGroupsRequest.getErrorResultCollection; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class DeleteGroupsRequestTest { + + protected static String groupId1 = "group-id-1"; + protected static String groupId2 = "group-id-2"; + + private static DeleteGroupsRequestData data; + + @BeforeEach + public void setUp() { + data = new DeleteGroupsRequestData() + .setGroupsNames(Arrays.asList(groupId1, groupId2)); + } Review Comment: nit: Given that there is only one test. I would rather move everything into that test. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ########## @@ -307,6 +344,57 @@ private void replay( lastWrittenOffset++; } + + public void testOffsetDeleteWith( + OffsetMetadataManagerTestContext context, + String groupId, + String topic, + int partition, + Errors error Review Comment: nit: expectedError? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -333,6 +348,87 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( return new CoordinatorResult<>(records, response); } + /** + * Handles an OffsetDelete request. + * + * @param request The OffsetDelete request. + * + * @return A Result containing the OffsetDeleteResponseData response and + * a list of records to update the state machine. + */ + public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets( + OffsetDeleteRequestData request + ) throws ApiException { + final Group group = validateOffsetDelete(request); + final List<Record> records = new ArrayList<>(); + final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = + new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(); + final OffsetDeleteResponseData response = new OffsetDeleteResponseData(); + final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = + offsetsByGroup.get(request.groupId()); + + request.topics().forEach(topic -> { + final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = + new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); + final TimelineHashMap<Integer, OffsetAndMetadata> offsetsByPartition = offsetsByTopic == null ? + null : offsetsByTopic.get(topic.name()); + + if (group.isSubscribedToTopic(topic.name())) { + topic.partitions().forEach(partition -> + responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code()) + ) + ); + } else { + topic.partitions().forEach(partition -> { + if (offsetsByPartition != null && offsetsByPartition.containsKey(partition.partitionIndex())) { + responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + ); + records.add(RecordHelpers.newOffsetCommitTombstoneRecord( + request.groupId(), + topic.name(), + partition.partitionIndex() + )); + } + }); + } + + final OffsetDeleteResponseData.OffsetDeleteResponseTopic responseTopic = + new OffsetDeleteResponseData.OffsetDeleteResponseTopic() + .setName(topic.name()) + .setPartitions(responsePartitionCollection); + responseTopicCollection.add(responseTopic); + }); + response.setTopics(responseTopicCollection); + + return new CoordinatorResult<>(records, response); + } + + /** + * Deletes offsets as part of a DeleteGroups request. + * Populates the record list passed in with records to update the state machine. + * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)} + * + * @param groupId The ID of the given group. + * @param records The record list to populate. + */ + public void deleteAllOffsets( + String groupId, + List<Record> records + ) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); + + if (offsetsByTopic != null) { Review Comment: No, we don't need it here because the group is completely removed in this case. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -523,9 +526,84 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } - return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( - "This API is not implemented yet." - )); + final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures = + new ArrayList<>(groupIds.size()); + + final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>(); + groupIds.forEach(groupId -> { + // For backwards compatibility, we support DeleteGroups for the empty group id. + if (groupId == null) { + futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection( + Collections.singletonList(null), + Errors.INVALID_GROUP_ID + ))); + } else { + final TopicPartition topicPartition = topicPartitionFor(groupId); + groupsByTopicPartition + .computeIfAbsent(topicPartition, __ -> new ArrayList<>()) + .add(groupId); + } + }); + + groupsByTopicPartition.forEach((topicPartition, groupList) -> { + CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = + runtime.scheduleWriteOperation( + "delete-groups", + topicPartition, + coordinator -> coordinator.deleteGroups(context, groupList) + ).exceptionally(exception -> { + if (exception instanceof UnknownTopicOrPartitionException || + exception instanceof NotEnoughReplicasException) { + return DeleteGroupsRequest.getErrorResultCollection( + groupList, + Errors.COORDINATOR_NOT_AVAILABLE + ); + } + + if (exception instanceof NotLeaderOrFollowerException || + exception instanceof KafkaStorageException) { + return DeleteGroupsRequest.getErrorResultCollection( + groupList, + Errors.NOT_COORDINATOR + ); + } + + if (exception instanceof RecordTooLargeException || + exception instanceof RecordBatchTooLargeException || + exception instanceof InvalidFetchSizeException) { + return DeleteGroupsRequest.getErrorResultCollection( + groupList, + Errors.UNKNOWN_SERVER_ERROR + ); + } + + return DeleteGroupsRequest.getErrorResultCollection( + groupList, + Errors.forException(exception) + ); + }); + + futures.add(future); + }); + + final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + final CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> resFuture = allFutures.thenApply(v -> { Review Comment: nit: How about `return allFutures.thenApply...`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -523,9 +526,84 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } - return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( - "This API is not implemented yet." - )); + final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures = + new ArrayList<>(groupIds.size()); + + final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>(); + groupIds.forEach(groupId -> { + // For backwards compatibility, we support DeleteGroups for the empty group id. + if (groupId == null) { + futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection( + Collections.singletonList(null), + Errors.INVALID_GROUP_ID + ))); + } else { + final TopicPartition topicPartition = topicPartitionFor(groupId); + groupsByTopicPartition + .computeIfAbsent(topicPartition, __ -> new ArrayList<>()) + .add(groupId); + } + }); + + groupsByTopicPartition.forEach((topicPartition, groupList) -> { + CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = + runtime.scheduleWriteOperation( + "delete-groups", + topicPartition, + coordinator -> coordinator.deleteGroups(context, groupList) + ).exceptionally(exception -> { + if (exception instanceof UnknownTopicOrPartitionException || + exception instanceof NotEnoughReplicasException) { + return DeleteGroupsRequest.getErrorResultCollection( + groupList, + Errors.COORDINATOR_NOT_AVAILABLE + ); + } + + if (exception instanceof NotLeaderOrFollowerException || + exception instanceof KafkaStorageException) { + return DeleteGroupsRequest.getErrorResultCollection( + groupList, + Errors.NOT_COORDINATOR + ); + } + + if (exception instanceof RecordTooLargeException || + exception instanceof RecordBatchTooLargeException || + exception instanceof InvalidFetchSizeException) { + return DeleteGroupsRequest.getErrorResultCollection( + groupList, + Errors.UNKNOWN_SERVER_ERROR + ); + } + + return DeleteGroupsRequest.getErrorResultCollection( + groupList, + Errors.forException(exception) + ); + }); + + futures.add(future); + }); + + final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + final CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> resFuture = allFutures.thenApply(v -> { + final DeleteGroupsResponseData.DeletableGroupResultCollection res = new DeleteGroupsResponseData.DeletableGroupResultCollection(); + final List<String> deletedGroups = new ArrayList<>(); + futures.forEach(future -> + // We don't use res.addAll(future.join()) because DeletableGroupResultCollection is an ImplicitLinkedHashMultiCollection, + // which has requirements for adding elements (see ImplicitLinkedHashCollection.java#add). + future.join().forEach(result -> { + res.add(result.duplicate()); + if (result.errorCode() == Errors.NONE.code()) { + deletedGroups.add(result.groupId()); + } + }) + ); + log.info("The following groups were deleted: {}.", String.join(", ", deletedGroups)); Review Comment: I wonder if we should rather log this within the shard in order to have it logged per shard (with the shard context). What do you think? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -333,6 +348,87 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( return new CoordinatorResult<>(records, response); } + /** + * Handles an OffsetDelete request. + * + * @param request The OffsetDelete request. + * + * @return A Result containing the OffsetDeleteResponseData response and + * a list of records to update the state machine. + */ + public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets( + OffsetDeleteRequestData request + ) throws ApiException { + final Group group = validateOffsetDelete(request); + final List<Record> records = new ArrayList<>(); + final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = + new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(); + final OffsetDeleteResponseData response = new OffsetDeleteResponseData(); + final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = + offsetsByGroup.get(request.groupId()); + + request.topics().forEach(topic -> { + final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = + new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); + final TimelineHashMap<Integer, OffsetAndMetadata> offsetsByPartition = offsetsByTopic == null ? + null : offsetsByTopic.get(topic.name()); + + if (group.isSubscribedToTopic(topic.name())) { + topic.partitions().forEach(partition -> + responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code()) + ) + ); + } else { + topic.partitions().forEach(partition -> { + if (offsetsByPartition != null && offsetsByPartition.containsKey(partition.partitionIndex())) { + responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + ); + records.add(RecordHelpers.newOffsetCommitTombstoneRecord( + request.groupId(), + topic.name(), + partition.partitionIndex() + )); + } + }); + } + + final OffsetDeleteResponseData.OffsetDeleteResponseTopic responseTopic = + new OffsetDeleteResponseData.OffsetDeleteResponseTopic() + .setName(topic.name()) + .setPartitions(responsePartitionCollection); + responseTopicCollection.add(responseTopic); + }); + response.setTopics(responseTopicCollection); + + return new CoordinatorResult<>(records, response); Review Comment: nit: Would it make sense to use? We don't really need `response` except here. ``` return new CoordinatorResult<>( records, new OffsetDeleteResponseData().setTopics(responseTopicCollection) ); ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ########## @@ -307,6 +344,57 @@ private void replay( lastWrittenOffset++; } + + public void testOffsetDeleteWith( + OffsetMetadataManagerTestContext context, + String groupId, + String topic, + int partition, + Errors error + ) { + final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName(topic) + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition) + )) + ).iterator()); + + final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection expectedResponsePartitionCollection = + new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); + if (context.offsetMetadataManager.offset(groupId, topic, partition) != null) { + expectedResponsePartitionCollection.add( + new OffsetDeleteResponseData.OffsetDeleteResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(error.code()) + ); + } + + final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection expectedResponseTopicCollection = + new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList( + new OffsetDeleteResponseData.OffsetDeleteResponseTopic() + .setName(topic) + .setPartitions(expectedResponsePartitionCollection) + ).iterator()); + + List<Record> expectedRecords = Collections.emptyList(); + if (context.offsetMetadataManager.offset(groupId, topic, partition) != null && Review Comment: nit: Should we define an helper method in the context (e.g. hasOffset(groupId, topic, partition))? I would also bring `error == Errors.NONE` back on the previous line because it fits there. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -592,6 +607,37 @@ public void validateOffsetFetch( validateMemberEpoch(memberEpoch, member.memberEpoch()); } + /** + * Validates the OffsetDelete request. + */ + @Override + public void validateOffsetDelete() {} + + /** + * Validates the DeleteGroups request. + */ + @Override + public void validateDeleteGroup() throws ApiException { + switch (state()) { + case STABLE: + case ASSIGNING: + case RECONCILING: + throw Errors.NON_EMPTY_GROUP.exception(); + default: + } Review Comment: nit: Could we use `state() != Empty`? This would be more robust. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -936,4 +938,207 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception { assertEquals(expectedResponse, future.get()); } + + @Test + public void testDeleteOffsets() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData() + .setGroupId("group") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = + new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList( + new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0) + ).iterator()); + OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = + new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList( + new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection) + ).iterator()); + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setTopics(responseTopicCollection); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteOffsetsInvalidGroupId() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setErrorCode(Errors.INVALID_GROUP_ID.code()); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteOffsetsCoordinatorNotAvailableException() throws Exception { Review Comment: I wonder if we should test all the exceptions that we re-map. I did something similar in `testConsumerGroupHeartbeatWithException`. What do you think? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -936,4 +938,207 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception { assertEquals(expectedResponse, future.get()); } + + @Test + public void testDeleteOffsets() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData() + .setGroupId("group") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = + new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList( + new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0) + ).iterator()); + OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = + new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList( + new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection) + ).iterator()); + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setTopics(responseTopicCollection); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteOffsetsInvalidGroupId() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setErrorCode(Errors.INVALID_GROUP_ID.code()); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteOffsetsCoordinatorNotAvailableException() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData() + .setGroupId("group") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture( + Errors.COORDINATOR_LOAD_IN_PROGRESS.exception() + )); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteGroups() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 3); + + DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + DeleteGroupsResponseData.DeletableGroupResult result1 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-1"); + resultCollection1.add(result1); + + DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection2 = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + DeleteGroupsResponseData.DeletableGroupResult result2 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-2"); + resultCollection2.add(result2); + + DeleteGroupsResponseData.DeletableGroupResult result3 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-3") + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); + + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + expectedResultCollection.addAll(Arrays.asList( + new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()), + result1.duplicate(), + result2.duplicate(), + result3.duplicate() + )); + + when(runtime.partitions()).thenReturn(Sets.newSet( + new TopicPartition("__consumer_offsets", 0), + new TopicPartition("__consumer_offsets", 1), + new TopicPartition("__consumer_offsets", 2) + )); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(resultCollection1)); + + CompletableFuture<Object> resultCollectionFuture = new CompletableFuture<>(); + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(resultCollectionFuture); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())); + + List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", "group-id-3", null); + CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = + service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), groupIds, BufferSupplier.NO_CACHING); + + assertFalse(future.isDone()); + resultCollectionFuture.complete(resultCollection2); + + assertTrue(expectedResultCollection.containsAll(future.get())); + assertTrue(future.get().containsAll(expectedResultCollection)); Review Comment: Can't we use `assertEquals`? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -936,4 +938,207 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception { assertEquals(expectedResponse, future.get()); } + + @Test + public void testDeleteOffsets() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData() + .setGroupId("group") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = + new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList( + new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0) + ).iterator()); + OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = + new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList( + new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection) + ).iterator()); + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setTopics(responseTopicCollection); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteOffsetsInvalidGroupId() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setErrorCode(Errors.INVALID_GROUP_ID.code()); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteOffsetsCoordinatorNotAvailableException() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData() + .setGroupId("group") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture( + Errors.COORDINATOR_LOAD_IN_PROGRESS.exception() + )); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteGroups() throws Exception { Review Comment: ditto about the error mapping verification. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ########## @@ -105,6 +115,107 @@ public void testCommitOffset() { assertEquals(result, coordinator.commitOffset(context, request)); } + @Test + public void testDeleteGroups() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + groupMetadataManager, + offsetMetadataManager + ); + + RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); + List<String> groupIds = Arrays.asList("group-id-1", "group-id-2"); + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); + expectedResultCollection.add(new DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1")); + expectedResultCollection.add(new DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2")); + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1", "topic-name", 0), + RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"), + RecordHelpers.newOffsetCommitTombstoneRecord("group-id-2", "topic-name", 0), + RecordHelpers.newGroupMetadataTombstoneRecord("group-id-2") + ); + CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> expectedResult = new CoordinatorResult<>( + expectedRecords, + expectedResultCollection + ); + + doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString()); + doAnswer(invocation -> { Review Comment: It would be better to use the other way in order to remain consistent with the other tests. Is this possible? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -333,6 +348,87 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( return new CoordinatorResult<>(records, response); } + /** + * Handles an OffsetDelete request. + * + * @param request The OffsetDelete request. + * + * @return A Result containing the OffsetDeleteResponseData response and + * a list of records to update the state machine. + */ + public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets( + OffsetDeleteRequestData request + ) throws ApiException { + final Group group = validateOffsetDelete(request); + final List<Record> records = new ArrayList<>(); + final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = + new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(); + final OffsetDeleteResponseData response = new OffsetDeleteResponseData(); + final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = + offsetsByGroup.get(request.groupId()); + + request.topics().forEach(topic -> { + final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = + new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); + final TimelineHashMap<Integer, OffsetAndMetadata> offsetsByPartition = offsetsByTopic == null ? + null : offsetsByTopic.get(topic.name()); + + if (group.isSubscribedToTopic(topic.name())) { + topic.partitions().forEach(partition -> + responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code()) + ) + ); + } else { + topic.partitions().forEach(partition -> { Review Comment: I am +1 for bringing the definition of `offsetsByPartition` within the `else` clause. However, we have to keep using `topic.partitions().forEach(` to iterate over the partitions. However, I don't like `if(offsetsByPartition ==null) {continue};`. How about using `offsetsByPartition != null`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org