dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1314944364
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -306,6 +308,19 @@ public CoordinatorResult<OffsetCommitResponseData, Record>
commitOffset(
return offsetMetadataManager.commitOffset(context, request);
}
+ /**
+ * Handles a ListGroups request.
+ *
+ * @param statesFilter The states of the groups we want to list. If empty
all groups are returned with their state.
+ * @return A Result containing the ListGroupsResponseData response
+ */
+ public ListGroupsResponseData listGroups(
+ List<String> statesFilter,
+ long committedOffset
+ ) throws ApiException {
+ return new
ListGroupsResponseData().setGroups(groupMetadataManager.listGroups(statesFilter,
committedOffset));
+ }
Review Comment:
I still think that we should rather return the list of groups here and create
`ListGroupsResponseData` one level up.
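To make the layering concrete, here is a minimal, self-contained sketch. `ListedGroup`, `Response` and `Shard` are hypothetical stand-ins for `ListGroupsResponseData.ListedGroup`, `ListGroupsResponseData` and `GroupCoordinatorShard`; the real signatures may differ. The shard returns a plain list and only the service layer builds the response object.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the Kafka types so that the sketch compiles on its own.
final class ListGroupsLayeringSketch {
    // Stand-in for ListGroupsResponseData.ListedGroup.
    static final class ListedGroup {
        final String groupId;
        final String groupState;

        ListedGroup(String groupId, String groupState) {
            this.groupId = groupId;
            this.groupState = groupState;
        }
    }

    // Stand-in for ListGroupsResponseData.
    static final class Response {
        final List<ListedGroup> groups;

        Response(List<ListedGroup> groups) {
            this.groups = groups;
        }
    }

    // Stand-in for the shard: it knows nothing about the response type.
    static final class Shard {
        final List<ListedGroup> groups;

        Shard(List<ListedGroup> groups) {
            this.groups = groups;
        }

        List<ListedGroup> listGroups(List<String> statesFilter) {
            List<ListedGroup> result = new ArrayList<>();
            for (ListedGroup group : groups) {
                // An empty filter matches every group.
                if (statesFilter.isEmpty() || statesFilter.contains(group.groupState)) {
                    result.add(group);
                }
            }
            return result;
        }
    }

    // Stand-in for the service layer: the only place that creates the response.
    static Response listGroups(List<Shard> shards, List<String> statesFilter) {
        List<ListedGroup> all = new ArrayList<>();
        for (Shard shard : shards) {
            all.addAll(shard.listGroups(statesFilter));
        }
        return new Response(all);
    }
}
```

With this split, merging the results of several shards is a plain `addAll` and no intermediate response objects need to be unwrapped.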
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +432,51 @@ public CompletableFuture<ListGroupsResponseData>
listGroups(
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ List<CompletableFuture<ListGroupsResponseData>> futures = new
ArrayList<>();
+ for (TopicPartition tp : runtime.partitions()) {
+ futures.add(runtime.scheduleReadOperation(
+ "list-groups",
+ tp,
+ (coordinator, lastCommittedOffset) ->
coordinator.listGroups(request.statesFilter(), lastCommittedOffset)
+ ).exceptionally(exception -> {
+ if (!(exception instanceof KafkaException)) {
+ log.error("ListGroups request {} hit an unexpected
exception: {}",
+ request, exception.getMessage());
+ throw new RuntimeException(exception);
+ }
+ if (exception instanceof CoordinatorLoadInProgressException) {
+ throw new RuntimeException(exception);
+ } else if (exception instanceof NotCoordinatorException) {
+ log.warn("ListGroups request {} hit a
NotCoordinatorException exception: {}",
+ request, exception.getMessage());
+ return new
ListGroupsResponseData().setGroups(Collections.emptyList());
+ } else {
+ return new
ListGroupsResponseData().setErrorCode(Errors.forException(exception).code());
+ }
+ }));
+ }
+ CompletableFuture<ListGroupsResponseData> responseFuture = new
CompletableFuture<>();
+ List<ListGroupsResponseData.ListedGroup> listedGroups = new
ArrayList<>();
+ AtomicInteger succeedFutureCount = new AtomicInteger();
+ FutureUtils.drainFutures(futures, (data, t) -> {
+ synchronized (runtime) {
+ if (t != null) {
+ responseFuture.completeExceptionally(new
UnknownServerException(t.getMessage()));
+ } else {
+ if (data.errorCode() != Errors.NONE.code()) {
+ if (!responseFuture.isDone()) {
+ responseFuture.complete(data);
+ }
+ } else {
+ listedGroups.addAll(data.groups());
+ if (succeedFutureCount.addAndGet(1) ==
runtime.partitions().size()) {
+ responseFuture.complete(new
ListGroupsResponseData().setGroups(listedGroups));
+ }
+ }
+ }
+ }
+ });
+ return responseFuture;
Review Comment:
There are a few issues with this code.
1. Synchronising on `runtime` will create lock contention across all the
callers of `listGroups`. We should rather synchronise on a local variable.
2. The error handling seems error-prone to me. For instance,
`NotCoordinatorException` exceptions are turned into `RuntimeException`
exceptions and then turned into `UnknownServerException`, if I understood it
correctly. We lose the semantics along the way.
I think that we could take your idea further and combine the two main steps
into one. I am thinking about something like this:
```
final List<TopicPartition> partitions = new ArrayList<>(runtime.partitions());
final CompletableFuture<ListGroupsResponseData> future = new CompletableFuture<>();
final List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>();
final AtomicInteger cnt = new AtomicInteger(partitions.size());
for (TopicPartition partition : partitions) {
    runtime.scheduleReadOperation(
        "list-group",
        partition,
        (coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), lastCommittedOffset)
    ).handle((groups, exception) -> {
        if (exception == null) {
            synchronized (results) {
                results.addAll(groups);
            }
        } else {
            if (!(exception instanceof NotCoordinatorException)) {
                future.complete(new ListGroupsResponseData().setErrorCode(Errors.forException(exception).code()));
            }
        }
        if (cnt.decrementAndGet() == 0 && !future.isDone()) {
            future.complete(new ListGroupsResponseData().setGroups(results));
        }
        return null;
    });
}
```
What do you think?
Note that I have not tested this code at all. I just wrote it here.
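For anyone who wants to try the pattern outside the coordinator, below is a self-contained sketch of the same fan-out and aggregation. Everything in it is an assumption made for illustration: `NotCoordinator` stands in for `NotCoordinatorException`, `Response` for `ListGroupsResponseData`, group names are plain strings, and the per-partition reads are passed in as already-created futures instead of being scheduled on the runtime. The lock is the local `results` list, so concurrent callers do not contend with each other.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

final class FanOutAggregationSketch {
    // Hypothetical stand-in for NotCoordinatorException.
    static final class NotCoordinator extends RuntimeException { }

    // Hypothetical stand-in for ListGroupsResponseData.
    static final class Response {
        final List<String> groups;
        final String error; // null means "no error"

        Response(List<String> groups, String error) {
            this.groups = groups;
            this.error = error;
        }
    }

    // Dependent stages wrap failures in CompletionException; look through it.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    static CompletableFuture<Response> aggregate(List<CompletableFuture<List<String>>> reads) {
        final CompletableFuture<Response> future = new CompletableFuture<>();
        final List<String> results = new ArrayList<>();
        final AtomicInteger cnt = new AtomicInteger(reads.size());
        if (reads.isEmpty()) {
            // Nothing to wait for, so answer with an empty list right away.
            future.complete(new Response(Collections.emptyList(), null));
            return future;
        }
        for (CompletableFuture<List<String>> read : reads) {
            read.handle((groups, exception) -> {
                if (exception == null) {
                    synchronized (results) {
                        results.addAll(groups);
                    }
                } else if (!(unwrap(exception) instanceof NotCoordinator)) {
                    // The first real error wins; later complete() calls are no-ops.
                    future.complete(new Response(
                        Collections.emptyList(), unwrap(exception).getClass().getSimpleName()));
                }
                if (cnt.decrementAndGet() == 0) {
                    List<String> snapshot;
                    synchronized (results) {
                        snapshot = new ArrayList<>(results);
                    }
                    // No-op if an error already completed the future.
                    future.complete(new Response(snapshot, null));
                }
                return null;
            });
        }
        return future;
    }
}
```

Since `CompletableFuture.complete` only takes effect on the first call, the explicit `isDone()` check is optional in this sketch. A partition that reports `NotCoordinator` contributes no groups but still counts towards completion.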
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -420,6 +422,23 @@ public Group group(String groupId, long committedOffset)
throws GroupIdNotFoundE
return group;
}
+ /**
+ * @return The GenericGroup List filtered by statesFilter and
committedOffset.
+ */
+ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String>
statesFilter, long committedOffset) {
+ Stream<Group> groupStream = groups.values(committedOffset).stream();
+ if (!statesFilter.isEmpty()) {
+ groupStream = groupStream.filter(group -> {
+ if (group instanceof ConsumerGroup) {
+ return statesFilter.contains(((ConsumerGroup)
group).stateAsString(committedOffset));
+ } else {
+ return statesFilter.contains(group.stateAsString());
+ }
Review Comment:
This is weird. I would rather push `stateAsString(committedOffset)` to the
`Group` interface and use it.
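A rough sketch of what that could look like, using simplified stand-ins and not the real classes: the trimmed-down `Group` interface, the two implementations and the `TreeMap`-based state history are all hypothetical and exist only so the example runs on its own. Whether the new interface method is abstract or a default method is also an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class GroupStateFilterSketch {
    // Hypothetical, trimmed-down version of the Group interface.
    interface Group {
        String groupId();

        String stateAsString();

        // Assumed addition: every group type answers for a given committed offset.
        String stateAsString(long committedOffset);
    }

    // Stand-in for a group type whose state is not tracked per offset.
    static final class GenericGroup implements Group {
        final String groupId;
        final String state;

        GenericGroup(String groupId, String state) {
            this.groupId = groupId;
            this.state = state;
        }

        public String groupId() { return groupId; }

        public String stateAsString() { return state; }

        public String stateAsString(long committedOffset) { return state; }
    }

    // Stand-in for a group type that remembers its state per offset.
    static final class ConsumerGroup implements Group {
        final String groupId;
        final TreeMap<Long, String> stateByOffset = new TreeMap<>();

        ConsumerGroup(String groupId, String initialState) {
            this.groupId = groupId;
            stateByOffset.put(0L, initialState);
        }

        void transitionTo(long offset, String state) {
            stateByOffset.put(offset, state);
        }

        public String groupId() { return groupId; }

        public String stateAsString() { return stateByOffset.lastEntry().getValue(); }

        public String stateAsString(long committedOffset) {
            // Latest state recorded at or before the committed offset.
            Map.Entry<Long, String> entry = stateByOffset.floorEntry(committedOffset);
            return entry == null ? stateByOffset.firstEntry().getValue() : entry.getValue();
        }
    }

    // The filter no longer needs instanceof checks.
    static List<String> listGroupIds(List<Group> groups, List<String> statesFilter, long committedOffset) {
        List<String> ids = new ArrayList<>();
        for (Group group : groups) {
            if (statesFilter.isEmpty() || statesFilter.contains(group.stateAsString(committedOffset))) {
                ids.add(group.groupId());
            }
        }
        return ids;
    }
}
```

The filter then treats all group types uniformly, and each implementation decides how the committed offset affects its reported state.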
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData>
listGroups(
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ List<CompletableFuture<ListGroupsResponseData>> futures = new
java.util.ArrayList<>(Collections.emptyList());
+ for (TopicPartition tp : runtime.partitions()) {
+ futures.add(runtime.scheduleReadOperation(
+ "list_groups",
+ tp,
+ (coordinator, lastCommittedOffset) ->
coordinator.listGroups(context, request, lastCommittedOffset)
+ ).exceptionally(exception -> {
+ if (!(exception instanceof KafkaException)) {
+ log.error("ListGroups request {} hit an unexpected
exception: {}",
+ request, exception.getMessage());
+ }
+ return new ListGroupsResponseData()
+ .setErrorCode(Errors.forException(exception).code());
+ }));
+ }
+ CompletableFuture<ListGroupsResponseData> responseFuture = new
CompletableFuture<>();
+ List<ListGroupsResponseData.ListedGroup> listedGroups = new
ArrayList<>();
+ futures.forEach(CompletableFuture::join);
+ for (CompletableFuture<ListGroupsResponseData> future : futures) {
+ try {
+ ListGroupsResponseData data = future.get();
+ if (data.errorCode() != Errors.NONE.code()) {
+ responseFuture.complete(data);
+ return responseFuture;
+ }
+ listedGroups.addAll(future.get().groups());
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("ListGroups request {} hit an unexpected exception:
{}",
+ request, e.getMessage());
+ if (!responseFuture.isDone()) {
+ responseFuture.complete(new ListGroupsResponseData()
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()));
+ return responseFuture;
+ }
+ }
+ }
Review Comment:
I was thinking about something like this:
```
public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
    return CompletableFuture
        .allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(__ -> {
            final List<T> results = new ArrayList<>(futures.size());
            for (CompletableFuture<T> future : futures) {
                results.add(future.join());
            }
            return results;
        });
}
```
I am actually not sure that it would help here, see my previous comment.
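One property of such a helper that is relevant to this decision: `CompletableFuture.allOf` completes exceptionally as soon as any input does, so a single failing partition would hide the results of all the others unless each input is given its own fallback first. The sketch below only demonstrates that behaviour with plain integers; the class name and the sample values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AllOfSketch {
    // Same shape as the helper above: wait for all, then collect in input order.
    static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                final List<T> results = new ArrayList<>(futures.size());
                for (CompletableFuture<T> future : futures) {
                    // Safe: every input is already complete at this point.
                    results.add(future.join());
                }
                return results;
            });
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> one = CompletableFuture.completedFuture(1);
        CompletableFuture<Integer> two = CompletableFuture.completedFuture(2);
        System.out.println(allOf(Arrays.asList(one, two)).join());

        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("partition failed"));
        // The value 1 is lost: the combined future only reports the failure.
        System.out.println(allOf(Arrays.asList(one, failed)).isCompletedExceptionally());
    }
}
```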
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -179,6 +181,13 @@ public GroupType type() {
public String stateAsString() {
return state.get().toString();
}
+ /**
Review Comment:
nit: Let's put an empty line before this one.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -747,4 +756,15 @@ private static Integer decValue(String key, Integer value)
{
private static Integer incValue(String key, Integer value) {
return value == null ? 1 : value + 1;
}
+
+ /**
+ * @return the group formatted as a list group response.
+ */
+ public ListGroupsResponseData.ListedGroup asListedGroup(long
committedOffset) {
+ return new ListGroupsResponseData.ListedGroup()
+ .setGroupId(groupId)
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setGroupState(state.get(committedOffset).toString());
+ }
Review Comment:
nit: Could we move this method before all the private methods? For
instance, we could put it right after the new `stateAsString` method.