dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1314944364


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -306,6 +308,19 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
         return offsetMetadataManager.commitOffset(context, request);
     }
 
+    /**
+     * Handles a ListGroups request.
+     *
+     * @param statesFilter The states of the groups we want to list. If empty all groups are returned with their state.
+     * @return A Result containing the ListGroupsResponseData response
+     */
+    public ListGroupsResponseData listGroups(
+        List<String> statesFilter,
+        long committedOffset
+    ) throws ApiException {
+        return new ListGroupsResponseData().setGroups(groupMetadataManager.listGroups(statesFilter, committedOffset));
+    }

Review Comment:
   I still think that we should rather return the list of groups here and create 
`ListGroupsResponseData` one level up.
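
A minimal sketch of that layering, assuming hypothetical stand-in types (`Shard`, `Response`, and plain `String` group ids are not the real Kafka classes): the shard returns plain data, and only the caller one level up knows about the response type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ResponseOneLevelUp {
    /** Hypothetical stand-in for ListGroupsResponseData. */
    static final class Response {
        final List<String> groups;
        Response(List<String> groups) { this.groups = groups; }
    }

    /** Hypothetical stand-in for a shard: it returns its groups, not a response object. */
    static final class Shard {
        private final List<String> groupIds;
        Shard(String... groupIds) { this.groupIds = Arrays.asList(groupIds); }
        List<String> listGroups() { return groupIds; }
    }

    /** The layer above merges the per-shard lists and builds the single response. */
    static Response listGroups(List<Shard> shards) {
        List<String> all = new ArrayList<>();
        for (Shard shard : shards) {
            all.addAll(shard.listGroups());
        }
        return new Response(all);
    }
}
```

With this shape, merging results from several shards does not require unpacking one response object per shard.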



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +432,51 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        List<CompletableFuture<ListGroupsResponseData>> futures = new ArrayList<>();
+        for (TopicPartition tp : runtime.partitions()) {
+            futures.add(runtime.scheduleReadOperation(
+                "list-groups",
+                tp,
+                (coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), lastCommittedOffset)
+            ).exceptionally(exception -> {
+                if (!(exception instanceof KafkaException)) {
+                    log.error("ListGroups request {} hit an unexpected exception: {}",
+                        request, exception.getMessage());
+                    throw new RuntimeException(exception);
+                }
+                if (exception instanceof CoordinatorLoadInProgressException) {
+                    throw new RuntimeException(exception);
+                } else if (exception instanceof NotCoordinatorException) {
+                    log.warn("ListGroups request {} hit a NotCoordinatorException exception: {}",
+                        request, exception.getMessage());
+                    return new ListGroupsResponseData().setGroups(Collections.emptyList());
+                } else {
+                    return new ListGroupsResponseData().setErrorCode(Errors.forException(exception).code());
+                }
+            }));
+        }
+        CompletableFuture<ListGroupsResponseData> responseFuture = new CompletableFuture<>();
+        List<ListGroupsResponseData.ListedGroup> listedGroups = new ArrayList<>();
+        AtomicInteger succeedFutureCount = new AtomicInteger();
+        FutureUtils.drainFutures(futures, (data, t) -> {
+            synchronized (runtime) {
+                if (t != null) {
+                    responseFuture.completeExceptionally(new UnknownServerException(t.getMessage()));
+                } else {
+                    if (data.errorCode() != Errors.NONE.code()) {
+                        if (!responseFuture.isDone()) {
+                            responseFuture.complete(data);
+                        }
+                    } else {
+                        listedGroups.addAll(data.groups());
+                        if (succeedFutureCount.addAndGet(1) == runtime.partitions().size()) {
+                            responseFuture.complete(new ListGroupsResponseData().setGroups(listedGroups));
+                        }
+                    }
+                }
+            }
+        });
+        return responseFuture;

Review Comment:
   There are a few issues with this code.
   1. Synchronising on `runtime` will create lock contention across all the 
callers of `listGroups`. We should rather use a local variable.
   2. The error handling seems error-prone to me. For instance, 
`NotCoordinatorException` exceptions are turned into `RuntimeException` 
exceptions and then turned into `UnknownServerException` if I understood it 
correctly. We lose the semantics along the way.
   
   I think that we could take your idea further and combine the two main steps 
into one. I am thinking about something like this:
   
   ```
           final List<TopicPartition> partitions = new ArrayList<>(runtime.partitions());
           final CompletableFuture<ListGroupsResponseData> future = new CompletableFuture<>();
           final List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>();
           final AtomicInteger cnt = new AtomicInteger(partitions.size());
   
           for (TopicPartition partition : partitions) {
               runtime.scheduleReadOperation(
                   "list-group",
                   partition,
                   (coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), lastCommittedOffset)
               ).handle((groups, exception) -> {
                   if (exception == null) {
                       synchronized (results) {
                           results.addAll(groups);
                       }
                   } else {
                       // NotCoordinatorException is ignored: that partition contributes no groups.
                       if (!(exception instanceof NotCoordinatorException)) {
                           future.complete(new ListGroupsResponseData().setErrorCode(Errors.forException(exception).code()));
                       }
                   }
   
                   // The last callback to finish completes the response, unless an error already did.
                   if (cnt.decrementAndGet() == 0 && !future.isDone()) {
                       future.complete(new ListGroupsResponseData().setGroups(results));
                   }
   
                   return null;
               });
           }
   
           return future;
   ```
   
   What do you think?
   
   Note that I have not tested this code at all. I just wrote it here.
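
The same counter-based aggregation can be exercised without the coordinator runtime. The version below is only a sketch: `NotCoordinator`, `Response`, and `String` group ids are hypothetical stand-ins for the Kafka types, and the per-partition reads are passed in as ready-made futures. It relies on `CompletableFuture.complete` being a no-op once the future is already completed, so the first error wins.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

final class ListGroupsFanOut {
    /** Hypothetical stand-in for NotCoordinatorException. */
    static final class NotCoordinator extends RuntimeException { }

    /** Hypothetical stand-in for ListGroupsResponseData; error is null on success. */
    static final class Response {
        final List<String> groups;
        final String error;
        Response(List<String> groups, String error) {
            this.groups = groups;
            this.error = error;
        }
    }

    /** Dependent stages wrap failures in CompletionException; look through it. */
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    static CompletableFuture<Response> aggregate(List<CompletableFuture<List<String>>> reads) {
        final CompletableFuture<Response> future = new CompletableFuture<>();
        final List<String> results = new ArrayList<>();
        final AtomicInteger remaining = new AtomicInteger(reads.size());

        if (reads.isEmpty()) {
            future.complete(new Response(results, null));
        }

        for (CompletableFuture<List<String>> read : reads) {
            read.handle((groups, exception) -> {
                if (exception == null) {
                    synchronized (results) {
                        results.addAll(groups);
                    }
                } else if (!(unwrap(exception) instanceof NotCoordinator)) {
                    // Any other failure becomes the response; later completions are ignored.
                    String error = unwrap(exception).getClass().getSimpleName();
                    future.complete(new Response(Collections.emptyList(), error));
                }
                // The last callback publishes a snapshot of everything collected so far.
                if (remaining.decrementAndGet() == 0) {
                    synchronized (results) {
                        future.complete(new Response(new ArrayList<>(results), null));
                    }
                }
                return null;
            });
        }
        return future;
    }
}
```

A local lock on `results` is enough here because nothing outside this call can see the list, which is the point of item 1 above.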



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -420,6 +422,23 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE
         return group;
     }
 
+    /**
+     * @return The GenericGroup List filtered by statesFilter and committedOffset.
+     */
+    public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFilter, long committedOffset) {
+        Stream<Group> groupStream = groups.values(committedOffset).stream();
+        if (!statesFilter.isEmpty()) {
+            groupStream = groupStream.filter(group -> {
+                if (group instanceof ConsumerGroup) {
+                    return statesFilter.contains(((ConsumerGroup) group).stateAsString(committedOffset));
+                } else {
+                    return statesFilter.contains(group.stateAsString());
+                }

Review Comment:
   This is weird. I would rather push `stateAsString(committedOffset)` to the 
`Group` interface and use it.
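
A rough sketch of what that could look like, with hypothetical simplified types: `ConsumerGroupLike` fakes a state that changes at one offset, and `GenericGroupLike` ignores the offset entirely. Neither reflects how the real classes store their state.

```java
import java.util.List;
import java.util.stream.Collectors;

final class StateFilterSketch {
    /** Hypothetical slice of the Group interface with the offset-aware accessor. */
    interface Group {
        String groupId();
        String stateAsString(long committedOffset);
    }

    /** Stand-in whose state depends on the committed offset. */
    static final class ConsumerGroupLike implements Group {
        private final String id;
        private final String before;
        private final String after;
        private final long changeOffset;

        ConsumerGroupLike(String id, String before, String after, long changeOffset) {
            this.id = id;
            this.before = before;
            this.after = after;
            this.changeOffset = changeOffset;
        }

        public String groupId() { return id; }

        public String stateAsString(long committedOffset) {
            return committedOffset >= changeOffset ? after : before;
        }
    }

    /** Stand-in whose state does not depend on the offset. */
    static final class GenericGroupLike implements Group {
        private final String id;
        private final String state;

        GenericGroupLike(String id, String state) {
            this.id = id;
            this.state = state;
        }

        public String groupId() { return id; }

        public String stateAsString(long committedOffset) { return state; }
    }

    /** No instanceof check: every group answers the same question. */
    static List<String> listGroupIds(List<Group> groups, List<String> statesFilter, long committedOffset) {
        return groups.stream()
            .filter(g -> statesFilter.isEmpty() || statesFilter.contains(g.stateAsString(committedOffset)))
            .map(Group::groupId)
            .collect(Collectors.toList());
    }
}
```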



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+        for (TopicPartition tp : runtime.partitions()) {
+            futures.add(runtime.scheduleReadOperation(
+                "list_groups",
+                tp,
+                (coordinator, lastCommittedOffset) -> coordinator.listGroups(context, request, lastCommittedOffset)
+            ).exceptionally(exception -> {
+                if (!(exception instanceof KafkaException)) {
+                    log.error("ListGroups request {} hit an unexpected exception: {}",
+                        request, exception.getMessage());
+                }
+                return new ListGroupsResponseData()
+                    .setErrorCode(Errors.forException(exception).code());
+            }));
+        }
+        CompletableFuture<ListGroupsResponseData> responseFuture = new CompletableFuture<>();
+        List<ListGroupsResponseData.ListedGroup> listedGroups = new ArrayList<>();
+        futures.forEach(CompletableFuture::join);
+        for (CompletableFuture<ListGroupsResponseData> future : futures) {
+            try {
+                ListGroupsResponseData data = future.get();
+                if (data.errorCode() != Errors.NONE.code()) {
+                    responseFuture.complete(data);
+                    return responseFuture;
+                }
+                listedGroups.addAll(future.get().groups());
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("ListGroups request {} hit an unexpected exception: {}",
+                    request, e.getMessage());
+                if (!responseFuture.isDone()) {
+                    responseFuture.complete(new ListGroupsResponseData()
+                        .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()));
+                    return responseFuture;
+                }
+            }
+        }

Review Comment:
   I was thinking about something like this:
   
   ```
       public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
           return CompletableFuture
               .allOf(futures.toArray(new CompletableFuture[0]))
               .thenApply(__ -> {
                   final List<T> results = new ArrayList<>(futures.size());
                   for (CompletableFuture<T> future : futures) {
                       results.add(future.join());
                   }
                   return results;
               });
       }
   ```
   
   I am actually not sure that it would help here, see my previous comment.
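
For reference, a small runnable usage sketch of such a helper with made-up values (`AllOfDemo` and its inputs are illustrative only). It shows the two behaviours that matter when weighing it against the counter-based approach: results come back in input order, and a single failed input fails the combined future as a whole.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class AllOfDemo {
    /** Same shape as the helper above: wait for every future, then collect in input order. */
    static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                final List<T> results = new ArrayList<>(futures.size());
                for (CompletableFuture<T> future : futures) {
                    results.add(future.join()); // already complete, so this does not block
                }
                return results;
            });
    }

    /** All inputs succeed: the combined future yields every value. */
    static List<String> collected() {
        return allOf(Arrays.asList(
            CompletableFuture.completedFuture("a"),
            CompletableFuture.completedFuture("b"))).join();
    }

    /** One input fails: the combined future fails and the successful value is not delivered. */
    static boolean failsAsAWhole() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            allOf(Arrays.asList(CompletableFuture.completedFuture("a"), failed)).join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }
}
```

Because of that all-or-nothing behaviour, per-partition errors that should be tolerated would have to be mapped to a value before the futures are handed to the helper.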



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -179,6 +181,13 @@ public GroupType type() {
     public String stateAsString() {
         return state.get().toString();
     }
+    /**

Review Comment:
   nit: Let's put an empty line before this one.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -747,4 +756,15 @@ private static Integer decValue(String key, Integer value) {
     private static Integer incValue(String key, Integer value) {
         return value == null ? 1 : value + 1;
     }
+
+    /**
+     * @return the group formatted as a list group response.
+     */
+    public ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset) {
+        return new ListGroupsResponseData.ListedGroup()
+            .setGroupId(groupId)
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .setGroupState(state.get(committedOffset).toString());
+    }

Review Comment:
   nit: Could we move this method before all the private methods? For 
instance, we could put it right after the new `stateAsString` method.


