dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1332632377
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +525,46 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
-       return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-           "This API is not implemented yet."
-       ));
+       final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+       groupIds.forEach(groupId -> {
+           final int partition = partitionFor(groupId);
Review Comment:
nit: I wonder if we should use `topicPartitionFor` here. With this, we could use the TopicPartition directly as the key in the Map and would not need to create `new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition)` later on. What do you think?
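For illustration, a minimal self-contained sketch of the suggested shape (the `TopicPartition` record, `GROUP_METADATA_TOPIC_NAME`, and hash-based `partitionFor` below are stand-ins for Kafka's actual classes, not the real implementation):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupsByPartitionSketch {
    // Stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
    static final int NUM_PARTITIONS = 50;

    // Stand-in for the coordinator's topicPartitionFor(groupId).
    static TopicPartition topicPartitionFor(String groupId) {
        int partition = Math.floorMod(groupId.hashCode(), NUM_PARTITIONS);
        return new TopicPartition(GROUP_METADATA_TOPIC_NAME, partition);
    }

    // Keying by TopicPartition directly avoids rebuilding the
    // TopicPartition later when scheduling the write operation.
    static Map<TopicPartition, List<String>> groupByPartition(List<String> groupIds) {
        Map<TopicPartition, List<String>> groupsByPartition = new HashMap<>();
        for (String groupId : groupIds) {
            groupsByPartition
                .computeIfAbsent(topicPartitionFor(groupId), __ -> new ArrayList<>())
                .add(groupId);
        }
        return groupsByPartition;
    }
}
```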
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +525,46 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
-       return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-           "This API is not implemented yet."
-       ));
+       final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+       groupIds.forEach(groupId -> {
+           final int partition = partitionFor(groupId);
+           final List<String> groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>());
+           groupList.add(groupId);
+           groupsByPartition.put(partition, groupList);
Review Comment:
nit: You could do the following to avoid having to put the list again into the map.
```
groupsByPartition
    .computeIfAbsent(partition, __ -> new ArrayList<>())
    .add(groupId);
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,45 @@ public HeartbeatResponseData genericGroupHeartbeat(
        );
    }
+   /**
+    * Handles a GroupDelete request.
+    *
+    * @param context The request context.
+    * @param groupIds The groupIds of the groups to be deleted
+    * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResultCollection response and
+    *         a list of records to update the state machine.
+    */
+   public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> deleteGroups(
+       RequestContext context,
+       List<String> groupIds
+   ) throws ApiException {
+       final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection =
+           new DeleteGroupsResponseData.DeletableGroupResultCollection();
+       final List<Record> records = new ArrayList<>();
+
+       groupIds.forEach(groupId -> {
+           try {
+               groupMetadataManager.validateGroupDelete(groupId);
+
Review Comment:
nit: We can remove this empty line.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -705,9 +744,39 @@ public CompletableFuture<OffsetDeleteResponseData> deleteOffsets(
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
-       return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-           "This API is not implemented yet."
-       ));
+       if (!isGroupIdNotEmpty(request.groupId())) {
+           return CompletableFuture.completedFuture(new OffsetDeleteResponseData()
+               .setErrorCode(Errors.INVALID_GROUP_ID.code())
+           );
+       }
+
+       return runtime.scheduleWriteOperation(
+           "delete-offset",
+           topicPartitionFor(request.groupId()),
+           coordinator -> coordinator.deleteOffsets(context, request)
+       ).exceptionally(exception -> {
Review Comment:
It is interesting to point out that, in the current implementation, all these errors are swallowed. This is definitely not ideal because it tells the user that the deletion was successful even if it was not. Should we apply the same error handling to `deleteGroups`?
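To illustrate the concern, here is a small self-contained sketch (with a placeholder `DeletableGroupResult` record and an illustrative error code, not Kafka's real classes) of mapping a failed write future to an explicit per-group error result instead of swallowing it:

```java
import java.util.concurrent.CompletableFuture;

public class ErrorMappingSketch {
    // Minimal stand-in for a per-group result carrying an error code.
    record DeletableGroupResult(String groupId, short errorCode) {}

    static final short NONE = 0;
    static final short COORDINATOR_LOAD_IN_PROGRESS = 14; // illustrative code

    // Instead of swallowing the failure, map it to an explicit error result
    // so the caller can see that the deletion did not succeed.
    static CompletableFuture<DeletableGroupResult> withErrorMapping(
        CompletableFuture<DeletableGroupResult> write,
        String groupId
    ) {
        return write.exceptionally(exception ->
            new DeletableGroupResult(groupId, COORDINATOR_LOAD_IN_PROGRESS));
    }
}
```

A successful future passes through unchanged; only exceptional completion is rewritten into an error result.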
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3072,39 @@ private void removeCurrentMemberFromGenericGroup(
        group.remove(member.memberId());
    }
+   /**
+    * Handles a GroupDelete request.
+    *
+    * @param context The request context.
+    * @param groupId The group id of the group to be deleted.
+    * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResult response and
+    *         a list of records to update the state machine.
+    */
+   public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResult, Record> groupDelete(
+           RequestContext context,
+           String groupId
Review Comment:
nit: The indentation is incorrect.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,80 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
        return new CoordinatorResult<>(records, response);
    }
+   /**
+    * Handles an OffsetDelete request.
+    *
+    * @param context The request context.
+    * @param request The OffsetDelete request.
+    *
+    * @return A Result containing the OffsetDeleteResponseData response and
+    *         a list of records to update the state machine.
+    */
+   public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+       RequestContext context,
+       OffsetDeleteRequestData request
+   ) throws ApiException {
+       final List<Record> records = new ArrayList<>();
+       final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
+           new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+       OffsetDeleteResponseData response = new OffsetDeleteResponseData();
+       try {
Review Comment:
I think that the try..catch is not needed here because we handle the
exceptions in the group coordinator service, no?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -592,6 +607,45 @@ public void validateOffsetFetch(
        validateMemberEpoch(memberEpoch, member.memberEpoch());
    }
+   /**
+    * Validates the OffsetDelete request.
+    */
+   @Override
+   public void validateOffsetDelete() throws GroupIdNotFoundException {
+       if (state() == ConsumerGroupState.DEAD) {
+           throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
+       }
Review Comment:
I think that a consumer group will actually never transition to Dead. We
could actually remove this state.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -592,6 +607,45 @@ public void validateOffsetFetch(
        validateMemberEpoch(memberEpoch, member.memberEpoch());
    }
+   /**
+    * Validates the OffsetDelete request.
+    */
+   @Override
+   public void validateOffsetDelete() throws GroupIdNotFoundException {
+       if (state() == ConsumerGroupState.DEAD) {
+           throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
+       }
+   }
+
+   /**
+    * Validates the GroupDelete request.
+    */
+   @Override
+   public void validateGroupDelete() throws ApiException {
+       if (state() == ConsumerGroupState.DEAD) {
+           throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
+       } else if (state() == ConsumerGroupState.STABLE
+           || state() == ConsumerGroupState.ASSIGNING
+           || state() == ConsumerGroupState.RECONCILING) {
+           throw Errors.NON_EMPTY_GROUP.exception();
+       }
+
+       // We avoid writing the tombstone when the generationId is 0, since this group is only using
+       // Kafka for offset storage.
+       if (groupEpoch() <= 0) {
+           throw Errors.UNKNOWN_SERVER_ERROR.exception();
+       }
+   }
+
+   /**
+    * Creates a GroupMetadata tombstone.
+    *
+    * @return The record.
+    */
+   public Record createMetadataTombstoneRecord() {
+       return RecordHelpers.newGroupEpochTombstoneRecord(groupId());
Review Comment:
As mentioned earlier, we have to generate other tombstones.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -592,6 +607,45 @@ public void validateOffsetFetch(
        validateMemberEpoch(memberEpoch, member.memberEpoch());
    }
+   /**
+    * Validates the OffsetDelete request.
+    */
+   @Override
+   public void validateOffsetDelete() throws GroupIdNotFoundException {
+       if (state() == ConsumerGroupState.DEAD) {
+           throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
+       }
+   }
+
+   /**
+    * Validates the GroupDelete request.
+    */
+   @Override
+   public void validateGroupDelete() throws ApiException {
+       if (state() == ConsumerGroupState.DEAD) {
+           throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
+       } else if (state() == ConsumerGroupState.STABLE
+           || state() == ConsumerGroupState.ASSIGNING
+           || state() == ConsumerGroupState.RECONCILING) {
+           throw Errors.NON_EMPTY_GROUP.exception();
+       }
+
+       // We avoid writing the tombstone when the generationId is 0, since this group is only using
+       // Kafka for offset storage.
+       if (groupEpoch() <= 0) {
+           throw Errors.UNKNOWN_SERVER_ERROR.exception();
+       }
Review Comment:
This does not seem correct to me because this exception does not apply to
consumer groups.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +525,46 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
-       return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-           "This API is not implemented yet."
-       ));
+       final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+       groupIds.forEach(groupId -> {
+           final int partition = partitionFor(groupId);
+           final List<String> groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>());
+           groupList.add(groupId);
+           groupsByPartition.put(partition, groupList);
+       });
+
+       final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures = new ArrayList<>();
+       groupsByPartition.forEach((partition, groupList) -> {
+           CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
+               runtime.scheduleWriteOperation(
+                   "delete-group",
+                   new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition),
+                   coordinator -> coordinator.deleteGroups(context, groupList)
+               ).exceptionally(exception -> {
+                   DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection =
+                       new DeleteGroupsResponseData.DeletableGroupResultCollection();
+                   groupIds.forEach(groupId -> {
+                       resultCollection.add(
+                           new DeleteGroupsResponseData.DeletableGroupResult()
+                               .setGroupId(groupId)
+                               .setErrorCode(Errors.forException(exception).code())
+                       );
+                   });
+                   return resultCollection;
+               });
+
+           futures.add(future);
+       });
+
+       final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+       return allFutures.thenApply(v -> {
+           final DeleteGroupsResponseData.DeletableGroupResultCollection res = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+           for (CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future : futures) {
+               DeleteGroupsResponseData.DeletableGroupResultCollection result = future.join();
+               res.addAll(result);
Review Comment:
nit: `res.addAll(future.join())` to reduce the code?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +525,46 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
-       return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-           "This API is not implemented yet."
-       ));
+       final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+       groupIds.forEach(groupId -> {
+           final int partition = partitionFor(groupId);
+           final List<String> groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>());
+           groupList.add(groupId);
+           groupsByPartition.put(partition, groupList);
+       });
+
+       final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures = new ArrayList<>();
Review Comment:
nit: We could specify the size of the list when we allocate it.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,45 @@ public HeartbeatResponseData genericGroupHeartbeat(
        );
    }
+   /**
+    * Handles a GroupDelete request.
+    *
+    * @param context The request context.
+    * @param groupIds The groupIds of the groups to be deleted
+    * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResultCollection response and
+    *         a list of records to update the state machine.
+    */
+   public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> deleteGroups(
+       RequestContext context,
+       List<String> groupIds
+   ) throws ApiException {
+       final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection =
+           new DeleteGroupsResponseData.DeletableGroupResultCollection();
+       final List<Record> records = new ArrayList<>();
+
+       groupIds.forEach(groupId -> {
+           try {
+               groupMetadataManager.validateGroupDelete(groupId);
+
+               offsetMetadataManager.populateRecordListToDeleteAllOffsets(context, groupId, records);
+               final CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResult, Record> deleteGroupCoordinatorResult =
+                   groupMetadataManager.groupDelete(context, groupId);
Review Comment:
The CoordinatorResult is a bit annoying here. How about passing `records` to
the method as well? Then we could construct the response here. We could also
remove the context if it is not needed.
How about naming it `deleteGroup` to be consistent with `deleteOffsets`?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,45 @@ public HeartbeatResponseData genericGroupHeartbeat(
        );
    }
+   /**
+    * Handles a GroupDelete request.
+    *
+    * @param context The request context.
+    * @param groupIds The groupIds of the groups to be deleted
+    * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResultCollection response and
+    *         a list of records to update the state machine.
+    */
+   public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> deleteGroups(
+       RequestContext context,
+       List<String> groupIds
+   ) throws ApiException {
+       final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection =
+           new DeleteGroupsResponseData.DeletableGroupResultCollection();
+       final List<Record> records = new ArrayList<>();
+
+       groupIds.forEach(groupId -> {
+           try {
+               groupMetadataManager.validateGroupDelete(groupId);
+
+               offsetMetadataManager.populateRecordListToDeleteAllOffsets(context, groupId, records);
+               final CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResult, Record> deleteGroupCoordinatorResult =
+                   groupMetadataManager.groupDelete(context, groupId);
+               records.addAll(deleteGroupCoordinatorResult.records());
+
+               resultCollection.add(deleteGroupCoordinatorResult.response());
+           } catch (ApiException exception) {
Review Comment:
I have a question regarding the error handling. Could `groupDelete` throw an exception? If it can, we would need to handle the records a bit differently because we don't want to delete the offsets if the group cannot be deleted. The operation should be atomic.
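The atomicity concern can be sketched with stand-in types (strings as placeholder records, a local `ApiException`; none of this is Kafka's actual API): stage the records for one group locally and only merge them into the shared list once every step succeeds.

```java
import java.util.ArrayList;
import java.util.List;

public class AtomicDeleteSketch {
    // Placeholder for the ApiException thrown by validation or groupDelete.
    static class ApiException extends RuntimeException {
        ApiException(String msg) { super(msg); }
    }

    // Stage the records for one group in a local list and only merge them
    // into the shared list once every step for that group has succeeded,
    // so a failure in the group-delete step cannot leave orphan offset
    // tombstones behind.
    static void deleteGroupAtomically(
        String groupId,
        boolean groupDeletable,
        List<String> records
    ) {
        List<String> staged = new ArrayList<>();
        staged.add("offset-tombstones:" + groupId); // delete-all-offsets step
        if (!groupDeletable) {
            throw new ApiException("group " + groupId + " cannot be deleted");
        }
        staged.add("group-tombstone:" + groupId);   // group-delete step
        records.addAll(staged); // only reached when both steps succeeded
    }
}
```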
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,45 @@ public HeartbeatResponseData genericGroupHeartbeat(
        );
    }
+   /**
+    * Handles a GroupDelete request.
+    *
+    * @param context The request context.
+    * @param groupIds The groupIds of the groups to be deleted
+    * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResultCollection response and
+    *         a list of records to update the state machine.
+    */
+   public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> deleteGroups(
+       RequestContext context,
+       List<String> groupIds
+   ) throws ApiException {
+       final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection =
+           new DeleteGroupsResponseData.DeletableGroupResultCollection();
+       final List<Record> records = new ArrayList<>();
+
+       groupIds.forEach(groupId -> {
+           try {
+               groupMetadataManager.validateGroupDelete(groupId);
+
+               offsetMetadataManager.populateRecordListToDeleteAllOffsets(context, groupId, records);
Review Comment:
nit: `deleteAllOffsets`? I also wonder if the context is required. If not,
we could remove it.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,45 @@ public HeartbeatResponseData genericGroupHeartbeat(
        );
    }
+   /**
+    * Handles a GroupDelete request.
+    *
+    * @param context The request context.
+    * @param groupIds The groupIds of the groups to be deleted
+    * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResultCollection response and
+    *         a list of records to update the state machine.
+    */
+   public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> deleteGroups(
+       RequestContext context,
+       List<String> groupIds
+   ) throws ApiException {
+       final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection =
+           new DeleteGroupsResponseData.DeletableGroupResultCollection();
+       final List<Record> records = new ArrayList<>();
+
+       groupIds.forEach(groupId -> {
+           try {
+               groupMetadataManager.validateGroupDelete(groupId);
+
+               offsetMetadataManager.populateRecordListToDeleteAllOffsets(context, groupId, records);
+               final CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResult, Record> deleteGroupCoordinatorResult =
+                   groupMetadataManager.groupDelete(context, groupId);
+               records.addAll(deleteGroupCoordinatorResult.records());
+
+               resultCollection.add(deleteGroupCoordinatorResult.response());
+           } catch (ApiException exception) {
+               resultCollection.add(
+                   new DeleteGroupsResponseData.DeletableGroupResult()
+                       .setGroupId(groupId)
+                       .setErrorCode(Errors.forException(exception).code())
+               );
+           }
+       });
+
+       return new CoordinatorResult<>(records, resultCollection);
+
Review Comment:
nit: We could remove this empty line?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -242,6 +244,19 @@ private void validateOffsetFetch(
        );
    }
+   /**
+    * Validates an OffsetDelete request.
+    *
+    * @param request The actual request.
+    */
+   private Group validateOffsetDelete(
+           OffsetDeleteRequestData request
Review Comment:
nit: Indentation is incorrect.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3072,39 @@ private void removeCurrentMemberFromGenericGroup(
        group.remove(member.memberId());
    }
+   /**
+    * Handles a GroupDelete request.
+    *
+    * @param context The request context.
+    * @param groupId The group id of the group to be deleted.
+    * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResult response and
+    *         a list of records to update the state machine.
+    */
+   public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResult, Record> groupDelete(
+           RequestContext context,
+           String groupId
+   ) throws ApiException {
+       DeleteGroupsResponseData.DeletableGroupResult result =
+           new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId);
+
+       final List<Record> records = new ArrayList<>();
+       records.add(group(groupId).createMetadataTombstoneRecord());
+
+       return new CoordinatorResult<>(records, result);
+   }
+
+   void validateGroupDelete(String groupId) throws ApiException {
+
Review Comment:
nit: Let's remove this empty line and add javadoc to the method.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,80 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
        return new CoordinatorResult<>(records, response);
    }
+   /**
+    * Handles an OffsetDelete request.
+    *
+    * @param context The request context.
+    * @param request The OffsetDelete request.
+    *
+    * @return A Result containing the OffsetDeleteResponseData response and
+    *         a list of records to update the state machine.
+    */
+   public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+       RequestContext context,
+       OffsetDeleteRequestData request
+   ) throws ApiException {
+       final List<Record> records = new ArrayList<>();
+       final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
+           new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+       OffsetDeleteResponseData response = new OffsetDeleteResponseData();
+       try {
+           Group group = validateOffsetDelete(request);
Review Comment:
Let's do the validation before allocating the response, records, etc. We don't have to allocate them if the request is invalid. `group` could also be `final`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,80 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
        return new CoordinatorResult<>(records, response);
    }
+   /**
+    * Handles an OffsetDelete request.
+    *
+    * @param context The request context.
+    * @param request The OffsetDelete request.
+    *
+    * @return A Result containing the OffsetDeleteResponseData response and
+    *         a list of records to update the state machine.
+    */
+   public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+       RequestContext context,
+       OffsetDeleteRequestData request
+   ) throws ApiException {
+       final List<Record> records = new ArrayList<>();
+       final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
+           new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+       OffsetDeleteResponseData response = new OffsetDeleteResponseData();
Review Comment:
nit: final?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,80 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
        return new CoordinatorResult<>(records, response);
    }
+   /**
+    * Handles an OffsetDelete request.
+    *
+    * @param context The request context.
+    * @param request The OffsetDelete request.
+    *
+    * @return A Result containing the OffsetDeleteResponseData response and
+    *         a list of records to update the state machine.
+    */
+   public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+       RequestContext context,
+       OffsetDeleteRequestData request
+   ) throws ApiException {
+       final List<Record> records = new ArrayList<>();
+       final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
+           new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+       OffsetDeleteResponseData response = new OffsetDeleteResponseData();
+       try {
+           Group group = validateOffsetDelete(request);
+
+           request.topics().forEach(topic -> {
+               final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection =
+                   new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+               final boolean subscribedToTopic = group.isSubscribedToTopic(topic.name());
+
+               topic.partitions().forEach(partition -> {
+                   OffsetDeleteResponseData.OffsetDeleteResponsePartition responsePartition =
+                       new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(partition.partitionIndex());
+                   if (subscribedToTopic) {
+                       responsePartition = responsePartition.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code());
+                   } else {
+                       records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
Review Comment:
I wonder if we need to verify if there is actually an offset for the
topic/partition. We don't need to write a tombstone if there is not. What do
you think?
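The check being suggested can be sketched in a self-contained way (the `OffsetKey` record and string-keyed committed-offset set below are placeholders for the offset metadata state, not Kafka's real structures): only emit a tombstone for partitions that actually have a committed offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class TombstoneIfPresentSketch {
    // Placeholder key for a committed offset entry.
    record OffsetKey(String group, String topic, int partition) {}

    // Only emit a tombstone for partitions that actually have a committed
    // offset; writing tombstones for absent keys just adds useless records
    // to the log.
    static List<OffsetKey> tombstonesFor(
        Set<OffsetKey> committed,
        String group, String topic, List<Integer> partitions
    ) {
        List<OffsetKey> tombstones = new ArrayList<>();
        for (int p : partitions) {
            OffsetKey key = new OffsetKey(group, topic, p);
            if (committed.contains(key)) {
                tombstones.add(key);
            }
        }
        return tombstones;
    }
}
```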
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,80 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
        return new CoordinatorResult<>(records, response);
    }
+   /**
+    * Handles an OffsetDelete request.
+    *
+    * @param context The request context.
+    * @param request The OffsetDelete request.
+    *
+    * @return A Result containing the OffsetDeleteResponseData response and
+    *         a list of records to update the state machine.
+    */
+   public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+       RequestContext context,
+       OffsetDeleteRequestData request
+   ) throws ApiException {
+       final List<Record> records = new ArrayList<>();
+       final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
+           new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+       OffsetDeleteResponseData response = new OffsetDeleteResponseData();
+       try {
+           Group group = validateOffsetDelete(request);
+
+           request.topics().forEach(topic -> {
+               final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection =
+                   new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+               final boolean subscribedToTopic = group.isSubscribedToTopic(topic.name());
+
+               topic.partitions().forEach(partition -> {
+                   OffsetDeleteResponseData.OffsetDeleteResponsePartition responsePartition =
+                       new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(partition.partitionIndex());
+                   if (subscribedToTopic) {
+                       responsePartition = responsePartition.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code());
+                   } else {
+                       records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+                           request.groupId(),
+                           topic.name(),
+                           partition.partitionIndex()
+                       ));
+                   }
+                   responsePartitionCollection.add(responsePartition);
+               });
+
+               final OffsetDeleteResponseData.OffsetDeleteResponseTopic responseTopic =
+                   new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection);
+               responseTopicCollection.add(responseTopic);
+           });
+           response = response.setTopics(responseTopicCollection);
Review Comment:
nit: `response = ` is not needed here as `setTopics` mutates the response
directly.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -592,6 +607,45 @@ public void validateOffsetFetch(
        validateMemberEpoch(memberEpoch, member.memberEpoch());
    }
+   /**
+    * Validates the OffsetDelete request.
+    */
+   @Override
+   public void validateOffsetDelete() throws GroupIdNotFoundException {
+       if (state() == ConsumerGroupState.DEAD) {
+           throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
+       }
+   }
+
+   /**
+    * Validates the GroupDelete request.
+    */
+   @Override
+   public void validateGroupDelete() throws ApiException {
+       if (state() == ConsumerGroupState.DEAD) {
+           throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
+       } else if (state() == ConsumerGroupState.STABLE
+           || state() == ConsumerGroupState.ASSIGNING
+           || state() == ConsumerGroupState.RECONCILING) {
+           throw Errors.NON_EMPTY_GROUP.exception();
+       }
Review Comment:
I wonder if using a switch would be better here. What do you think?
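A self-contained sketch of the switch-based shape (the enum, exception classes, and method below are simplified stand-ins, not the actual `ConsumerGroupState` or Kafka error types):

```java
public class ValidateDeleteSwitchSketch {
    // Simplified stand-in for ConsumerGroupState.
    enum State { EMPTY, ASSIGNING, RECONCILING, STABLE, DEAD }

    static class GroupIdNotFoundException extends RuntimeException {}
    static class NonEmptyGroupException extends RuntimeException {}

    // A switch keeps the per-state decision in one place and makes it
    // obvious when a new state is added without a deletion rule.
    static void validateGroupDelete(State state) {
        switch (state) {
            case DEAD:
                throw new GroupIdNotFoundException();
            case STABLE:
            case ASSIGNING:
            case RECONCILING:
                throw new NonEmptyGroupException();
            case EMPTY:
            default:
                break; // deletable
        }
    }
}
```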
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -592,6 +607,45 @@ public void validateOffsetFetch(
        validateMemberEpoch(memberEpoch, member.memberEpoch());
    }
+   /**
+    * Validates the OffsetDelete request.
+    */
+   @Override
+   public void validateOffsetDelete() throws GroupIdNotFoundException {
+       if (state() == ConsumerGroupState.DEAD) {
+           throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
+       }
+   }
+
+   /**
+    * Validates the GroupDelete request.
+    */
+   @Override
+   public void validateGroupDelete() throws ApiException {
+       if (state() == ConsumerGroupState.DEAD) {
+           throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
Review Comment:
ditto.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -849,6 +853,46 @@ public void validateOffsetFetch(
        }
    }
+   /**
+    * Validates the OffsetDelete request.
+    */
+   @Override
+   public void validateOffsetDelete() throws GroupIdNotFoundException {
+       if (isInState(DEAD)) {
+           throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
+       }
+   }
+
+   /**
+    * Validates the GroupDelete request.
+    */
+   @Override
+   public void validateGroupDelete() throws ApiException {
+       if (isInState(DEAD)) {
+           throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
+       } else if (isInState(STABLE)
+           || isInState(PREPARING_REBALANCE)
+           || isInState(COMPLETING_REBALANCE)) {
+           throw Errors.NON_EMPTY_GROUP.exception();
+       }
+
+       // We avoid writing the tombstone when the generationId is 0, since this group is only using
+       // Kafka for offset storage.
+       if (generationId() <= 0) {
+           throw Errors.UNKNOWN_SERVER_ERROR.exception();
Review Comment:
Throwing an exception does not seem to be the right approach to me because we still want to delete the group, and the exception would stop the process. My understanding is that we could just skip generating the tombstone when the generation is <= 0.
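The suggested alternative can be sketched in a self-contained way (strings stand in for the actual records; the method and comments follow the reviewer's reading that a generation <= 0 group only used Kafka for offset storage, which is an assumption here, not confirmed behavior):

```java
import java.util.ArrayList;
import java.util.List;

public class SkipTombstoneSketch {
    // Instead of failing the whole deletion, skip the group-metadata
    // tombstone for a generation <= 0 group: per the suggestion, such a
    // group only used Kafka for offset storage, so there is no metadata
    // record to delete, but its offsets should still be removed.
    static List<String> deleteRecords(String groupId, int generationId) {
        List<String> records = new ArrayList<>();
        records.add("offset-tombstones:" + groupId);
        if (generationId > 0) {
            records.add("group-metadata-tombstone:" + groupId);
        }
        return records;
    }
}
```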
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]