rajinisivaram commented on code in PR #10964:
URL: https://github.com/apache/kafka/pull/10964#discussion_r918831524


##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java:
##########
@@ -82,16 +69,30 @@ public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
     }
 
     private void validateKeys(Set<CoordinatorKey> groupIds) {
-        if (!groupIds.equals(Collections.singleton(groupId))) {
+        Set<CoordinatorKey> keys = coordinatorKeys(groupIdToTopicPartitions.keySet());
+        if (!keys.containsAll(groupIds)) {
             throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
-                " (expected only " + Collections.singleton(groupId) + ")");
+                    " (expected one of " + keys + ")");
         }
     }
 
+    private static Set<CoordinatorKey> coordinatorKeys(Collection<String> groupIds) {
+        return groupIds.stream()
+           .map(CoordinatorKey::byGroupId)
+           .collect(Collectors.toSet());
+    }
+
     @Override
     public OffsetFetchRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> groupIds) {
         validateKeys(groupIds);
-        return new OffsetFetchRequest.Builder(groupId.idValue, requireStable, partitions, false);
+
+        // Create a map that only contains the consumer groups owned by the coordinator.
+        Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions = new HashMap<>(groupIds.size());
+        groupIds.forEach(g -> coordinatorGroupIdToTopicPartitions.put(g.idValue, groupIdToTopicPartitions.get(g.idValue)));
+
+        // Set the flag to false as for admin client request,
+        // we don't need to wait for any pending offset state to clear.
+        return new OffsetFetchRequest.Builder(coordinatorGroupIdToTopicPartitions, requireStable, false);

Review Comment:
   Oh sorry, missed this. Yes, we should use per-group requests when batching 
is unavailable. Will update PR.
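
The per-group fallback could be sketched roughly as follows. This is only an illustration of the idea, not the change that will go into the PR: `PerGroupFallbackSketch`, `planRequests` and the `batchingSupported` flag are hypothetical names, partitions are plain strings rather than `TopicPartition`, and each returned map merely stands in for the argument of one `OffsetFetchRequest.Builder`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for illustration only; not part of the Kafka code base.
public class PerGroupFallbackSketch {

    /**
     * Splits the groups owned by one coordinator into request payloads.
     * Each map in the result stands in for the contents of one OffsetFetch request.
     */
    public static List<Map<String, List<String>>> planRequests(
            Map<String, List<String>> groupToPartitions, boolean batchingSupported) {
        List<Map<String, List<String>>> requests = new ArrayList<>();
        if (batchingSupported) {
            // Batching available: a single request carries every group.
            requests.add(new LinkedHashMap<>(groupToPartitions));
        } else {
            // Batching unavailable: fall back to one request per group.
            for (Map.Entry<String, List<String>> entry : groupToPartitions.entrySet()) {
                Map<String, List<String>> single = new LinkedHashMap<>();
                single.put(entry.getKey(), entry.getValue());
                requests.add(single);
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        groups.put("group-a", List.of("topic-0"));
        groups.put("group-b", List.of("topic-1", "topic-2"));

        // Print the number of requests planned in each mode.
        System.out.println("batched:   " + planRequests(groups, true).size());
        System.out.println("per-group: " + planRequests(groups, false).size());
    }
}
```

How the handler would learn that batching is unavailable is left open here; the flag is just a placeholder for that decision.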



