dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1331125886
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +526,38 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+ for (String groupId : groupIds) {
+ final int partition = partitionFor(groupId);
+ final List<String> groupList =
groupsByPartition.getOrDefault(partition, new ArrayList<>());
+ groupList.add(groupId);
+ groupsByPartition.put(partition, groupList);
+ }
+
+ final
List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>>
futures = new ArrayList<>();
+ for (Map.Entry<Integer, List<String>> entry :
groupsByPartition.entrySet()) {
+ int partition = entry.getKey();
+ List<String> groupList = entry.getValue();
Review Comment:
nit: You could use `foreach` which is a bit more concise.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +526,38 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+ for (String groupId : groupIds) {
+ final int partition = partitionFor(groupId);
+ final List<String> groupList =
groupsByPartition.getOrDefault(partition, new ArrayList<>());
+ groupList.add(groupId);
+ groupsByPartition.put(partition, groupList);
+ }
+
+ final
List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>>
futures = new ArrayList<>();
+ for (Map.Entry<Integer, List<String>> entry :
groupsByPartition.entrySet()) {
+ int partition = entry.getKey();
+ List<String> groupList = entry.getValue();
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ runtime.scheduleWriteOperation("delete-group",
+ new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition),
+ coordinator -> coordinator.deleteGroups(context,
groupList));
+ futures.add(future);
+ }
+
+ final CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ return allFutures.thenApply(v -> {
+ final DeleteGroupsResponseData.DeletableGroupResultCollection res
= new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ for
(CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future : futures) {
+ try {
+ DeleteGroupsResponseData.DeletableGroupResultCollection
result = future.get();
Review Comment:
It may be better to use `join` instead of `get`. I think that you would be
able to remove the try..catch if you use `join`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +526,38 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+ for (String groupId : groupIds) {
+ final int partition = partitionFor(groupId);
+ final List<String> groupList =
groupsByPartition.getOrDefault(partition, new ArrayList<>());
+ groupList.add(groupId);
+ groupsByPartition.put(partition, groupList);
+ }
+
+ final
List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>>
futures = new ArrayList<>();
+ for (Map.Entry<Integer, List<String>> entry :
groupsByPartition.entrySet()) {
+ int partition = entry.getKey();
+ List<String> groupList = entry.getValue();
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ runtime.scheduleWriteOperation("delete-group",
+ new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition),
+ coordinator -> coordinator.deleteGroups(context,
groupList));
+ futures.add(future);
+ }
+
+ final CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
Review Comment:
Let's assume that one of the write operation fails with
`COORDINATOR_LOAD_IN_PROGRESS`, this would result in failing `allFutures` even
though some write operations may have been successful. It seems to me that we
should handle exceptions for each write operation future before we combine
them, no?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +526,38 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+ for (String groupId : groupIds) {
+ final int partition = partitionFor(groupId);
+ final List<String> groupList =
groupsByPartition.getOrDefault(partition, new ArrayList<>());
+ groupList.add(groupId);
+ groupsByPartition.put(partition, groupList);
+ }
+
+ final
List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>>
futures = new ArrayList<>();
+ for (Map.Entry<Integer, List<String>> entry :
groupsByPartition.entrySet()) {
+ int partition = entry.getKey();
+ List<String> groupList = entry.getValue();
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ runtime.scheduleWriteOperation("delete-group",
+ new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition),
+ coordinator -> coordinator.deleteGroups(context,
groupList));
Review Comment:
nit: Let's put `delete-group` on a new line as well. Could you also ensure
that the format conforms to the existing code? e.g. where the closing
parenthesis is, the indentation (4 spaces), etc.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,98 @@ public CoordinatorResult<OffsetCommitResponseData, Record>
commitOffset(
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handles an OffsetDelete request.
+ *
+ * @param context The request context.
+ * @param request The OffsetDelete request.
+ *
+ * @return A Result containing the OffsetDeleteResponseData response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+ RequestContext context,
+ OffsetDeleteRequestData request
+ ) throws ApiException {
+ final List<Record> records = new ArrayList<>();
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData();
+ try {
+ Group group = validateOffsetDelete(request);
+
+ request.topics().forEach(topic -> {
+ final
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ final boolean subscribedToTopic =
group.isSubscribedToTopic(topic.name());
+
+ topic.partitions().forEach(partition -> {
+ records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+ request.groupId(),
+ topic.name(),
+ partition.partitionIndex()
+ ));
+
+ OffsetDeleteResponseData.OffsetDeleteResponsePartition
responsePartition =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(partition.partitionIndex());
+ if (subscribedToTopic) {
+ responsePartition =
responsePartition.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code());
+ }
+ responsePartitionCollection.add(responsePartition);
+ });
+
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopic
responseTopic =
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection);
+ responseTopicCollection.add(responseTopic);
+ });
+ response = response.setTopics(responseTopicCollection);
+ } catch (ApiException ex) {
+ response = response.setErrorCode(Errors.forException(ex).code());
+ }
+ return new CoordinatorResult<>(records, response);
+ }
+
+ /**
+ * Deletes all the offsets of the given groups to handle a GroupDelete
request.
+ * Validations are done in groupDelete method in GroupMetadataManager.
+ *
+ * @param context The request context.
+ * @param groupIds The list of group ids of the given groups.
+ * @return A Result containing the OffsetDeleteResponseData response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<OffsetDeleteResponseData, Record>
deleteAllOffsets(
Review Comment:
As I said earlier, I think that returning CoordinatorResult is not
appropriate here because we don't need a response in this case. We basically
build for the response to ignore it right after.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,29 @@ public HeartbeatResponseData genericGroupHeartbeat(
);
}
+ public
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> deleteGroups(
+ RequestContext context,
+ List<String> groupIds
+ ) throws ApiException {
+
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> groupDeleteCoordinatorResult =
+ groupMetadataManager.groupDelete(context, groupIds);
+
+ List<String> validGroupIds = new ArrayList<>();
+ for (DeleteGroupsResponseData.DeletableGroupResult result :
groupDeleteCoordinatorResult.response()) {
+ if (result.errorCode() == Errors.NONE.code()) {
+ validGroupIds.add(result.groupId());
+ }
+ }
+
+ CoordinatorResult<OffsetDeleteResponseData, Record>
deleteOffsetCoordinatorResult =
+ offsetMetadataManager.deleteAllOffsets(context, validGroupIds);
+
+ final List<Record> records = groupDeleteCoordinatorResult.records();
+ records.addAll(deleteOffsetCoordinatorResult.records());
+ return new CoordinatorResult<>(records,
groupDeleteCoordinatorResult.response());
Review Comment:
I have a few comments regarding this piece of code:
1. I think that we should write the tombstones for the offsets before the
ones for the group.
2. It is a bit strange to return a CoordinatorResult from `deleteAllOffsets`
and to ignore it. It would be better to pass the list of records to the method
and to let the method populate it if the deletion is accepted. I would also
remove the response as we don't need it.
3. The `validGroupIds` is a bit weird. How about iterating over the group
ids here? Then, you can call the various methods from the manages to validate,
delete offsets and finally delete the group. If there is an error, you can
directly populate the response with it.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3072,41 @@ private void removeCurrentMemberFromGenericGroup(
group.remove(member.memberId());
}
+ public
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> groupDelete(
+ RequestContext context,
+ List<String> groupIds
+ ) throws ApiException {
+ final DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ final List<Record> records = new ArrayList<>();
+
+ groupIds.forEach(groupId -> {
+ DeleteGroupsResponseData.DeletableGroupResult result =
+ new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId);
+ try {
+ validateGroupDelete(groupId);
+
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
Review Comment:
`newGroupMetadataTombstoneRecord` only works for generic groups. For
consumer groups, we need to write other tombstones.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -705,9 +737,15 @@ public CompletableFuture<OffsetDeleteResponseData>
deleteOffsets(
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ if (!isGroupIdNotEmpty(request.groupId())) {
+ return
FutureUtils.failedFuture(Errors.INVALID_GROUP_ID.exception());
+ }
+
+ return runtime.scheduleWriteOperation(
+ "delete-offset",
+ topicPartitionFor(request.groupId()),
+ coordinator -> coordinator.deleteOffsets(context, request)
Review Comment:
nit: Indentation.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,29 @@ public HeartbeatResponseData genericGroupHeartbeat(
);
}
+ public
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> deleteGroups(
+ RequestContext context,
+ List<String> groupIds
Review Comment:
nit: Indentation. There are other cases in this PR. I won't mention them all.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,98 @@ public CoordinatorResult<OffsetCommitResponseData, Record>
commitOffset(
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handles an OffsetDelete request.
+ *
+ * @param context The request context.
+ * @param request The OffsetDelete request.
+ *
+ * @return A Result containing the OffsetDeleteResponseData response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+ RequestContext context,
+ OffsetDeleteRequestData request
+ ) throws ApiException {
+ final List<Record> records = new ArrayList<>();
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData();
+ try {
+ Group group = validateOffsetDelete(request);
+
+ request.topics().forEach(topic -> {
+ final
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ final boolean subscribedToTopic =
group.isSubscribedToTopic(topic.name());
+
+ topic.partitions().forEach(partition -> {
+ records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+ request.groupId(),
+ topic.name(),
+ partition.partitionIndex()
+ ));
Review Comment:
We should not write the record if subscribedToTopic is true because it will
effectively delete the offset.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]