rajinisivaram commented on code in PR #10964:
URL: https://github.com/apache/kafka/pull/10964#discussion_r919717158


##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java:
##########
@@ -17,33 +17,72 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
-import java.util.Map;
-
 /**
- * The result of the {@link Admin#listConsumerGroupOffsets(String)} call.
+ * The result of the {@link Admin#listConsumerGroupOffsets(Map)} and
+ * {@link Admin#listConsumerGroupOffsets(String)} call.
  * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsResult {
 
-    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+    final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>>> futures;
 
-    ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>> future) {
-        this.future = future;
+    ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, 
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
+        this.futures = futures;
     }
 
     /**
      * Return a future which yields a map of topic partitions to 
OffsetAndMetadata objects.
      * If the group does not have a committed offset for this partition, the 
corresponding value in the returned map will be null.
      */
     public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> 
partitionsToOffsetAndMetadata() {
-        return future;
+        if (futures.size() != 1) {
+            throw new IllegalStateException("Offsets from multiple consumer 
groups were requested. " +
+                    "Use groupIdsToPartitionsAndOffsetAndMetadata() instead to 
get all futures.");
+        }
+        return futures.values().iterator().next();
     }
 
+    /**
+     * Return a map of group ids to their corresponding futures that yield a 
map of topic partitions to
+     * OffsetAndMetadata objects. If the group doesn't have a committed offset 
for a specific
+     * partition, the corresponding value in the returned map for that group 
id will be null.
+     */
+    public Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> 
groupIdsToPartitionsAndOffsetAndMetadata() {

Review Comment:
   Added `partitionsToOffsetAndMetadata(String)` to be compatible with the existing method `partitionsToOffsetAndMetadata()`.



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java:
##########
@@ -82,16 +69,30 @@ public AdminApiLookupStrategy<CoordinatorKey> 
lookupStrategy() {
     }
 
     private void validateKeys(Set<CoordinatorKey> groupIds) {
-        if (!groupIds.equals(Collections.singleton(groupId))) {
+        Set<CoordinatorKey> keys = 
coordinatorKeys(groupIdToTopicPartitions.keySet());
+        if (!keys.containsAll(groupIds)) {
             throw new IllegalArgumentException("Received unexpected group ids 
" + groupIds +
-                " (expected only " + Collections.singleton(groupId) + ")");
+                    " (expected one of " + keys + ")");
         }
     }
 
+    private static Set<CoordinatorKey> coordinatorKeys(Collection<String> 
groupIds) {
+        return groupIds.stream()
+           .map(CoordinatorKey::byGroupId)
+           .collect(Collectors.toSet());
+    }
+
     @Override
     public OffsetFetchRequest.Builder buildBatchedRequest(int coordinatorId, 
Set<CoordinatorKey> groupIds) {
         validateKeys(groupIds);
-        return new OffsetFetchRequest.Builder(groupId.idValue, requireStable, 
partitions, false);
+
+        // Create a map that only contains the consumer groups owned by the 
coordinator.
+        Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions 
= new HashMap<>(groupIds.size());
+        groupIds.forEach(g -> 
coordinatorGroupIdToTopicPartitions.put(g.idValue, 
groupIdToTopicPartitions.get(g.idValue)));
+
+        // Set the flag to false as for admin client request,
+        // we don't need to wait for any pending offset state to clear.

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to