dajac commented on code in PR #10964:
URL: https://github.com/apache/kafka/pull/10964#discussion_r918715411


##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java:
##########
@@ -17,33 +17,72 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
-import java.util.Map;
-
 /**
- * The result of the {@link Admin#listConsumerGroupOffsets(String)} call.
+ * The result of the {@link Admin#listConsumerGroupOffsets(Map)} and
+ * {@link Admin#listConsumerGroupOffsets(String)} call.
  * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsResult {
 
-    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+    final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>>> futures;
 
-    ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>> future) {
-        this.future = future;
+    ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, 
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
+        this.futures = futures;
     }
 
     /**
      * Return a future which yields a map of topic partitions to 
OffsetAndMetadata objects.
      * If the group does not have a committed offset for this partition, the 
corresponding value in the returned map will be null.
      */
     public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> 
partitionsToOffsetAndMetadata() {
-        return future;
+        if (futures.size() != 1) {
+            throw new IllegalStateException("Offsets from multiple consumer 
groups were requested. " +
+                    "Use groupIdsToPartitionsAndOffsetAndMetadata() instead to 
get all futures.");
+        }
+        return futures.values().iterator().next();
     }
 
+    /**
+     * Return a map of group ids to their corresponding futures that yield a 
map of topic partitions to
+     * OffsetAndMetadata objects. If the group doesn't have a committed offset 
for a specific
+     * partition, the corresponding value in the returned map for that group 
id will be null.
+     */
+    public Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> 
groupIdsToPartitionsAndOffsetAndMetadata() {

Review Comment:
   `groupIdsToPartitionsAndOffsetAndMetadata` looks a bit scary. I wonder if we 
should have something like `KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> 
offsets(String groupId)` instead. What do you think?
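
   A minimal sketch of what such a per-group accessor could look like. 
Everything here is an assumption for illustration: `GroupOffsetsResultSketch` 
is a hypothetical stand-in for the result class, `CompletableFuture` stands in 
for `KafkaFuture`, and `Map<String, Long>` stands in for 
`Map<TopicPartition, OffsetAndMetadata>`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the result class; not the actual Kafka type.
final class GroupOffsetsResultSketch {
    // Keyed by group id. The value type stands in for
    // KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>.
    private final Map<String, CompletableFuture<Map<String, Long>>> futures;

    GroupOffsetsResultSketch(Map<String, CompletableFuture<Map<String, Long>>> futures) {
        // Defensive copy so later changes to the caller's map are not visible here.
        this.futures = new HashMap<>(futures);
    }

    // Per-group accessor in the spirit of the suggested offsets(String groupId).
    CompletableFuture<Map<String, Long>> offsets(String groupId) {
        CompletableFuture<Map<String, Long>> future = futures.get(groupId);
        if (future == null) {
            throw new IllegalArgumentException(
                "Offsets for group '" + groupId + "' were not requested.");
        }
        return future;
    }
}
```

   With this shape, a caller asks for one group at a time and gets a clear 
error for a group id that was never part of the request.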



##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -915,23 +916,56 @@ default ListConsumerGroupsResult listConsumerGroups() {
 
     /**
      * List the consumer group offsets available in the cluster.
+     * <p>
+     * @deprecated Since 3.3.
+     * Use {@link #listConsumerGroupOffsets(Map, 
ListConsumerGroupOffsetsOptions)}.
      *
      * @param options The options to use when listing the consumer group 
offsets.
      * @return The ListGroupOffsetsResult
      */
-    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, 
ListConsumerGroupOffsetsOptions options);
+    @Deprecated
+    default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String 
groupId, ListConsumerGroupOffsetsOptions options) {
+        ListConsumerGroupOffsetsOptions listOptions = new 
ListConsumerGroupOffsetsOptions()
+            .requireStable(options.requireStable());
+        return listConsumerGroupOffsets(Collections.singletonMap(groupId, 
options.topicPartitions()), listOptions);
+    }
 
     /**
      * List the consumer group offsets available in the cluster with the 
default options.
      * <p>
-     * This is a convenience method for {@link 
#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with 
default options.
+     * This is a convenience method for {@link #listConsumerGroupOffsets(Map, 
ListConsumerGroupOffsetsOptions)}
+     * to list offsets of all partitions of one group with default options.
      *
      * @return The ListGroupOffsetsResult.
      */
     default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String 
groupId) {
         return listConsumerGroupOffsets(groupId, new 
ListConsumerGroupOffsetsOptions());
     }
 
+    /**
+     * List the consumer group offsets available in the cluster for the 
specified consumer groups.
+     *
+     * @param groupIdToTopicPartitions Map of consumer group ids to the topic 
partitions of the group to list offsets for.
+     *                                 If value is null, offsets are listed 
for all partitions.
+     * @param options The options to use when listing the consumer group 
offsets.
+     * @return The ListGroupOffsetsResult
+     */
+    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, 
List<TopicPartition>> groupIdToTopicPartitions, ListConsumerGroupOffsetsOptions 
options);

Review Comment:
   I wonder if using `Map<String, List<TopicPartition>> 
groupIdToTopicPartitions` is the right way here for two reasons:
   1) Using `null` to indicate that we want to get all the partitions does not 
seem very intuitive.
   2) It is harder to evolve if we need to add new per-group fields in the 
future.
   
   We could for instance replace `List<TopicPartition>` by 
`ListConsumerGroupSpec`. @rajinisivaram What's your take on this?
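
   To make the idea concrete, here is a rough sketch of a per-group spec 
object. The class name `GroupOffsetsSpecSketch`, its methods, and the use of 
`String` in place of `TopicPartition` are all assumptions for illustration, not 
an existing Kafka API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical per-group spec; the name and methods are illustrative only.
final class GroupOffsetsSpecSketch {
    // An empty Optional means "all partitions", replacing the null sentinel.
    // String stands in for TopicPartition to keep the sketch self-contained.
    private Optional<List<String>> topicPartitions = Optional.empty();

    // Restrict the listing to the given partitions; returns this for chaining.
    GroupOffsetsSpecSketch topicPartitions(Collection<String> partitions) {
        this.topicPartitions =
            Optional.of(Collections.unmodifiableList(new ArrayList<>(partitions)));
        return this;
    }

    // True when no explicit partition list was set.
    boolean allPartitions() {
        return !topicPartitions.isPresent();
    }

    // The explicit partition list, or an empty list when all partitions are wanted.
    List<String> topicPartitions() {
        return topicPartitions.orElse(Collections.emptyList());
    }
}
```

   A method taking `Map<String, GroupOffsetsSpecSketch>` could then gain new 
per-group fields later without changing its signature.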



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java:
##########
@@ -82,16 +69,30 @@ public AdminApiLookupStrategy<CoordinatorKey> 
lookupStrategy() {
     }
 
     private void validateKeys(Set<CoordinatorKey> groupIds) {
-        if (!groupIds.equals(Collections.singleton(groupId))) {
+        Set<CoordinatorKey> keys = 
coordinatorKeys(groupIdToTopicPartitions.keySet());
+        if (!keys.containsAll(groupIds)) {
             throw new IllegalArgumentException("Received unexpected group ids 
" + groupIds +
-                " (expected only " + Collections.singleton(groupId) + ")");
+                    " (expected one of " + keys + ")");
         }
     }
 
+    private static Set<CoordinatorKey> coordinatorKeys(Collection<String> 
groupIds) {
+        return groupIds.stream()
+           .map(CoordinatorKey::byGroupId)
+           .collect(Collectors.toSet());
+    }
+
     @Override
     public OffsetFetchRequest.Builder buildBatchedRequest(int coordinatorId, 
Set<CoordinatorKey> groupIds) {
         validateKeys(groupIds);
-        return new OffsetFetchRequest.Builder(groupId.idValue, requireStable, 
partitions, false);
+
+        // Create a map that only contains the consumer groups owned by the 
coordinator.
+        Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions 
= new HashMap<>(groupIds.size());
+        groupIds.forEach(g -> 
coordinatorGroupIdToTopicPartitions.put(g.idValue, 
groupIdToTopicPartitions.get(g.idValue)));
+
+        // Set the flag to false as for admin client request,
+        // we don't need to wait for any pending offset state to clear.

Review Comment:
   Is this comment still relevant?



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java:
##########
@@ -17,33 +17,72 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
-import java.util.Map;
-
 /**
- * The result of the {@link Admin#listConsumerGroupOffsets(String)} call.
+ * The result of the {@link Admin#listConsumerGroupOffsets(Map)} and
+ * {@link Admin#listConsumerGroupOffsets(String)} call.
  * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsResult {
 
-    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+    final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>>> futures;
 
-    ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>> future) {
-        this.future = future;
+    ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, 
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
+        this.futures = futures;
     }
 
     /**
      * Return a future which yields a map of topic partitions to 
OffsetAndMetadata objects.
      * If the group does not have a committed offset for this partition, the 
corresponding value in the returned map will be null.
      */
     public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> 
partitionsToOffsetAndMetadata() {

Review Comment:
   If we deprecate all the others, should we deprecate this one as well?



##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -915,23 +916,56 @@ default ListConsumerGroupsResult listConsumerGroups() {
 
     /**
      * List the consumer group offsets available in the cluster.
+     * <p>
+     * @deprecated Since 3.3.
+     * Use {@link #listConsumerGroupOffsets(Map, 
ListConsumerGroupOffsetsOptions)}.
      *
      * @param options The options to use when listing the consumer group 
offsets.
      * @return The ListGroupOffsetsResult
      */
-    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, 
ListConsumerGroupOffsetsOptions options);
+    @Deprecated

Review Comment:
   I just read the KIP and it was not clear whether we wanted to deprecate this 
method or not. Do we really want to deprecate it? Personally, I don't have a 
strong opinion on this, but I find this method handy for a single group.



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java:
##########
@@ -82,16 +69,30 @@ public AdminApiLookupStrategy<CoordinatorKey> 
lookupStrategy() {
     }
 
     private void validateKeys(Set<CoordinatorKey> groupIds) {
-        if (!groupIds.equals(Collections.singleton(groupId))) {
+        Set<CoordinatorKey> keys = 
coordinatorKeys(groupIdToTopicPartitions.keySet());
+        if (!keys.containsAll(groupIds)) {
             throw new IllegalArgumentException("Received unexpected group ids 
" + groupIds +
-                " (expected only " + Collections.singleton(groupId) + ")");
+                    " (expected one of " + keys + ")");
         }
     }
 
+    private static Set<CoordinatorKey> coordinatorKeys(Collection<String> 
groupIds) {
+        return groupIds.stream()
+           .map(CoordinatorKey::byGroupId)
+           .collect(Collectors.toSet());
+    }
+
     @Override
     public OffsetFetchRequest.Builder buildBatchedRequest(int coordinatorId, 
Set<CoordinatorKey> groupIds) {
         validateKeys(groupIds);
-        return new OffsetFetchRequest.Builder(groupId.idValue, requireStable, 
partitions, false);
+
+        // Create a map that only contains the consumer groups owned by the 
coordinator.
+        Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions 
= new HashMap<>(groupIds.size());
+        groupIds.forEach(g -> 
coordinatorGroupIdToTopicPartitions.put(g.idValue, 
groupIdToTopicPartitions.get(g.idValue)));
+
+        // Set the flag to false as for admin client request,
+        // we don't need to wait for any pending offset state to clear.
+        return new 
OffsetFetchRequest.Builder(coordinatorGroupIdToTopicPartitions, requireStable, 
false);

Review Comment:
   How do we handle the case where the broker does not support the batch API? 
My understanding is that the request will fail with a 
`NoBatchedOffsetFetchRequestException`. 
`NoBatchedOffsetFetchRequestException` is not really part of our public API, so 
I am not sure we can do this.
   
   Generally, we have always aimed to have a graceful fallback mechanism in the 
admin API. Have we considered falling back to independent per-group 
requests if the new version of the API is not available?
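
   The fallback could look roughly like the sketch below. It is only an 
illustration under stated assumptions: the `Broker` interface and its two 
methods are hypothetical, `UnsupportedOperationException` stands in for 
whatever error signals that the batched request is unavailable, and plain maps 
stand in for the real request and response types.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a batched-then-per-group fallback; not Kafka code.
final class OffsetFetchFallbackSketch {
    // Stand-in for a broker connection; both methods are assumptions.
    interface Broker {
        // Throws UnsupportedOperationException when the batched API is unavailable.
        Map<String, Map<String, Long>> fetchBatched(List<String> groupIds);

        // Fetches offsets for a single group using the older request form.
        Map<String, Long> fetchSingle(String groupId);
    }

    static Map<String, Map<String, Long>> fetch(Broker broker, List<String> groupIds) {
        try {
            // Preferred path: one request covering every group.
            return broker.fetchBatched(groupIds);
        } catch (UnsupportedOperationException e) {
            // Fallback path: one independent request per group.
            Map<String, Map<String, Long>> results = new HashMap<>();
            for (String groupId : groupIds) {
                results.put(groupId, broker.fetchSingle(groupId));
            }
            return results;
        }
    }
}
```

   The caller sees the same result shape on both paths, so the internal 
exception never has to surface through the public API.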



