dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1312918434
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData>
listGroups(
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ List<CompletableFuture<ListGroupsResponseData>> futures = new
java.util.ArrayList<>(Collections.emptyList());
+ for (TopicPartition tp : runtime.partitions()) {
+ futures.add(runtime.scheduleReadOperation(
+ "list_groups",
Review Comment:
nit: Let's use `list-groups` to be consistent with the existing names.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData>
listGroups(
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ List<CompletableFuture<ListGroupsResponseData>> futures = new
java.util.ArrayList<>(Collections.emptyList());
Review Comment:
nit: Let's remove `java.util` and `Collections.emptyList()` as they are not
necessary.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData>
listGroups(
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ List<CompletableFuture<ListGroupsResponseData>> futures = new
java.util.ArrayList<>(Collections.emptyList());
+ for (TopicPartition tp : runtime.partitions()) {
+ futures.add(runtime.scheduleReadOperation(
+ "list_groups",
+ tp,
+ (coordinator, lastCommittedOffset) ->
coordinator.listGroups(context, request, lastCommittedOffset)
+ ).exceptionally(exception -> {
+ if (!(exception instanceof KafkaException)) {
+ log.error("ListGroups request {} hit an unexpected
exception: {}",
+ request, exception.getMessage());
+ }
+ return new ListGroupsResponseData()
+ .setErrorCode(Errors.forException(exception).code());
+ }));
Review Comment:
I think that we need to think a little more about the different errors that
we could get here. My understanding is that we fail the entire request in two
cases: 1) at least one partition is loading; and 2) there is an unexpected
error (a non-KafkaException).
Here, we could get the following errors:
- NotCoordinatorException if `tp` is no longer active, failed, etc. In this
case, we actually want to return an empty list of groups.
- CoordinatorLoadingException if `tp` is being loaded. In this case, we want
to fail the entire request.
- Unexpected Exception. In this case, we also want to fail the entire
request.
Knowing this, we should explicitly handle the NotCoordinatorException case
here. For the other cases, would it be possible to re-throw the exception?
It would be great if you could also add a few unit tests for this.
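To illustrate the handling described above, here is a minimal, self-contained
sketch. It is not the PR's code: the two exception classes are local stand-ins
named after the ones mentioned in this comment (not the real Kafka types), the
per-partition result is simplified to a list of group ids, and
`handlePartitionErrors` is a hypothetical name.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ListGroupsErrorSketch {
    // Stand-in exceptions for illustration only; assumptions, not Kafka classes.
    static class NotCoordinatorException extends RuntimeException {
        NotCoordinatorException(String message) { super(message); }
    }

    static class CoordinatorLoadingException extends RuntimeException {
        CoordinatorLoadingException(String message) { super(message); }
    }

    // Dependent futures report failures wrapped in CompletionException,
    // so look at the cause when one is present.
    private static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // Maps NotCoordinatorException to an empty result and re-throws everything
    // else so that the combined request fails.
    static CompletableFuture<List<String>> handlePartitionErrors(
        CompletableFuture<List<String>> partitionFuture
    ) {
        return partitionFuture.exceptionally(exception -> {
            Throwable cause = unwrap(exception);
            if (cause instanceof NotCoordinatorException) {
                // The partition is no longer hosted here: contribute no groups.
                return Collections.emptyList();
            }
            // Loading or unexpected error: keep the future failed.
            throw new CompletionException(cause);
        });
    }
}
```

With this shape, only the NotCoordinatorException case is swallowed; a loading
partition or an unexpected error still surfaces as a failed future that the
caller can translate into an error response.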
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -747,4 +749,15 @@ private static Integer decValue(String key, Integer value)
{
private static Integer incValue(String key, Integer value) {
return value == null ? 1 : value + 1;
}
+
+ /**
+ * @return the group formatted as a list group response.
+ */
+ public ListGroupsResponseData.ListedGroup asListedGroup() {
+ return new ListGroupsResponseData.ListedGroup()
+ .setGroupId(groupId)
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setGroupState(state.toString());
+ }
Review Comment:
Let's please add a unit test for this method.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -309,6 +311,21 @@ public CoordinatorResult<OffsetCommitResponseData, Record>
commitOffset(
return offsetMetadataManager.commitOffset(context, request);
}
+ /**
+ * Handles a ListGroups request.
+ *
+ * @param context The request context.
+ * @param request The ListGroups request.
+ * @return A Result containing the ListGroupsResponseData response
+ */
+ public ListGroupsResponseData listGroups(
+ RequestContext context,
+ ListGroupsRequestData request,
+ long committedOffset
+ ) throws ApiException {
Review Comment:
I wonder if we should simplify the signature of this method. How about
taking a list of states and returning a list of ListedGroup? The full request
and the context are not really necessary in my opinion in this case.
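A rough sketch of what the simplified signature could look like. Everything
here is an assumption for illustration: `ListedGroup` is a local stand-in for
`ListGroupsResponseData.ListedGroup`, and the in-memory group list is a toy
replacement for the real state.

```java
import java.util.ArrayList;
import java.util.List;

class ListGroupsSignatureSketch {
    // Stand-in for ListGroupsResponseData.ListedGroup (assumption).
    static final class ListedGroup {
        final String groupId;
        final String groupState;

        ListedGroup(String groupId, String groupState) {
            this.groupId = groupId;
            this.groupState = groupState;
        }
    }

    private final List<ListedGroup> groups = new ArrayList<>();

    void add(String groupId, String groupState) {
        groups.add(new ListedGroup(groupId, groupState));
    }

    // Takes only the states to filter on and returns the listed groups,
    // instead of taking the full request and context.
    // The toy store is not versioned, so committedOffset is accepted but unused.
    List<ListedGroup> listGroups(List<String> statesFilter, long committedOffset) {
        List<ListedGroup> result = new ArrayList<>();
        for (ListedGroup group : groups) {
            // An empty filter means "all states".
            if (statesFilter.isEmpty() || statesFilter.contains(group.groupState)) {
                result.add(group);
            }
        }
        return result;
    }
}
```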
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -420,6 +423,18 @@ public Group group(String groupId, long committedOffset)
throws GroupIdNotFoundE
return group;
}
+ /**
+ * @return The GenericGroup List filtered by statesFilter or typesFilter.
+ */
+ public ListGroupsResponseData listGroups(ListGroupsRequestData request,
long committedOffset) {
+ Stream<Group> groupStream = groups.values(committedOffset).stream();
+ List<String> statesFilter = request.statesFilter();
+ if (!statesFilter.isEmpty()) {
+ groupStream = groupStream.filter(group ->
statesFilter.contains(group.stateAsString()));
+ }
+ return new
ListGroupsResponseData().setGroups(groupStream.map(Group::asListedGroup).collect(Collectors.toList()));
Review Comment:
In the ConsumerGroup case, the state is stored in a timeline data structure.
Hence, we need to pass the `committedOffset` to `stateAsString` and
`asListedGroup` in order to stay consistent.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -420,6 +423,18 @@ public Group group(String groupId, long committedOffset)
throws GroupIdNotFoundE
return group;
}
+ /**
+ * @return The GenericGroup List filtered by statesFilter or typesFilter.
Review Comment:
nit: Remove `typesFilter`. `statesFilter` -> `states`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData>
listGroups(
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ List<CompletableFuture<ListGroupsResponseData>> futures = new
java.util.ArrayList<>(Collections.emptyList());
+ for (TopicPartition tp : runtime.partitions()) {
+ futures.add(runtime.scheduleReadOperation(
+ "list_groups",
+ tp,
+ (coordinator, lastCommittedOffset) ->
coordinator.listGroups(context, request, lastCommittedOffset)
+ ).exceptionally(exception -> {
+ if (!(exception instanceof KafkaException)) {
+ log.error("ListGroups request {} hit an unexpected
exception: {}",
+ request, exception.getMessage());
+ }
+ return new ListGroupsResponseData()
+ .setErrorCode(Errors.forException(exception).code());
+ }));
+ }
+ CompletableFuture<ListGroupsResponseData> responseFuture = new
CompletableFuture<>();
+ List<ListGroupsResponseData.ListedGroup> listedGroups = new
ArrayList<>();
+ futures.forEach(CompletableFuture::join);
+ for (CompletableFuture<ListGroupsResponseData> future : futures) {
+ try {
+ ListGroupsResponseData data = future.get();
+ if (data.errorCode() != Errors.NONE.code()) {
+ responseFuture.complete(data);
+ return responseFuture;
+ }
+ listedGroups.addAll(future.get().groups());
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("ListGroups request {} hit an unexpected exception:
{}",
+ request, e.getMessage());
+ if (!responseFuture.isDone()) {
+ responseFuture.complete(new ListGroupsResponseData()
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()));
+ return responseFuture;
+ }
+ }
+ }
Review Comment:
Have you thought about writing a helper method to turn a list of
CompletableFutures into a CompletableFuture containing the list of results?
That would be a nice building block that we could put in `FutureUtils`. If any
of the CompletableFutures failed, the resulting CompletableFuture would fail
as well with the same error. Would this simplify the code here?
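A minimal sketch of such a helper, using only `java.util.concurrent`. The
class and method names (`FutureListSketch`, `combineFutures`) are hypothetical
and not an existing `FutureUtils` API; the unwrapping of `CompletionException`
is one possible way to propagate "the same error".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class FutureListSketch {
    // Combines a list of futures into one future holding all results, in order.
    // If any input fails, the returned future fails with that input's exception.
    static <T> CompletableFuture<List<T>> combineFutures(List<CompletableFuture<T>> futures) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            .whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    // allOf wraps the failure in a CompletionException; unwrap it
                    // so callers see the original error.
                    Throwable cause = (throwable instanceof CompletionException
                        && throwable.getCause() != null) ? throwable.getCause() : throwable;
                    result.completeExceptionally(cause);
                } else {
                    List<T> values = new ArrayList<>(futures.size());
                    for (CompletableFuture<T> future : futures) {
                        // Every future is complete here, so join() does not block.
                        values.add(future.join());
                    }
                    result.complete(values);
                }
            });
        return result;
    }
}
```

The caller could then chain a single `thenApply` to merge the per-partition
results and a single `exceptionally` to build the error response, without
calling `join()` or `get()` on the request thread.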
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -420,6 +423,18 @@ public Group group(String groupId, long committedOffset)
throws GroupIdNotFoundE
return group;
}
+ /**
+ * @return The GenericGroup List filtered by statesFilter or typesFilter.
+ */
+ public ListGroupsResponseData listGroups(ListGroupsRequestData request,
long committedOffset) {
Review Comment:
Let's also add a unit test for this one.