dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1302279096
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -426,9 +429,43 @@ public CompletableFuture<ListGroupsResponseData>
listGroups(
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ List<CompletableFuture<ListGroupsResponseData>> futures = new
java.util.ArrayList<>(Collections.emptyList());
+ for (int i = 0; i < numPartitions; i++) {
Review Comment:
This seems to be inefficient because the coordinator may not be responsible
for all the partitions. I thought that we could use
`CoordinatorRuntime#partitions` to get the list of registered partitions. Have
you considered this?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -272,6 +274,21 @@ public CoordinatorResult<OffsetCommitResponseData, Record>
commitOffset(
return offsetMetadataManager.commitOffset(context, request);
}
+ /**
+ * Handles a ListGroups request.
+ *
+ * @param context The request context.
+ * @param request The ListGroups request.
+ *
+ * @return A Result containing the ListGroupsResponseData response
+ */
+ public ListGroupsResponseData listGroups(
+ RequestContext context,
+ ListGroupsRequestData request
Review Comment:
nit: This should be indented with four spaces.
##########
clients/src/main/resources/common/message/ListGroupsRequest.json:
##########
@@ -23,11 +23,15 @@
// Version 3 is the first flexible version.
//
// Version 4 adds the StatesFilter field (KIP-518).
- "validVersions": "0-4",
+ //
+ // Version 5 adds the TypesFilter field (KIP-848).
+ "validVersions": "0-5",
"flexibleVersions": "3+",
"fields": [
{ "name": "StatesFilter", "type": "[]string", "versions": "4+",
"about": "The states of the groups we want to list. If empty all groups
are returned with their state."
- }
+ },
+ { "name": "TypesFilter", "type": "[]string", "versions": "5+",
+ "about": "The types of the groups we want to list. If empty all groups
are returned" }
Review Comment:
I would rather prefer to do this in a second PR because this change impacts
both the new and the old group coordinators. Would it be possible?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value)
{
private static Integer incValue(String key, Integer value) {
return value == null ? 1 : value + 1;
}
+
+ /**
+ * @return the group formatted as a list group response.
+ */
+ public ListGroupsResponseData.ListedGroup asListedGroup() {
+ return new ListGroupsResponseData.ListedGroup()
+ .setGroupId(groupId)
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setGroupState(state.toString());
Review Comment:
nit: This should be indented with four spaces. I have seen this in other
places in the code but I am not going to comment them all. I let you have a
look.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value)
{
private static Integer incValue(String key, Integer value) {
return value == null ? 1 : value + 1;
}
+
+ /**
+ * @return the group formatted as a list group response.
+ */
+ public ListGroupsResponseData.ListedGroup asListedGroup() {
Review Comment:
Don't we need to also implement this for `GenericGroup`?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -426,9 +429,43 @@ public CompletableFuture<ListGroupsResponseData>
listGroups(
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ List<CompletableFuture<ListGroupsResponseData>> futures = new
java.util.ArrayList<>(Collections.emptyList());
+ for (int i = 0; i < numPartitions; i++) {
+ futures.add(runtime.scheduleReadOperation("list_groups",
+ new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i),
+ (coordinator, __) -> coordinator.listGroups(context,
request)
Review Comment:
We need to use the second parameters `__` as pass it down to `listGroups`.
For the context, the second parameter is the last committed offsets. We should
list the groups based on it. Otherwise, we would return uncommitted changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]