dajac commented on code in PR #14462:
URL: https://github.com/apache/kafka/pull/14462#discussion_r1340758791
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -505,9 +508,52 @@ public CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> descri
return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ final List<CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>>> futures =
+ new ArrayList<>(groupIds.size());
+ final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
+ groupIds.forEach(groupId -> {
+ // For backwards compatibility, we support DescribeGroups for the empty group id.
+ if (groupId == null) {
+ futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+ new DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ )));
+ } else {
+ final TopicPartition topicPartition = topicPartitionFor(groupId);
+ groupsByTopicPartition
+ .computeIfAbsent(topicPartition, __ -> new ArrayList<>())
+ .add(groupId);
+ }
+ });
+
+ groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+ CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> future =
+ runtime.scheduleReadOperation(
+ "describe-groups",
+ topicPartition,
+ (coordinator, __) -> coordinator.describeGroups(context, groupList)
Review Comment:
@dongnuo123 We cannot ignore the second argument received here. It is the
last committed offset, and the read must be bounded by it. Otherwise, we
would expose uncommitted state. Take a look at how we did it for the
ListGroups API.
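
To illustrate the concern, here is a minimal, self-contained sketch. It is not
Kafka code: `ToyCoordinator`, `ToyRuntime`, `describeGroupsUnsafe`, and the
one-argument `scheduleReadOperation` are hypothetical stand-ins invented for
this example. It only assumes that the runtime hands the read operation a
committed offset as its second argument, and that a read which ignores it can
observe records that are written but not yet committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class CommittedReadSketch {

    /** Hypothetical stand-in for coordinator state: an append-only log of group ids. */
    static final class ToyCoordinator {
        private final List<String> log = new ArrayList<>();
        private long lastCommittedOffset = 0L; // entries below this offset are committed

        void append(String groupId) {
            log.add(groupId);
        }

        void commitUpTo(long offset) {
            lastCommittedOffset = Math.min(offset, log.size());
        }

        long lastCommittedOffset() {
            return lastCommittedOffset;
        }

        /** Bounded read: returns only entries below the given committed offset. */
        List<String> describeGroups(long committedOffset) {
            int end = (int) Math.min(committedOffset, log.size());
            return new ArrayList<>(log.subList(0, end));
        }

        /** Unbounded read: also returns entries that are not committed yet. */
        List<String> describeGroupsUnsafe() {
            return new ArrayList<>(log);
        }
    }

    /** Hypothetical runtime: passes the state and the committed offset to the operation. */
    static final class ToyRuntime {
        private final ToyCoordinator coordinator;

        ToyRuntime(ToyCoordinator coordinator) {
            this.coordinator = coordinator;
        }

        <T> T scheduleReadOperation(BiFunction<ToyCoordinator, Long, T> operation) {
            return operation.apply(coordinator, coordinator.lastCommittedOffset());
        }
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        coordinator.append("group-a");
        coordinator.commitUpTo(1);     // "group-a" is committed
        coordinator.append("group-b"); // "group-b" is written but not committed

        ToyRuntime runtime = new ToyRuntime(coordinator);

        // Uses the second argument: only committed state is visible.
        List<String> safe = runtime.scheduleReadOperation(
            (coord, committedOffset) -> coord.describeGroups(committedOffset));

        // Ignores the second argument: uncommitted state leaks out.
        List<String> unsafe = runtime.scheduleReadOperation(
            (coord, ignored) -> coord.describeGroupsUnsafe());

        System.out.println("bounded read:   " + safe);
        System.out.println("unbounded read: " + unsafe);
    }
}
```

In this sketch the bounded read returns only `group-a`, while the unbounded
read also returns `group-b`. Applied to the diff above, the analogous change
would be to name the second lambda parameter and pass it through to the
describe call, though the exact signature is for the PR to decide.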