This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a12f9f97c98 KAFKA-14506: Implement DeleteGroups API and OffsetDelete
API (#14408)
a12f9f97c98 is described below
commit a12f9f97c98f31d8d957ff9b816c4f73b84e0d9d
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Wed Oct 4 05:30:45 2023 -0400
KAFKA-14506: Implement DeleteGroups API and OffsetDelete API (#14408)
This patch implements DeleteGroups and OffsetDelete API in the new group
coordinator.
Reviewers: yangy0000, Ritika Reddy <[email protected]>, Jeff Kim
<[email protected]>, David Jacot <[email protected]>
---
checkstyle/suppressions.xml | 4 +-
.../kafka/common/requests/DeleteGroupsRequest.java | 32 +--
.../common/requests/DeleteGroupsRequestTest.java | 49 ++++
.../org/apache/kafka/coordinator/group/Group.java | 27 +++
.../coordinator/group/GroupCoordinatorService.java | 125 +++++++----
.../coordinator/group/GroupCoordinatorShard.java | 78 ++++++-
.../coordinator/group/GroupMetadataManager.java | 27 +++
.../coordinator/group/OffsetMetadataManager.java | 104 +++++++++
.../coordinator/group/consumer/ConsumerGroup.java | 43 ++++
.../coordinator/group/generic/GenericGroup.java | 53 ++++-
.../group/GroupCoordinatorServiceTest.java | 246 ++++++++++++++++++++-
.../group/GroupCoordinatorShardTest.java | 147 ++++++++++++
.../group/GroupMetadataManagerTest.java | 30 ++-
.../group/OffsetMetadataManagerTest.java | 206 ++++++++++++++++-
.../group/consumer/ConsumerGroupTest.java | 30 +++
.../group/generic/GenericGroupTest.java | 61 ++++-
16 files changed, 1201 insertions(+), 61 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 29ef2ff0ad8..c6dfae8cc44 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -326,11 +326,11 @@
<suppress checks="(NPathComplexity|MethodLength)"
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
<suppress checks="ClassFanOutComplexity"
-
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
+
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorService|GroupCoordinatorServiceTest).java"/>
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
-
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
+
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest).java"/>
<suppress checks="JavaNCSS"
files="GroupMetadataManagerTest.java"/>
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
index 87d6deedc12..6bee4eb8937 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
@@ -18,13 +18,12 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
-import
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
-import
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
+import java.util.List;
public class DeleteGroupsRequest extends AbstractRequest {
public static class Builder extends
AbstractRequest.Builder<DeleteGroupsRequest> {
@@ -55,18 +54,9 @@ public class DeleteGroupsRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- Errors error = Errors.forException(e);
- DeletableGroupResultCollection groupResults = new
DeletableGroupResultCollection();
- for (String groupId : data.groupsNames()) {
- groupResults.add(new DeletableGroupResult()
- .setGroupId(groupId)
- .setErrorCode(error.code()));
- }
-
- return new DeleteGroupsResponse(
- new DeleteGroupsResponseData()
- .setResults(groupResults)
- .setThrottleTimeMs(throttleTimeMs)
+ return new DeleteGroupsResponse(new DeleteGroupsResponseData()
+ .setResults(getErrorResultCollection(data.groupsNames(),
Errors.forException(e)))
+ .setThrottleTimeMs(throttleTimeMs)
);
}
@@ -78,4 +68,18 @@ public class DeleteGroupsRequest extends AbstractRequest {
public DeleteGroupsRequestData data() {
return data;
}
+
+ public static DeleteGroupsResponseData.DeletableGroupResultCollection
getErrorResultCollection(
+ List<String> groupIds,
+ Errors error
+ ) {
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ groupIds.forEach(groupId -> resultCollection.add(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId)
+ .setErrorCode(error.code())
+ ));
+ return resultCollection;
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java
new file mode 100644
index 00000000000..9745af8d0b7
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DeleteGroupsRequestData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static
org.apache.kafka.common.requests.DeleteGroupsRequest.getErrorResultCollection;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DeleteGroupsRequestTest {
+
+ @Test
+ public void testGetErrorResultCollection() {
+ String groupId1 = "group-id-1";
+ String groupId2 = "group-id-2";
+ DeleteGroupsRequestData data = new DeleteGroupsRequestData()
+ .setGroupsNames(Arrays.asList(groupId1, groupId2));
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new
DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId1)
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()),
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId2)
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+ ).iterator());
+
+ assertEquals(expectedResultCollection,
getErrorResultCollection(data.groupsNames(),
Errors.COORDINATOR_LOAD_IN_PROGRESS));
+ }
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index cbd25cb0700..29a252e47bf 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -19,6 +19,8 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.message.ListGroupsResponseData;
+import java.util.List;
+
/**
* Interface common for all groups.
*/
@@ -90,4 +92,29 @@ public interface Group {
int memberEpoch,
long lastCommittedOffset
) throws KafkaException;
+
+ /**
+ * Validates the OffsetDelete request.
+ */
+ void validateOffsetDelete() throws KafkaException;
+
+ /**
+ * Validates the DeleteGroups request.
+ */
+ void validateDeleteGroup() throws KafkaException;
+
+ /**
+ * Returns true if the group is actively subscribed to the topic.
+ *
+ * @param topic The topic name.
+ * @return Whether the group is subscribed to the topic.
+ */
+ boolean isSubscribedToTopic(String topic);
+
+ /**
+ * Populates the list of records with tombstone(s) for deleting the group.
+ *
+ * @param records The list of records.
+ */
+ void createGroupTombstoneRecords(List<Record> records);
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 9e11cfa63e7..1c71f6dee37 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
@@ -77,6 +78,8 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Set;
@@ -523,9 +526,50 @@ public class GroupCoordinatorService implements
GroupCoordinator {
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ final
List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>>
futures =
+ new ArrayList<>(groupIds.size());
+
+ final Map<TopicPartition, List<String>> groupsByTopicPartition = new
HashMap<>();
+ groupIds.forEach(groupId -> {
+ // For backwards compatibility, we support DeleteGroups for the
empty group id.
+ if (groupId == null) {
+
futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(
+ Collections.singletonList(null),
+ Errors.INVALID_GROUP_ID
+ )));
+ } else {
+ final TopicPartition topicPartition =
topicPartitionFor(groupId);
+ groupsByTopicPartition
+ .computeIfAbsent(topicPartition, __ -> new ArrayList<>())
+ .add(groupId);
+ }
+ });
+
+ groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ runtime.scheduleWriteOperation(
+ "delete-groups",
+ topicPartition,
+ coordinator -> coordinator.deleteGroups(context, groupList)
+ ).exceptionally(exception ->
+ DeleteGroupsRequest.getErrorResultCollection(groupList,
normalizeException(exception))
+ );
+
+ futures.add(future);
+ });
+
+ final CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ return allFutures.thenApply(__ -> {
+ final DeleteGroupsResponseData.DeletableGroupResultCollection res
= new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ futures.forEach(future ->
+ // We don't use res.addAll(future.join()) because
DeletableGroupResultCollection is an ImplicitLinkedHashMultiCollection,
+ // which has requirements for adding elements (see
ImplicitLinkedHashCollection.java#add).
+ future.join().forEach(result ->
+ res.add(result.duplicate())
+ )
+ );
+ return res;
+ });
}
/**
@@ -641,37 +685,9 @@ public class GroupCoordinatorService implements
GroupCoordinator {
"commit-offset",
topicPartitionFor(request.groupId()),
coordinator -> coordinator.commitOffset(context, request)
- ).exceptionally(exception -> {
- if (exception instanceof UnknownTopicOrPartitionException ||
- exception instanceof NotEnoughReplicasException) {
- return OffsetCommitRequest.getErrorResponse(
- request,
- Errors.COORDINATOR_NOT_AVAILABLE
- );
- }
-
- if (exception instanceof NotLeaderOrFollowerException ||
- exception instanceof KafkaStorageException) {
- return OffsetCommitRequest.getErrorResponse(
- request,
- Errors.NOT_COORDINATOR
- );
- }
-
- if (exception instanceof RecordTooLargeException ||
- exception instanceof RecordBatchTooLargeException ||
- exception instanceof InvalidFetchSizeException) {
- return OffsetCommitRequest.getErrorResponse(
- request,
- Errors.INVALID_COMMIT_OFFSET_SIZE
- );
- }
-
- return OffsetCommitRequest.getErrorResponse(
- request,
- Errors.forException(exception)
- );
- });
+ ).exceptionally(exception ->
+ OffsetCommitRequest.getErrorResponse(request,
normalizeException(exception))
+ );
}
/**
@@ -705,9 +721,20 @@ public class GroupCoordinatorService implements
GroupCoordinator {
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ if (!isGroupIdNotEmpty(request.groupId())) {
+ return CompletableFuture.completedFuture(new
OffsetDeleteResponseData()
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ );
+ }
+
+ return runtime.scheduleWriteOperation(
+ "delete-offsets",
+ topicPartitionFor(request.groupId()),
+ coordinator -> coordinator.deleteOffsets(context, request)
+ ).exceptionally(exception ->
+ new OffsetDeleteResponseData()
+ .setErrorCode(normalizeException(exception).code())
+ );
}
/**
@@ -827,4 +854,28 @@ public class GroupCoordinatorService implements
GroupCoordinator {
private static boolean isGroupIdNotEmpty(String groupId) {
return groupId != null && !groupId.isEmpty();
}
+
+ /**
+ * Maps an exception raised by a scheduled write operation to the error to return.
+ * @return The Errors instance associated with the given exception.
+ */
+ private static Errors normalizeException(Throwable exception) {
+ if (exception instanceof UnknownTopicOrPartitionException ||
+ exception instanceof NotEnoughReplicasException) {
+ return Errors.COORDINATOR_NOT_AVAILABLE;
+ }
+
+ if (exception instanceof NotLeaderOrFollowerException ||
+ exception instanceof KafkaStorageException) {
+ return Errors.NOT_COORDINATOR;
+ }
+
+ if (exception instanceof RecordTooLargeException ||
+ exception instanceof RecordBatchTooLargeException ||
+ exception instanceof InvalidFetchSizeException) {
+ return Errors.UNKNOWN_SERVER_ERROR;
+ }
+
+ return Errors.forException(exception);
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 97327c9722f..a094d7c6e41 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -27,12 +28,15 @@ import
org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -60,7 +64,9 @@ import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -156,12 +162,18 @@ public class GroupCoordinatorShard implements
CoordinatorShard<Record> {
.build();
return new GroupCoordinatorShard(
+ logContext,
groupMetadataManager,
offsetMetadataManager
);
}
}
+ /**
+ * The logger.
+ */
+ private final Logger log;
+
/**
* The group metadata manager.
*/
@@ -175,13 +187,16 @@ public class GroupCoordinatorShard implements
CoordinatorShard<Record> {
/**
* Constructor.
*
+ * @param logContext The log context.
* @param groupMetadataManager The group metadata manager.
* @param offsetMetadataManager The offset metadata manager.
*/
GroupCoordinatorShard(
+ LogContext logContext,
GroupMetadataManager groupMetadataManager,
OffsetMetadataManager offsetMetadataManager
) {
+ this.log = logContext.logger(GroupCoordinatorShard.class);
this.groupMetadataManager = groupMetadataManager;
this.offsetMetadataManager = offsetMetadataManager;
}
@@ -262,6 +277,51 @@ public class GroupCoordinatorShard implements
CoordinatorShard<Record> {
);
}
+ /**
+ * Handles a DeleteGroups request.
+ *
+ * @param context The request context.
+ * @param groupIds The groupIds of the groups to be deleted
+ * @return A Result containing the
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+ * a list of records to update the state machine.
+ */
+ public
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> deleteGroups(
+ RequestContext context,
+ List<String> groupIds
+ ) throws ApiException {
+ final DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection =
+ new
DeleteGroupsResponseData.DeletableGroupResultCollection(groupIds.size());
+ final List<Record> records = new ArrayList<>();
+ int numDeletedOffsets = 0;
+ final List<String> deletedGroups = new ArrayList<>();
+
+ for (String groupId : groupIds) {
+ try {
+ groupMetadataManager.validateDeleteGroup(groupId);
+ numDeletedOffsets +=
offsetMetadataManager.deleteAllOffsets(groupId, records);
+ groupMetadataManager.deleteGroup(groupId, records);
+ deletedGroups.add(groupId);
+
+ resultCollection.add(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId)
+ );
+ } catch (ApiException exception) {
+ resultCollection.add(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId)
+ .setErrorCode(Errors.forException(exception).code())
+ );
+ }
+ }
+
+ log.info("The following groups were deleted: {}. A total of {} offsets
were removed.",
+ String.join(", ", deletedGroups),
+ numDeletedOffsets
+ );
+ return new CoordinatorResult<>(records, resultCollection);
+ }
+
/**
* Fetch offsets for a given set of partitions and a given group.
*
@@ -295,7 +355,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<Record> {
}
/**
- * Handles a OffsetCommit request.
+ * Handles an OffsetCommit request.
*
* @param context The request context.
* @param request The actual OffsetCommit request.
@@ -341,6 +401,22 @@ public class GroupCoordinatorShard implements
CoordinatorShard<Record> {
return groupMetadataManager.genericGroupLeave(context, request);
}
+ /**
+ * Handles an OffsetDelete request.
+ *
+ * @param context The request context.
+ * @param request The actual OffsetDelete request.
+ *
+ * @return A Result containing the OffsetDeleteResponse response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+ RequestContext context,
+ OffsetDeleteRequestData request
+ ) throws ApiException {
+ return offsetMetadataManager.deleteOffsets(request);
+ }
+
/**
* The coordinator has been loaded. This is used to apply any
* post loading operations (e.g. registering timers).
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 66742312a64..7588c598dcc 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3060,6 +3060,33 @@ public class GroupMetadataManager {
group.remove(member.memberId());
}
+ /**
+ * Handles a DeleteGroups request.
+ * Populates the record list passed in with records to update the state
machine.
+ * Validations are done in {@link
GroupCoordinatorShard#deleteGroups(RequestContext, List)} by
+ * calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
+ *
+ * @param groupId The id of the group to be deleted. It has been checked
in {@link GroupMetadataManager#validateDeleteGroup}.
+ * @param records The record list to populate.
+ */
+ public void deleteGroup(
+ String groupId,
+ List<Record> records
+ ) {
+ // At this point, we have already validated the group id, so we know
that the group exists and that no exception will be thrown.
+ group(groupId).createGroupTombstoneRecords(records);
+ }
+
+ /**
+ * Validates the DeleteGroups request.
+ *
+ * @param groupId The id of the group to be deleted.
+ */
+ void validateDeleteGroup(String groupId) throws ApiException {
+ Group group = group(groupId);
+ group.validateDeleteGroup();
+ }
+
/**
* Checks whether the given protocol type or name in the request is
inconsistent with the group's.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index c8bf388d714..9744e492f49 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -23,8 +23,10 @@ import
org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
import
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
@@ -45,6 +47,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
+import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
@@ -242,6 +245,19 @@ public class OffsetMetadataManager {
);
}
+ /**
+ * Validates an OffsetDelete request.
+ *
+ * @param request The actual request.
+ */
+ private Group validateOffsetDelete(
+ OffsetDeleteRequestData request
+ ) throws GroupIdNotFoundException {
+ Group group = groupMetadataManager.group(request.groupId());
+ group.validateOffsetDelete();
+ return group;
+ }
+
/**
* Computes the expiration timestamp based on the retention time provided
in the OffsetCommit
* request.
@@ -333,6 +349,94 @@ public class OffsetMetadataManager {
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handles an OffsetDelete request.
+ *
+ * @param request The OffsetDelete request.
+ *
+ * @return A Result containing the OffsetDeleteResponseData response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+ OffsetDeleteRequestData request
+ ) throws ApiException {
+ final Group group = validateOffsetDelete(request);
+ final List<Record> records = new ArrayList<>();
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ final TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> offsetsByTopic =
+ offsetsByGroup.get(request.groupId());
+
+ request.topics().forEach(topic -> {
+ final
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+
+ if (group.isSubscribedToTopic(topic.name())) {
+ topic.partitions().forEach(partition ->
+ responsePartitionCollection.add(new
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+ .setPartitionIndex(partition.partitionIndex())
+ .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code())
+ )
+ );
+ } else {
+ final TimelineHashMap<Integer, OffsetAndMetadata>
offsetsByPartition = offsetsByTopic == null ?
+ null : offsetsByTopic.get(topic.name());
+ if (offsetsByPartition != null) {
+ topic.partitions().forEach(partition -> {
+ if
(offsetsByPartition.containsKey(partition.partitionIndex())) {
+ responsePartitionCollection.add(new
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+ .setPartitionIndex(partition.partitionIndex())
+ );
+
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+ request.groupId(),
+ topic.name(),
+ partition.partitionIndex()
+ ));
+ }
+ });
+ }
+ }
+
+ responseTopicCollection.add(new
OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+ .setName(topic.name())
+ .setPartitions(responsePartitionCollection)
+ );
+ });
+
+ return new CoordinatorResult<>(
+ records,
+ new OffsetDeleteResponseData().setTopics(responseTopicCollection)
+ );
+ }
+
+ /**
+ * Deletes offsets as part of a DeleteGroups request.
+ * Populates the record list passed in with records to update the state
machine.
+ * Validations are done in {@link
GroupCoordinatorShard#deleteGroups(RequestContext, List)}
+ *
+ * @param groupId The id of the given group.
+ * @param records The record list to populate.
+ *
+ * @return The number of offsets to be deleted.
+ */
+ public int deleteAllOffsets(
+ String groupId,
+ List<Record> records
+ ) {
+ TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
offsetsByTopic = offsetsByGroup.get(groupId);
+ AtomicInteger numDeletedOffsets = new AtomicInteger();
+
+ if (offsetsByTopic != null) {
+ offsetsByTopic.forEach((topic, offsetsByPartition) ->
+ offsetsByPartition.keySet().forEach(partition -> {
+
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic,
partition));
+ numDeletedOffsets.getAndIncrement();
+ })
+ );
+ }
+ return numDeletedOffsets.get();
+ }
+
/**
* Fetch offsets for a given Group.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 10222c3a3cc..21f6f8124cd 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -18,10 +18,14 @@ package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.RecordHelpers;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
@@ -33,6 +37,7 @@ import org.apache.kafka.timeline.TimelineObject;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -341,6 +346,16 @@ public class ConsumerGroup implements Group {
return Collections.unmodifiableSet(subscribedTopicNames.keySet());
}
+ /**
+ * Returns true if the consumer group is actively subscribed to the topic.
+ *
+ * @param topic The topic name.
+ * @return whether the group is subscribed to the topic.
+ */
+ public boolean isSubscribedToTopic(String topic) {
+ return subscribedTopicNames.containsKey(topic);
+ }
+
/**
* Returns the target assignment of the member.
*
@@ -592,6 +607,34 @@ public class ConsumerGroup implements Group {
validateMemberEpoch(memberEpoch, member.memberEpoch());
}
+ /**
+ * Validates the OffsetDelete request.
+ */
+ @Override
+ public void validateOffsetDelete() {}
+
+ /**
+ * Validates the DeleteGroups request.
+ */
+ @Override
+ public void validateDeleteGroup() throws ApiException {
+ if (state() != ConsumerGroupState.EMPTY) {
+ throw Errors.NON_EMPTY_GROUP.exception();
+ }
+ }
+
+ /**
+ * Populates the list of records with tombstone(s) for deleting the group.
+ *
+ * @param records The list of records.
+ */
+ @Override
+ public void createGroupTombstoneRecords(List<Record> records) {
+
records.add(RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId()));
+
records.add(RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId()));
+ records.add(RecordHelpers.newGroupEpochTombstoneRecord(groupId()));
+ }
+
/**
* Throws a StaleMemberEpochException if the received member epoch does
not match
* the expected member epoch.
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
index 18f96f2f78e..88ec52727fa 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.generic;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
@@ -32,6 +33,8 @@ import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.RecordHelpers;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
@@ -54,6 +57,7 @@ import static
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPL
import static
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
import static
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
import static
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+import static
org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE;
/**
* This class holds metadata for a generic group where the
@@ -849,6 +853,51 @@ public class GenericGroup implements Group {
}
}
+ /**
+ * Validates the OffsetDelete request.
+ */
+ @Override
+ public void validateOffsetDelete() throws ApiException {
+ switch (currentState()) {
+ case DEAD:
+ throw new GroupIdNotFoundException(String.format("Group %s is
in dead state.", groupId));
+ case STABLE:
+ case PREPARING_REBALANCE:
+ case COMPLETING_REBALANCE:
+ if (!usesConsumerGroupProtocol()) {
+ throw Errors.NON_EMPTY_GROUP.exception();
+ }
+ break;
+ default:
+ }
+ }
+
+ /**
+ * Validates the DeleteGroups request; a dead group is reported as not found, and a stable or rebalancing group as non-empty.
+ */
+ @Override
+ public void validateDeleteGroup() throws ApiException {
+ switch (currentState()) {
+ case DEAD:
+ throw new GroupIdNotFoundException(String.format("Group %s is
in dead state.", groupId));
+ case STABLE:
+ case PREPARING_REBALANCE:
+ case COMPLETING_REBALANCE:
+ throw Errors.NON_EMPTY_GROUP.exception();
+ default:
+ }
+ }
+
+ /**
+ * Populates the list of records with tombstone(s) for deleting the group.
+ *
+ * @param records The list of records.
+ */
+ @Override
+ public void createGroupTombstoneRecords(List<Record> records) {
+ records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
+ }
+
/**
* Verify the member id is up to date for static members. Return true if
both conditions met:
* 1. given member is a known static member to group
@@ -1015,10 +1064,10 @@ public class GenericGroup implements Group {
/**
* Returns true if the consumer group is actively subscribed to the topic.
When the consumer
- * group does not know, because the information is not available yet or
because the it has
+ * group does not know, because the information is not available yet or
because it has
* failed to parse the Consumer Protocol, it returns true to be safe.
*
- * @param topic the topic name.
+ * @param topic The topic name.
* @return whether the group is subscribed to the topic.
*/
public boolean isSubscribedToTopic(String topic) {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 8f39f524484..755bd2aa56a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -34,12 +34,15 @@ import
org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -499,7 +502,6 @@ public class GroupCoordinatorServiceTest {
.setGroupId(null)
.setMemberId(UNKNOWN_MEMBER_ID);
-
CompletableFuture<SyncGroupResponseData> response = service.syncGroup(
requestContext(ApiKeys.SYNC_GROUP),
request,
@@ -936,4 +938,246 @@ public class GroupCoordinatorServiceTest {
assertEquals(expectedResponse, future.get());
}
+
+ @Test
+ public void testDeleteOffsets() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ ).iterator());
+ OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+ .setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList(
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+ ).iterator());
+ OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+ ).iterator());
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setTopics(responseTopicCollection);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsInvalidGroupId() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ ).iterator());
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @ParameterizedTest
+ @MethodSource("testConsumerGroupHeartbeatWithExceptionSource")
+ public void testDeleteOffsetsWithException(
+ Throwable exception,
+ short expectedErrorCode
+ ) throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ ).iterator());
+ OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+ .setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(expectedErrorCode);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(exception));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-1");
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ DeleteGroupsResponseData.DeletableGroupResult result2 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2");
+ resultCollection2.add(result2);
+
+ DeleteGroupsResponseData.DeletableGroupResult result3 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-3")
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(Arrays.asList(
+ new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()),
+ result2.duplicate(),
+ result3.duplicate(),
+ result1.duplicate()
+ ));
+
+ when(runtime.partitions()).thenReturn(Sets.newSet(
+ new TopicPartition("__consumer_offsets", 0),
+ new TopicPartition("__consumer_offsets", 1),
+ new TopicPartition("__consumer_offsets", 2)
+ ));
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ CompletableFuture<Object> resultCollectionFuture = new
CompletableFuture<>();
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(resultCollectionFuture);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+ ArgumentMatchers.any()
+
)).thenReturn(FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()));
+
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2",
"group-id-3", null);
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ assertFalse(future.isDone());
+ resultCollectionFuture.complete(resultCollection2);
+
+ assertTrue(future.isDone());
+ assertEquals(expectedResultCollection, future.get());
+ }
+
+ @ParameterizedTest
+ @MethodSource("testConsumerGroupHeartbeatWithExceptionSource")
+ public void testDeleteGroupsWithException(
+ Throwable exception,
+ short expectedErrorCode
+ ) throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(exception));
+
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(
+ requestContext(ApiKeys.DELETE_GROUPS),
+ Collections.singletonList("group-id"),
+ BufferSupplier.NO_CACHING
+ );
+
+ assertEquals(
+ new
DeleteGroupsResponseData.DeletableGroupResultCollection(Collections.singletonList(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id")
+ .setErrorCode(expectedErrorCode)
+ ).iterator()),
+ future.get()
+ );
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index a663147e7e3..1ed931c029d 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -18,10 +18,13 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
@@ -42,14 +45,22 @@ import
org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import static org.apache.kafka.coordinator.group.TestUtil.requestContext;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -62,6 +73,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -86,6 +98,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -105,11 +118,127 @@ public class GroupCoordinatorShardTest {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testDeleteGroups() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List<Record> expectedRecords = new ArrayList<>();
+ for (String groupId : groupIds) {
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId));
+ expectedRecords.addAll(Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord(groupId)
+ ));
+ }
+
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+ when(offsetMetadataManager.deleteAllOffsets(anyString(),
anyList())).thenAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+ records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0));
+ return 1;
+ });
+ // Mockito#when can only stub methods that return a value, so we use
Mockito#doAnswer instead.
+ doAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+ return null;
+ }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> coordinatorResult =
+ coordinator.deleteGroups(context, groupIds);
+
+ for (String groupId : groupIds) {
+ verify(groupMetadataManager,
times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
+ verify(groupMetadataManager,
times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList());
+ verify(offsetMetadataManager,
times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
+ }
+ assertEquals(expectedResult, coordinatorResult);
+ }
+
+ @Test
+ public void testDeleteGroupsInvalidGroupId() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2",
"group-id-3");
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new
DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-1"),
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2")
+ .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-3")
+ ).iterator());
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-3",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-3")
+ );
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+ // Mockito#when can only stub methods that return a value, so we use
Mockito#doAnswer and Mockito#doThrow instead.
+ doThrow(Errors.INVALID_GROUP_ID.exception())
+
.when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.eq("group-id-2"));
+ doAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+ records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0));
+ return null;
+ }).when(offsetMetadataManager).deleteAllOffsets(anyString(),
anyList());
+ doAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+ return null;
+ }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> coordinatorResult =
+ coordinator.deleteGroups(context, groupIds);
+
+ for (String groupId : groupIds) {
+ verify(groupMetadataManager,
times(1)).validateDeleteGroup(eq(groupId));
+ if (!groupId.equals("group-id-2")) {
+ verify(groupMetadataManager,
times(1)).deleteGroup(eq(groupId), anyList());
+ verify(offsetMetadataManager,
times(1)).deleteAllOffsets(eq(groupId), anyList());
+ }
+ }
+ assertEquals(expectedResult, coordinatorResult);
+ }
+
@Test
public void testReplayOffsetCommit() {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -135,6 +264,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -159,6 +289,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -179,6 +310,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -198,6 +330,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -218,6 +351,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -237,6 +371,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -257,6 +392,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -276,6 +412,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -296,6 +433,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -315,6 +453,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -335,6 +474,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -354,6 +494,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -374,6 +515,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -393,6 +535,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -405,6 +548,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -424,6 +568,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -443,6 +588,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@@ -463,6 +609,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 8159ffd3a94..6bf08f10934 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -738,7 +738,7 @@ public class GroupMetadataManagerTest {
request,
responseFuture
);
-
+
return new SyncResult(responseFuture, coordinatorResult);
}
@@ -9355,6 +9355,34 @@ public class GroupMetadataManagerTest {
assertEquals(expectedResponse, leaveResult.response());
}
+ @Test
+ public void testGenericGroupDelete() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ List<Record> expectedRecords =
Collections.singletonList(RecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
+ List<Record> records = new ArrayList<>();
+ context.groupMetadataManager.deleteGroup("group-id", records);
+ assertEquals(expectedRecords, records);
+ }
+
+ @Test
+ public void testConsumerGroupDelete() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id", true);
+
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id"),
+
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord("group-id"),
+ RecordHelpers.newGroupEpochTombstoneRecord("group-id")
+ );
+ List<Record> records = new ArrayList<>();
+ context.groupMetadataManager.deleteGroup("group-id", records);
+ assertEquals(expectedRecords, records);
+ }
+
private static void assertNoOrEmptyResult(List<ExpiredTimeout<Void,
Record>> timeouts) {
assertTrue(timeouts.size() <= 1);
timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT,
timeout.result));
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 7f355da76aa..5598a74e50e 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.IllegalGenerationException;
@@ -26,6 +27,8 @@ import
org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.network.ClientInformation;
@@ -55,8 +58,10 @@ import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.net.InetAddress;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -70,6 +75,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class OffsetMetadataManagerTest {
static class OffsetMetadataManagerTestContext {
@@ -177,6 +183,26 @@ public class OffsetMetadataManagerTest {
return result;
}
+ public CoordinatorResult<OffsetDeleteResponseData, Record>
deleteOffsets(
+ OffsetDeleteRequestData request
+ ) {
+ CoordinatorResult<OffsetDeleteResponseData, Record> result =
offsetMetadataManager.deleteOffsets(request);
+ result.records().forEach(this::replay);
+ return result;
+ }
+
+ public int deleteAllOffsets(
+ String groupId,
+ List<Record> records
+ ) {
+ List<Record> addedRecords = new ArrayList<>();
+ int numDeletedOffsets =
offsetMetadataManager.deleteAllOffsets(groupId, addedRecords);
+ addedRecords.forEach(this::replay);
+
+ records.addAll(addedRecords);
+ return numDeletedOffsets;
+ }
+
public List<OffsetFetchResponseData.OffsetFetchResponseTopics>
fetchOffsets(
String groupId,
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
@@ -257,8 +283,6 @@ public class OffsetMetadataManagerTest {
long offset,
int leaderEpoch
) {
- snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
-
replay(RecordHelpers.newOffsetCommitRecord(
groupId,
topic,
@@ -274,6 +298,18 @@ public class OffsetMetadataManagerTest {
));
}
+ public void deleteOffset(
+ String groupId,
+ String topic,
+ int partition
+ ) {
+ replay(RecordHelpers.newOffsetCommitTombstoneRecord(
+ groupId,
+ topic,
+ partition
+ ));
+ }
+
private ApiMessage messageOrNull(ApiMessageAndVersion
apiMessageAndVersion) {
if (apiMessageAndVersion == null) {
return null;
@@ -285,6 +321,8 @@ public class OffsetMetadataManagerTest {
private void replay(
Record record
) {
+ snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+
ApiMessageAndVersion key = record.key();
ApiMessageAndVersion value = record.value();
@@ -307,6 +345,63 @@ public class OffsetMetadataManagerTest {
lastWrittenOffset++;
}
+
+ public void testOffsetDeleteWith(
+ String groupId,
+ String topic,
+ int partition,
+ Errors expectedError
+ ) {
+ final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName(topic)
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition)
+ ))
+ ).iterator());
+
+ final
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
expectedResponsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ if (hasOffset(groupId, topic, partition)) {
+ expectedResponsePartitionCollection.add(
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(expectedError.code())
+ );
+ }
+
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
expectedResponseTopicCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+ .setName(topic)
+ .setPartitions(expectedResponsePartitionCollection)
+ ).iterator());
+
+ List<Record> expectedRecords = Collections.emptyList();
+ if (hasOffset(groupId, topic, partition) && expectedError ==
Errors.NONE) {
+ expectedRecords = Collections.singletonList(
+ RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
topic, partition)
+ );
+ }
+
+ final CoordinatorResult<OffsetDeleteResponseData, Record>
coordinatorResult = deleteOffsets(
+ new OffsetDeleteRequestData()
+ .setGroupId(groupId)
+ .setTopics(requestTopicCollection)
+ );
+
+ assertEquals(new
OffsetDeleteResponseData().setTopics(expectedResponseTopicCollection),
coordinatorResult.response());
+ assertEquals(expectedRecords, coordinatorResult.records());
+ }
+
+ public boolean hasOffset(
+ String groupId,
+ String topic,
+ int partition
+ ) {
+ return offsetMetadataManager.offset(groupId, topic, partition) !=
null;
+ }
}
@ParameterizedTest
@@ -1561,6 +1656,113 @@ public class OffsetMetadataManagerTest {
() -> context.fetchAllOffsets("group", "member", 10,
Long.MAX_VALUE));
}
+ @Test
+ public void testGenericGroupOffsetDelete() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "foo",
+ true
+ );
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+ group.setSubscribedTopics(Optional.of(Collections.emptySet()));
+ context.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE);
+ }
+
+ @Test
+ public void testGenericGroupOffsetDeleteWithErrors() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "foo",
+ true
+ );
+ group.setSubscribedTopics(Optional.of(Collections.singleton("bar")));
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+
+ // Delete an offset that was never committed: nothing is stored for topic "bar1".
+ context.testOffsetDeleteWith("foo", "bar1", 0, Errors.NONE);
+ // Delete the offset from the topic that the group is subscribed to.
+ context.testOffsetDeleteWith("foo", "bar", 0,
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+ }
+
+ @Test
+ public void testConsumerGroupOffsetDelete() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ "foo",
+ true
+ );
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+ assertFalse(group.isSubscribedToTopic("bar"));
+ context.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE);
+ }
+
+ @Test
+ public void testConsumerGroupOffsetDeleteWithErrors() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ "foo",
+ true
+ );
+ MetadataImage image = new
GroupMetadataManagerTest.MetadataImageBuilder()
+ .addTopic(Uuid.randomUuid(), "foo", 1)
+ .addRacks()
+ .build();
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder("member1")
+ .setSubscribedTopicNames(Collections.singletonList("bar"))
+ .build();
+ group.computeSubscriptionMetadata(
+ null,
+ member1,
+ image.topics(),
+ image.cluster()
+ );
+ group.updateMember(member1);
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+ assertTrue(group.isSubscribedToTopic("bar"));
+
+ // Delete an offset that was never committed: nothing is stored for topic "bar1".
+ context.testOffsetDeleteWith("foo", "bar1", 0, Errors.NONE);
+ // Delete the offset from the topic that the group is subscribed to.
+ context.testOffsetDeleteWith("foo", "bar", 0,
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+ }
+
+ @ParameterizedTest
+ @EnumSource(Group.GroupType.class)
+ public void testDeleteGroupAllOffsets(Group.GroupType groupType) {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ switch (groupType) {
+ case GENERIC:
+ context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "foo",
+ true
+ );
+ break;
+ case CONSUMER:
+ context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ "foo",
+ true
+ );
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid group type: " +
groupType);
+ }
+ context.commitOffset("foo", "bar-0", 0, 100L, 0);
+ context.commitOffset("foo", "bar-0", 1, 100L, 0);
+ context.commitOffset("foo", "bar-1", 0, 100L, 0);
+
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-1", 0),
+ RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 0),
+ RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 1)
+ );
+
+ List<Record> records = new ArrayList<>();
+ int numDeleteOffsets = context.deleteAllOffsets("foo", records);
+
+ assertEquals(expectedRecords, records);
+ assertEquals(3, numDeleteOffsets);
+ }
+
static private OffsetFetchResponseData.OffsetFetchResponsePartitions
mkOffsetPartitionResponse(
int partition,
long offset,
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index 9c9421958c5..210aa8eb901 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.utils.LogContext;
@@ -35,6 +36,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static
org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -673,4 +675,32 @@ public class ConsumerGroupTest {
// This should succeed.
group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE);
}
+
+ @Test
+ public void testValidateDeleteGroup() {
+ ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+ assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY,
consumerGroup.state());
+ assertDoesNotThrow(consumerGroup::validateDeleteGroup);
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder("member1")
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setTargetMemberEpoch(1)
+ .build();
+ consumerGroup.updateMember(member1);
+
+ assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING,
consumerGroup.state());
+ assertThrows(GroupNotEmptyException.class,
consumerGroup::validateDeleteGroup);
+
+ consumerGroup.setGroupEpoch(1);
+
+ assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING,
consumerGroup.state());
+ assertThrows(GroupNotEmptyException.class,
consumerGroup::validateDeleteGroup);
+
+ consumerGroup.setTargetAssignmentEpoch(1);
+
+ assertEquals(ConsumerGroup.ConsumerGroupState.STABLE,
consumerGroup.state());
+ assertThrows(GroupNotEmptyException.class,
consumerGroup::validateDeleteGroup);
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
index ba4b177f676..05afdd26edf 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
@@ -20,6 +20,8 @@ import
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -45,6 +47,7 @@ import static
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
import static
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
import static
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
import static
org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -776,7 +779,7 @@ public class GenericGroupTest {
protocolType,
protocols
);
-
+
group.add(member);
assertTrue(group.hasMemberId(memberId));
assertTrue(group.hasStaticMember(groupInstanceId));
@@ -1026,6 +1029,62 @@ public class GenericGroupTest {
() -> group.validateOffsetCommit("member-id", "new-instance-id",
1));
}
+ @Test
+ public void testValidateOffsetDelete() {
+ assertFalse(group.usesConsumerGroupProtocol());
+ group.transitionTo(PREPARING_REBALANCE);
+ assertThrows(GroupNotEmptyException.class,
group::validateOffsetDelete);
+ group.transitionTo(COMPLETING_REBALANCE);
+ assertThrows(GroupNotEmptyException.class,
group::validateOffsetDelete);
+ group.transitionTo(STABLE);
+ assertThrows(GroupNotEmptyException.class,
group::validateOffsetDelete);
+
+ JoinGroupRequestProtocolCollection protocols = new
JoinGroupRequestProtocolCollection();
+ protocols.add(new JoinGroupRequestProtocol()
+ .setName("roundrobin")
+ .setMetadata(new byte[0]));
+ GenericGroupMember member = new GenericGroupMember(
+ memberId,
+ Optional.of(groupInstanceId),
+ clientId,
+ clientHost,
+ rebalanceTimeoutMs,
+ sessionTimeoutMs,
+ protocolType,
+ protocols
+ );
+ group.add(member);
+
+ assertTrue(group.usesConsumerGroupProtocol());
+ group.transitionTo(PREPARING_REBALANCE);
+ assertDoesNotThrow(group::validateOffsetDelete);
+ group.transitionTo(COMPLETING_REBALANCE);
+ assertDoesNotThrow(group::validateOffsetDelete);
+ group.transitionTo(STABLE);
+ assertDoesNotThrow(group::validateOffsetDelete);
+
+ group.transitionTo(PREPARING_REBALANCE);
+ group.transitionTo(EMPTY);
+ assertDoesNotThrow(group::validateOffsetDelete);
+ group.transitionTo(DEAD);
+ assertThrows(GroupIdNotFoundException.class,
group::validateOffsetDelete);
+ }
+
+ @Test
+ public void testValidateDeleteGroup() {
+ group.transitionTo(PREPARING_REBALANCE);
+ assertThrows(GroupNotEmptyException.class, group::validateDeleteGroup);
+ group.transitionTo(COMPLETING_REBALANCE);
+ assertThrows(GroupNotEmptyException.class, group::validateDeleteGroup);
+ group.transitionTo(STABLE);
+ assertThrows(GroupNotEmptyException.class, group::validateDeleteGroup);
+ group.transitionTo(PREPARING_REBALANCE);
+ group.transitionTo(EMPTY);
+ assertDoesNotThrow(group::validateDeleteGroup);
+ group.transitionTo(DEAD);
+ assertThrows(GroupIdNotFoundException.class,
group::validateDeleteGroup);
+ }
+
private void assertState(GenericGroup group, GenericGroupState
targetState) {
Set<GenericGroupState> otherStates = new HashSet<>();
otherStates.add(STABLE);