This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 66000787c1 KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964)
66000787c1 is described below

commit 66000787c1146c6d08d88b9c564f5a000608f013
Author: Sanjana Kaundinya <skaundi...@gmail.com>
AuthorDate: Thu Jul 14 05:47:34 2022 -0700

    KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964)
    
    This implements the Admin API portion of KIP-709:
    https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258.
    The request/response protocol changes were implemented in 3.0.0. A new
    batched API has been introduced to list consumer offsets for different
    groups. For brokers older than 3.0.0, separate requests are sent for each
    group.
    
    Co-authored-by: Rajini Sivaram <rajinisiva...@googlemail.com>
    Co-authored-by: David Jacot <dja...@confluent.io>
    
    Reviewers: David Jacot <dja...@confluent.io>, Rajini Sivaram <rajinisiva...@googlemail.com>
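
A minimal usage sketch of the new batched API (the group ids, topic, and
bootstrap address are placeholders, not part of this commit):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = new HashMap<>();
        // A spec whose topicPartitions is null (the default) lists all partitions of the group.
        groupSpecs.put("groupA", new ListConsumerGroupOffsetsSpec());
        groupSpecs.put("groupB", new ListConsumerGroupOffsetsSpec()
            .topicPartitions(Collections.singleton(new TopicPartition("B", 0))));

        // Brokers on 3.0.0+ receive one batched OffsetFetch per coordinator;
        // older brokers receive a separate request per group.
        ListConsumerGroupOffsetsResult result = admin.listConsumerGroupOffsets(groupSpecs);
        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = result.all().get();
        offsets.forEach((group, tps) -> System.out.println(group + " -> " + tps));
    }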
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  36 ++-
 .../kafka/clients/admin/KafkaAdminClient.java      |  11 +-
 .../admin/ListConsumerGroupOffsetsOptions.java     |  14 +-
 .../admin/ListConsumerGroupOffsetsResult.java      |  56 +++-
 .../admin/ListConsumerGroupOffsetsSpec.java        |  79 ++++++
 .../clients/admin/internals/AdminApiDriver.java    |   3 +-
 .../admin/internals/CoordinatorStrategy.java       |   4 +
 .../internals/ListConsumerGroupOffsetsHandler.java | 128 +++++----
 .../kafka/common/requests/OffsetFetchResponse.java |  10 +-
 .../kafka/clients/admin/AdminClientTestUtils.java  |  12 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 206 ++++++++++++--
 .../kafka/clients/admin/MockAdminClient.java       |  16 +-
 .../ListConsumerGroupOffsetsHandlerTest.java       | 308 +++++++++++++++++++--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   6 +-
 .../internals/ConsumerCoordinatorTest.java         |  26 +-
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |   8 +-
 .../kafka/admin/ConsumerGroupServiceTest.scala     |  22 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 .../processor/internals/StoreChangelogReader.java  |  12 +-
 .../internals/StoreChangelogReaderTest.java        |  11 +-
 20 files changed, 813 insertions(+), 157 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index fdacc09db8..0698d29702 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
 
 import java.time.Duration;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -919,12 +920,20 @@ public interface Admin extends AutoCloseable {
      * @param options The options to use when listing the consumer group 
offsets.
      * @return The ListGroupOffsetsResult
      */
-    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, 
ListConsumerGroupOffsetsOptions options);
+    default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String 
groupId, ListConsumerGroupOffsetsOptions options) {
+        ListConsumerGroupOffsetsOptions listOptions = new 
ListConsumerGroupOffsetsOptions()
+            .requireStable(options.requireStable());
+        @SuppressWarnings("deprecation")
+        ListConsumerGroupOffsetsSpec groupSpec = new 
ListConsumerGroupOffsetsSpec()
+            .topicPartitions(options.topicPartitions());
+        return listConsumerGroupOffsets(Collections.singletonMap(groupId, 
groupSpec), listOptions);
+    }
 
     /**
      * List the consumer group offsets available in the cluster with the 
default options.
      * <p>
-     * This is a convenience method for {@link 
#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with 
default options.
+     * This is a convenience method for {@link #listConsumerGroupOffsets(Map, 
ListConsumerGroupOffsetsOptions)}
+     * to list offsets of all partitions of one group with default options.
      *
      * @return The ListGroupOffsetsResult.
      */
@@ -932,6 +941,29 @@ public interface Admin extends AutoCloseable {
         return listConsumerGroupOffsets(groupId, new 
ListConsumerGroupOffsetsOptions());
     }
 
+    /**
+     * List the consumer group offsets available in the cluster for the 
specified consumer groups.
+     *
+     * @param groupSpecs Map of consumer group ids to a spec that specifies 
the topic partitions of the group to list offsets for.
+     *
+     * @param options The options to use when listing the consumer group 
offsets.
+     * @return The ListConsumerGroupOffsetsResult
+     */
+    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, 
ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions 
options);
+
+    /**
+     * List the consumer group offsets available in the cluster for the 
specified groups with the default options.
+     * <p>
+     * This is a convenience method for
+     * {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)} 
with default options.
+     *
+     * @param groupSpecs Map of consumer group ids to a spec that specifies 
the topic partitions of the group to list offsets for.
+     * @return The ListConsumerGroupOffsetsResult.
+     */
+    default ListConsumerGroupOffsetsResult 
listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs) {
+        return listConsumerGroupOffsets(groupSpecs, new 
ListConsumerGroupOffsetsOptions());
+    }
+
     /**
      * Delete consumer groups from the cluster.
      *
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 2b2642e351..41eb27a1dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3401,13 +3401,14 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
-    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final 
String groupId,
-                                                                   final 
ListConsumerGroupOffsetsOptions options) {
+    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, 
ListConsumerGroupOffsetsSpec> groupSpecs,
+                                                                   
ListConsumerGroupOffsetsOptions options) {
         SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, 
OffsetAndMetadata>> future =
-                ListConsumerGroupOffsetsHandler.newFuture(groupId);
-        ListConsumerGroupOffsetsHandler handler = new 
ListConsumerGroupOffsetsHandler(groupId, options.topicPartitions(), 
options.requireStable(), logContext);
+                ListConsumerGroupOffsetsHandler.newFuture(groupSpecs.keySet());
+        ListConsumerGroupOffsetsHandler handler =
+            new ListConsumerGroupOffsetsHandler(groupSpecs, 
options.requireStable(), logContext);
         invokeDriver(handler, future, options.timeoutMs);
-        return new 
ListConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
+        return new ListConsumerGroupOffsetsResult(future.all());
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
index 292a47ef39..44d3a40732 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
@@ -23,23 +23,28 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.List;
 
 /**
- * Options for {@link Admin#listConsumerGroupOffsets(String)}.
+ * Options for {@link Admin#listConsumerGroupOffsets(java.util.Map)} and 
{@link Admin#listConsumerGroupOffsets(String)}.
  * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsOptions extends 
AbstractOptions<ListConsumerGroupOffsetsOptions> {
 
-    private List<TopicPartition> topicPartitions = null;
+    private List<TopicPartition> topicPartitions;
     private boolean requireStable = false;
 
     /**
      * Set the topic partitions to list as part of the result.
      * {@code null} includes all topic partitions.
+     * <p>
+     * @deprecated Since 3.3.
+     * Use {@link Admin#listConsumerGroupOffsets(java.util.Map, 
ListConsumerGroupOffsetsOptions)}
+     * to specify topic partitions.
      *
      * @param topicPartitions List of topic partitions to include
      * @return This ListGroupOffsetsOptions
      */
+    @Deprecated
     public ListConsumerGroupOffsetsOptions 
topicPartitions(List<TopicPartition> topicPartitions) {
         this.topicPartitions = topicPartitions;
         return this;
@@ -55,7 +60,12 @@ public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsume
 
     /**
      * Returns a list of topic partitions to add as part of the result.
+     * <p>
+     * @deprecated Since 3.3.
+     * Use {@link Admin#listConsumerGroupOffsets(java.util.Map, 
ListConsumerGroupOffsetsOptions)}
+     * to specify topic partitions.
      */
+    @Deprecated
     public List<TopicPartition> topicPartitions() {
         return topicPartitions;
     }
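
The migration implied by the deprecations above, as a sketch (the group id
and partition list are placeholders):

    // assuming: Admin admin; List<TopicPartition> partitions;

    // Deprecated since 3.3: partitions selected through the options object.
    admin.listConsumerGroupOffsets("groupA",
        new ListConsumerGroupOffsetsOptions().topicPartitions(partitions));

    // Replacement: partitions selected through a per-group spec.
    admin.listConsumerGroupOffsets(Collections.singletonMap("groupA",
        new ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));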
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
index 48f4531418..2136e33a40 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
@@ -17,25 +17,32 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
-import java.util.Map;
-
 /**
- * The result of the {@link Admin#listConsumerGroupOffsets(String)} call.
+ * The result of the {@link Admin#listConsumerGroupOffsets(Map)} and
+ * {@link Admin#listConsumerGroupOffsets(String)} call.
  * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsResult {
 
-    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+    final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> 
futures;
 
-    ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>> future) {
-        this.future = future;
+    ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, 
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
+        this.futures = futures.entrySet().stream()
+                .collect(Collectors.toMap(e -> e.getKey().idValue, 
Entry::getValue));
     }
 
     /**
@@ -43,7 +50,42 @@ public class ListConsumerGroupOffsetsResult {
      * If the group does not have a committed offset for this partition, the 
corresponding value in the returned map will be null.
      */
     public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> 
partitionsToOffsetAndMetadata() {
-        return future;
+        if (futures.size() != 1) {
+            throw new IllegalStateException("Offsets from multiple consumer 
groups were requested. " +
+                    "Use partitionsToOffsetAndMetadata(groupId) instead to get 
future for a specific group.");
+        }
+        return futures.values().iterator().next();
     }
 
+    /**
+     * Return a future which yields a map of topic partitions to 
OffsetAndMetadata objects for
+     * the specified group. If the group doesn't have a committed offset for a 
specific
+     * partition, the corresponding value in the returned map will be null.
+     */
+    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> 
partitionsToOffsetAndMetadata(String groupId) {
+        if (!futures.containsKey(groupId))
+            throw new IllegalArgumentException("Offsets for consumer group '" 
+ groupId + "' were not requested.");
+        return futures.get(groupId);
+    }
+
+    /**
+     * Return a future which yields all Map<String, Map<TopicPartition, 
OffsetAndMetadata>> objects,
+     * if requests for all the groups succeed.
+     */
+    public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> 
all() {
+        return KafkaFuture.allOf(futures.values().toArray(new 
KafkaFuture[0])).thenApply(
+            nil -> {
+                Map<String, Map<TopicPartition, OffsetAndMetadata>> 
listedConsumerGroupOffsets = new HashMap<>(futures.size());
+                futures.forEach((key, future) -> {
+                    try {
+                        listedConsumerGroupOffsets.put(key, future.get());
+                    } catch (InterruptedException | ExecutionException e) {
+                        // This should be unreachable, since the 
KafkaFuture#allOf already ensured
+                        // that all of the futures completed successfully.
+                        throw new RuntimeException(e);
+                    }
+                });
+                return listedConsumerGroupOffsets;
+            });
+    }
 }
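
A sketch of how the result accessors above combine (group ids are
placeholders; "result" is the value returned by the batched
listConsumerGroupOffsets call):

    // Per-group future; throws IllegalArgumentException for a group id
    // that was not part of the request.
    Map<TopicPartition, OffsetAndMetadata> groupAOffsets =
        result.partitionsToOffsetAndMetadata("groupA").get();

    // All requested groups at once, failing if any per-group request failed.
    Map<String, Map<TopicPartition, OffsetAndMetadata>> all = result.all().get();

    // The no-arg variant is only valid when a single group was requested;
    // with several groups it now throws IllegalStateException.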
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec.java
new file mode 100644
index 0000000000..83858e49c8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Specification of consumer group offsets to list using {@link 
Admin#listConsumerGroupOffsets(java.util.Map)}.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListConsumerGroupOffsetsSpec {
+
+    private Collection<TopicPartition> topicPartitions;
+
+    /**
+     * Set the topic partitions whose offsets are to be listed for a consumer 
group.
+     * {@code null} includes all topic partitions.
+     *
+     * @param topicPartitions List of topic partitions to include
+     * @return This ListConsumerGroupOffsetsSpec
+     */
+    public ListConsumerGroupOffsetsSpec 
topicPartitions(Collection<TopicPartition> topicPartitions) {
+        this.topicPartitions = topicPartitions;
+        return this;
+    }
+
+    /**
+     * Returns the topic partitions whose offsets are to be listed for a 
consumer group.
+     * {@code null} indicates that offsets of all partitions of the group are 
to be listed.
+     */
+    public Collection<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ListConsumerGroupOffsetsSpec)) {
+            return false;
+        }
+        ListConsumerGroupOffsetsSpec that = (ListConsumerGroupOffsetsSpec) o;
+        return Objects.equals(topicPartitions, that.topicPartitions);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topicPartitions);
+    }
+
+    @Override
+    public String toString() {
+        return "ListConsumerGroupOffsetsSpec(" +
+                "topicPartitions=" + topicPartitions +
+                ')';
+    }
+}
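
Since the spec is a plain value object, equal partition collections yield
equal specs; a quick sketch (topic name is a placeholder, and
java.util.Arrays is assumed to be imported):

    ListConsumerGroupOffsetsSpec allPartitions = new ListConsumerGroupOffsetsSpec();
    ListConsumerGroupOffsetsSpec selected = new ListConsumerGroupOffsetsSpec()
        .topicPartitions(Arrays.asList(new TopicPartition("A", 0), new TopicPartition("A", 1)));

    // equals/hashCode compare only the topicPartitions field.
    assert !selected.equals(allPartitions);
    assert selected.equals(new ListConsumerGroupOffsetsSpec()
        .topicPartitions(Arrays.asList(new TopicPartition("A", 0), new TopicPartition("A", 1))));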
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
index d00db4b18c..0e1b03d964 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import 
org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException;
+import 
org.apache.kafka.common.requests.OffsetFetchRequest.NoBatchedOffsetFetchRequestException;
 import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
@@ -253,7 +254,7 @@ public class AdminApiDriver<K, V> {
                 .collect(Collectors.toSet());
             retryLookup(keysToUnmap);
 
-        } else if (t instanceof NoBatchedFindCoordinatorsException) {
+        } else if (t instanceof NoBatchedFindCoordinatorsException || t 
instanceof NoBatchedOffsetFetchRequestException) {
             ((CoordinatorStrategy) handler.lookupStrategy()).disableBatch();
             Set<K> keysToUnmap = spec.keys.stream()
                 .filter(future.lookupKeys()::contains)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
index e6fc0d624a..02b68527c3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
@@ -120,6 +120,10 @@ public class CoordinatorStrategy implements AdminApiLookupStrategy<CoordinatorKe
         batch = false;
     }
 
+    public boolean batch() {
+        return batch;
+    }
+
     private CoordinatorKey requireSingletonAndType(Set<CoordinatorKey> keys) {
         if (keys.size() != 1) {
             throw new IllegalArgumentException("Unexpected size of key set: 
expected 1, but got " + keys.size());
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
index 08648821f7..21c7d8d488 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
@@ -17,14 +17,16 @@
 package org.apache.kafka.clients.admin.internals;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -36,39 +36,26 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
-public class ListConsumerGroupOffsetsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> 
{
+public class ListConsumerGroupOffsetsHandler implements 
AdminApiHandler<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> {
 
-    private final CoordinatorKey groupId;
-    private final List<TopicPartition> partitions;
     private final boolean requireStable;
+    private final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs;
     private final Logger log;
-    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+    private final CoordinatorStrategy lookupStrategy;
 
     public ListConsumerGroupOffsetsHandler(
-        String groupId,
-        List<TopicPartition> partitions,
-        LogContext logContext
-    ) {
-        this(groupId, partitions, false, logContext);
-    }
-
-    public ListConsumerGroupOffsetsHandler(
-        String groupId,
-        List<TopicPartition> partitions,
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
         boolean requireStable,
         LogContext logContext
     ) {
-        this.groupId = CoordinatorKey.byGroupId(groupId);
-        this.partitions = partitions;
-        this.requireStable = requireStable;
         this.log = logContext.logger(ListConsumerGroupOffsetsHandler.class);
         this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, 
logContext);
+        this.groupSpecs = groupSpecs;
+        this.requireStable = requireStable;
     }
 
-    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, 
Map<TopicPartition, OffsetAndMetadata>> newFuture(
-        String groupId
-    ) {
-        return 
AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, 
Map<TopicPartition, OffsetAndMetadata>> newFuture(Collection<String> groupIds) {
+        return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
     }
 
     @Override
@@ -82,16 +71,45 @@ public class ListConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<Coo
     }
 
     private void validateKeys(Set<CoordinatorKey> groupIds) {
-        if (!groupIds.equals(Collections.singleton(groupId))) {
+        Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet());
+        if (!keys.containsAll(groupIds)) {
             throw new IllegalArgumentException("Received unexpected group ids 
" + groupIds +
-                " (expected only " + Collections.singleton(groupId) + ")");
+                    " (expected one of " + keys + ")");
         }
     }
 
+    private static Set<CoordinatorKey> coordinatorKeys(Collection<String> 
groupIds) {
+        return groupIds.stream()
+           .map(CoordinatorKey::byGroupId)
+           .collect(Collectors.toSet());
+    }
+
+    public OffsetFetchRequest.Builder buildBatchedRequest(Set<CoordinatorKey> 
groupIds) {
+        // Create a map that only contains the consumer groups owned by the 
coordinator.
+        Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions 
= new HashMap<>(groupIds.size());
+        groupIds.forEach(g -> {
+            ListConsumerGroupOffsetsSpec spec = groupSpecs.get(g.idValue);
+            List<TopicPartition> partitions = spec.topicPartitions() != null ? 
new ArrayList<>(spec.topicPartitions()) : null;
+            coordinatorGroupIdToTopicPartitions.put(g.idValue, partitions);
+        });
+
+        return new 
OffsetFetchRequest.Builder(coordinatorGroupIdToTopicPartitions, requireStable, 
false);
+    }
+
     @Override
-    public OffsetFetchRequest.Builder buildBatchedRequest(int coordinatorId, 
Set<CoordinatorKey> groupIds) {
+    public Collection<RequestAndKeys<CoordinatorKey>> buildRequest(int 
brokerId, Set<CoordinatorKey> groupIds) {
         validateKeys(groupIds);
-        return new OffsetFetchRequest.Builder(groupId.idValue, requireStable, 
partitions, false);
+
+        // When the OffsetFetchRequest fails with 
NoBatchedOffsetFetchRequestException, we completely disable
+        // the batching end-to-end, including the FindCoordinatorRequest.
+        if (lookupStrategy.batch()) {
+            return Collections.singletonList(new 
RequestAndKeys<>(buildBatchedRequest(groupIds), groupIds));
+        } else {
+            return groupIds.stream().map(groupId -> {
+                Set<CoordinatorKey> keys = Collections.singleton(groupId);
+                return new RequestAndKeys<>(buildBatchedRequest(keys), keys);
+            }).collect(Collectors.toList());
+        }
     }
 
     @Override
@@ -104,44 +122,46 @@ public class ListConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<Coo
 
         final OffsetFetchResponse response = (OffsetFetchResponse) 
abstractResponse;
 
-        // the groupError will contain the group level error for v0-v8 
OffsetFetchResponse
-        Errors groupError = response.groupLevelError(groupId.idValue);
-        if (groupError != Errors.NONE) {
-            final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-            final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
-
-            handleGroupError(groupId, groupError, failed, groupsToUnmap);
-
-            return new ApiResult<>(Collections.emptyMap(), failed, new 
ArrayList<>(groupsToUnmap));
-        } else {
-            final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = 
new HashMap<>();
-
-            
response.partitionDataMap(groupId.idValue).forEach((topicPartition, 
partitionData) -> {
-                final Errors error = partitionData.error;
-                if (error == Errors.NONE) {
-                    final long offset = partitionData.offset;
-                    final String metadata = partitionData.metadata;
-                    final Optional<Integer> leaderEpoch = 
partitionData.leaderEpoch;
-                    // Negative offset indicates that the group has no 
committed offset for this partition
-                    if (offset < 0) {
-                        groupOffsetsListing.put(topicPartition, null);
+        Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed 
= new HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+        for (CoordinatorKey coordinatorKey : groupIds) {
+            String group = coordinatorKey.idValue;
+            if (response.groupHasError(group)) {
+                handleGroupError(CoordinatorKey.byGroupId(group), 
response.groupLevelError(group), failed, unmapped);
+            } else {
+                final Map<TopicPartition, OffsetAndMetadata> 
groupOffsetsListing = new HashMap<>();
+                Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responseData = response.partitionDataMap(group);
+                for (Map.Entry<TopicPartition, 
OffsetFetchResponse.PartitionData> partitionEntry : responseData.entrySet()) {
+                    final TopicPartition topicPartition = 
partitionEntry.getKey();
+                    OffsetFetchResponse.PartitionData partitionData = 
partitionEntry.getValue();
+                    final Errors error = partitionData.error;
+
+                    if (error == Errors.NONE) {
+                        final long offset = partitionData.offset;
+                        final String metadata = partitionData.metadata;
+                        final Optional<Integer> leaderEpoch = 
partitionData.leaderEpoch;
+                        // Negative offset indicates that the group has no 
committed offset for this partition
+                        if (offset < 0) {
+                            groupOffsetsListing.put(topicPartition, null);
+                        } else {
+                            groupOffsetsListing.put(topicPartition, new 
OffsetAndMetadata(offset, leaderEpoch, metadata));
+                        }
                     } else {
-                        groupOffsetsListing.put(topicPartition, new 
OffsetAndMetadata(offset, leaderEpoch, metadata));
+                        log.warn("Skipping return offset for {} due to error 
{}.", topicPartition, error);
                     }
-                } else {
-                    log.warn("Skipping return offset for {} due to error {}.", 
topicPartition, error);
                 }
-            });
-
-            return ApiResult.completed(groupId, groupOffsetsListing);
+                completed.put(CoordinatorKey.byGroupId(group), 
groupOffsetsListing);
+            }
         }
+        return new ApiResult<>(completed, failed, unmapped);
     }
 
     private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        Set<CoordinatorKey> groupsToUnmap
+        List<CoordinatorKey> groupsToUnmap
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 213182ec8c..4e25984668 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -173,8 +173,8 @@ public class OffsetFetchResponse extends AbstractResponse {
      * @param responseData Fetched offset information grouped by 
topic-partition and by group
      */
     public OffsetFetchResponse(int throttleTimeMs,
-                               Map<String, Errors> errors, Map<String,
-                               Map<TopicPartition, PartitionData>> 
responseData) {
+                               Map<String, Errors> errors,
+                               Map<String, Map<TopicPartition, PartitionData>> 
responseData) {
         super(ApiKeys.OFFSET_FETCH);
         List<OffsetFetchResponseGroup> groupList = new ArrayList<>();
         for (Entry<String, Map<TopicPartition, PartitionData>> entry : 
responseData.entrySet()) {
@@ -250,7 +250,11 @@ public class OffsetFetchResponse extends AbstractResponse {
     }
 
     public boolean groupHasError(String groupId) {
-        return groupLevelErrors.get(groupId) != Errors.NONE;
+        Errors error = groupLevelErrors.get(groupId);
+        if (error == null) {
+            return this.error != null && this.error != Errors.NONE;
+        }
+        return error != Errors.NONE;
     }
 
     public Errors error() {
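
The groupHasError() change above keeps single-group responses from brokers
that predate request batching working: such responses carry only a
top-level error, so the per-group lookup now falls back to it. A sketch of
the two response shapes (constructors as used elsewhere in this patch):

    // Pre-batching shape: one implicit group, top-level error only.
    OffsetFetchResponse single =
        new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap());
    single.groupHasError("groupA");   // true, via the top-level error fallback

    // Batched shape: group-level errors keyed by group id.
    OffsetFetchResponse batched = new OffsetFetchResponse(10,
        Collections.singletonMap("groupA", Errors.NONE),
        Collections.singletonMap("groupA", Collections.emptyMap()));
    batched.groupHasError("groupA");  // false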
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index 6f98a166b1..d8b9f427d6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -24,6 +24,7 @@ import java.util.stream.Collectors;
 import org.apache.kafka.clients.HostResolver;
 import 
org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
 import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -104,14 +105,17 @@ public class AdminClientTestUtils {
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> 
KafkaFuture.completedFuture(e.getValue()))));
     }
 
-    public static ListConsumerGroupOffsetsResult 
listConsumerGroupOffsetsResult(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        return new 
ListConsumerGroupOffsetsResult(KafkaFuture.completedFuture(offsets));
+    public static ListConsumerGroupOffsetsResult 
listConsumerGroupOffsetsResult(Map<String, Map<TopicPartition, 
OffsetAndMetadata>> offsets) {
+        Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>>> resultMap = offsets.entrySet().stream()
+            .collect(Collectors.toMap(e -> 
CoordinatorKey.byGroupId(e.getKey()),
+                                      e -> 
KafkaFutureImpl.completedFuture(e.getValue())));
+        return new ListConsumerGroupOffsetsResult(resultMap);
     }
 
-    public static ListConsumerGroupOffsetsResult 
listConsumerGroupOffsetsResult(KafkaException exception) {
+    public static ListConsumerGroupOffsetsResult 
listConsumerGroupOffsetsResult(String group, KafkaException exception) {
         final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> future = 
new KafkaFutureImpl<>();
         future.completeExceptionally(exception);
-        return new ListConsumerGroupOffsetsResult(future);
+        return new 
ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group),
 future));
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 61a2aaa00b..3d285a45f7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -108,6 +108,7 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
 import 
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.CredentialInfo;
 import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
 import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.FindCoordinatorResponseData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
@@ -192,6 +193,7 @@ import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetDeleteResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
 import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.requests.UnregisterBrokerResponse;
 import org.apache.kafka.common.requests.UpdateFeaturesRequest;
@@ -224,6 +226,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
@@ -266,6 +269,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class KafkaAdminClientTest {
     private static final Logger log = 
LoggerFactory.getLogger(KafkaAdminClientTest.class);
     private static final String GROUP_ID = "group-0";
+    private static final int THROTTLE = 10;
 
     @Test
     public void testDefaultApiTimeoutAndRequestTimeoutConflicts() {
@@ -501,6 +505,21 @@ public class KafkaAdminClientTest {
         return FindCoordinatorResponse.prepareOldResponse(error, node);
     }
 
+    private static FindCoordinatorResponse 
prepareBatchedFindCoordinatorResponse(Errors error, Node node, 
Collection<String> groups) {
+        FindCoordinatorResponseData data = new FindCoordinatorResponseData();
+        List<FindCoordinatorResponseData.Coordinator> coordinators = 
groups.stream()
+                .map(group -> new FindCoordinatorResponseData.Coordinator()
+                        .setErrorCode(error.code())
+                        .setErrorMessage(error.message())
+                        .setKey(group)
+                        .setHost(node.host())
+                        .setPort(node.port())
+                        .setNodeId(node.id()))
+                .collect(Collectors.toList());
+        data.setCoordinators(coordinators);
+        return new FindCoordinatorResponse(data);
+    }
+
     private static MetadataResponse prepareMetadataResponse(Cluster cluster, 
Errors error) {
         List<MetadataResponseTopic> metadata = new ArrayList<>();
         for (String topic : cluster.topics()) {
@@ -3067,9 +3086,11 @@ public class KafkaAdminClientTest {
             
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
 
             final TopicPartition tp1 = new TopicPartition("A", 0);
-            final ListConsumerGroupOffsetsOptions options = new 
ListConsumerGroupOffsetsOptions();
-            
options.topicPartitions(Collections.singletonList(tp1)).requireStable(true);
-            final ListConsumerGroupOffsetsResult result = 
env.adminClient().listConsumerGroupOffsets(GROUP_ID, options);
+            final ListConsumerGroupOffsetsOptions options = new 
ListConsumerGroupOffsetsOptions()
+                    .requireStable(true);
+            final ListConsumerGroupOffsetsSpec groupSpec = new 
ListConsumerGroupOffsetsSpec()
+                    .topicPartitions(Collections.singletonList(tp1));
+            
env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, 
groupSpec), options);
 
             final MockClient mockClient = env.kafkaClient();
             TestUtils.waitForCondition(() -> {
@@ -3077,11 +3098,11 @@ public class KafkaAdminClientTest {
                 if (clientRequest != null) {
                     OffsetFetchRequestData data = 
((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data;
                     return data.requireStable() &&
-                        data.topics().get(0).name().equals("A") &&
-                        
data.topics().get(0).partitionIndexes().equals(Collections.singletonList(0));
+                        
data.groups().get(0).topics().get(0).name().equals("A") &&
+                        
data.groups().get(0).topics().get(0).partitionIndexes().equals(Collections.singletonList(0));
                 }
                 return false;
-            }, "Failed awaiting ListConsumerGroupOffsets request");
+            }, "Failed awaiting ListConsumerGroupOfsets request");
         }
     }
 
@@ -3095,12 +3116,11 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
-            env.kafkaClient().prepareResponse(new 
OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
+            
env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR, 
Collections.emptyMap()));
             
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
 
             final ListConsumerGroupOffsetsResult result = 
env.adminClient().listConsumerGroupOffsets(GROUP_ID);
 
-
             
TestUtils.assertFutureError(result.partitionsToOffsetAndMetadata(), 
TimeoutException.class);
         }
     }
@@ -3124,16 +3144,16 @@ public class KafkaAdminClientTest {
             mockClient.prepareResponse(body -> {
                 firstAttemptTime.set(time.milliseconds());
                 return true;
-            }, new OffsetFetchResponse(Errors.NOT_COORDINATOR, 
Collections.emptyMap()));
+            }, offsetFetchResponse(Errors.NOT_COORDINATOR, 
Collections.emptyMap()));
 
             
mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
 
             mockClient.prepareResponse(body -> {
                 secondAttemptTime.set(time.milliseconds());
                 return true;
-            }, new OffsetFetchResponse(Errors.NONE, Collections.emptyMap()));
+            }, offsetFetchResponse(Errors.NONE, Collections.emptyMap()));
 
-            final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
env.adminClient().listConsumerGroupOffsets("group-0").partitionsToOffsetAndMetadata();
+            final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
env.adminClient().listConsumerGroupOffsets(GROUP_ID).partitionsToOffsetAndMetadata();
 
             TestUtils.waitForCondition(() -> mockClient.numAwaitingResponses() 
== 1, "Failed awaiting ListConsumerGroupOffsets first request failure");
             TestUtils.waitForCondition(() -> ((KafkaAdminClient) 
env.adminClient()).numPendingCalls() == 1, "Failed to add retry 
ListConsumerGroupOffsets call on first failure");
@@ -3157,7 +3177,8 @@ public class KafkaAdminClientTest {
                 prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
 
             env.kafkaClient().prepareResponse(
-                new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, 
Collections.emptyMap()));
+                offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, 
Collections.emptyMap()));
+
             /*
              * We need to return two responses here, one for NOT_COORDINATOR 
call when calling list consumer offsets
              * api using coordinator that has moved. This will retry whole 
operation. So we need to again respond with a
@@ -3166,19 +3187,19 @@ public class KafkaAdminClientTest {
              * And the same reason for the following COORDINATOR_NOT_AVAILABLE 
error response
              */
             env.kafkaClient().prepareResponse(
-                new OffsetFetchResponse(Errors.NOT_COORDINATOR, 
Collections.emptyMap()));
+                offsetFetchResponse(Errors.NOT_COORDINATOR, 
Collections.emptyMap()));
 
             env.kafkaClient().prepareResponse(
                 prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
 
             env.kafkaClient().prepareResponse(
-                new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Collections.emptyMap()));
+                offsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Collections.emptyMap()));
 
             env.kafkaClient().prepareResponse(
                 prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
 
             env.kafkaClient().prepareResponse(
-                new OffsetFetchResponse(Errors.NONE, Collections.emptyMap()));
+                offsetFetchResponse(Errors.NONE, Collections.emptyMap()));
 
             final ListConsumerGroupOffsetsResult errorResult1 = 
env.adminClient().listConsumerGroupOffsets(GROUP_ID);
 
@@ -3199,8 +3220,7 @@ public class KafkaAdminClientTest {
                 env.kafkaClient().prepareResponse(
                     prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
 
-                env.kafkaClient().prepareResponse(
-                    new OffsetFetchResponse(error, Collections.emptyMap()));
+                env.kafkaClient().prepareResponse(offsetFetchResponse(error, 
Collections.emptyMap()));
 
                 ListConsumerGroupOffsetsResult errorResult = 
env.adminClient().listConsumerGroupOffsets(GROUP_ID);
 
@@ -3220,7 +3240,7 @@ public class KafkaAdminClientTest {
             
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
 
             // Retriable errors should be retried
-            env.kafkaClient().prepareResponse(new 
OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, 
Collections.emptyMap()));
+            
env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,
 Collections.emptyMap()));
 
             /*
              * We need to return two responses here, one for NOT_COORDINATOR 
error when calling list consumer group offsets
@@ -3229,10 +3249,10 @@ public class KafkaAdminClientTest {
              *
              * And the same reason for the following COORDINATOR_NOT_AVAILABLE 
error response
              */
-            env.kafkaClient().prepareResponse(new 
OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
+            
env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR, 
Collections.emptyMap()));
             
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
 
-            env.kafkaClient().prepareResponse(new 
OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
+            
env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE,
 Collections.emptyMap()));
             
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
 
             TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 
0);
@@ -3249,7 +3269,7 @@ public class KafkaAdminClientTest {
                     Optional.empty(), "", Errors.NONE));
             responseData.put(myTopicPartition3, new 
OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE));
-            env.kafkaClient().prepareResponse(new 
OffsetFetchResponse(Errors.NONE, responseData));
+            env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.NONE, 
responseData));
 
             final ListConsumerGroupOffsetsResult result = 
env.adminClient().listConsumerGroupOffsets(GROUP_ID);
             final Map<TopicPartition, OffsetAndMetadata> 
partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata().get();
@@ -3263,6 +3283,144 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testBatchedListConsumerGroupOffsets() throws Exception {
+        Cluster cluster = mockCluster(1, 0);
+        Time time = new MockTime();
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = 
batchedListConsumerGroupOffsetsSpec();
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster, AdminClientConfig.RETRIES_CONFIG, "0")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            
env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE,
 env.cluster().controller(), groupSpecs.keySet()));
+
+            ListConsumerGroupOffsetsResult result = 
env.adminClient().listConsumerGroupOffsets(groupSpecs, new 
ListConsumerGroupOffsetsOptions());
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, true, 
Errors.NONE);
+
+            verifyListOffsetsForMultipleGroups(groupSpecs, result);
+        }
+    }
+
+    @Test
+    public void 
testBatchedListConsumerGroupOffsetsWithNoFindCoordinatorBatching() throws 
Exception {
+        Cluster cluster = mockCluster(1, 0);
+        Time time = new MockTime();
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = 
batchedListConsumerGroupOffsetsSpec();
+
+        ApiVersion findCoordinatorV3 = new ApiVersion()
+                .setApiKey(ApiKeys.FIND_COORDINATOR.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion((short) 3);
+        ApiVersion offsetFetchV7 = new ApiVersion()
+                .setApiKey(ApiKeys.OFFSET_FETCH.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion((short) 7);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster, AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0")) {
+            
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3,
 offsetFetchV7)));
+            
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,
 Node.noNode()));
+            
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE,
 env.cluster().controller()));
+            
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE,
 env.cluster().controller()));
+
+            ListConsumerGroupOffsetsResult result = 
env.adminClient().listConsumerGroupOffsets(groupSpecs);
+
+            // Fail the first request in order to ensure that the group is not 
batched when retried.
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, 
Errors.COORDINATOR_LOAD_IN_PROGRESS);
+
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, 
Errors.NONE);
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, 
Errors.NONE);
+
+            verifyListOffsetsForMultipleGroups(groupSpecs, result);
+        }
+    }
+
+    @Test
+    public void testBatchedListConsumerGroupOffsetsWithNoOffsetFetchBatching() 
throws Exception {
+        Cluster cluster = mockCluster(1, 0);
+        Time time = new MockTime();
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = 
batchedListConsumerGroupOffsetsSpec();
+
+        ApiVersion offsetFetchV7 = new ApiVersion()
+                .setApiKey(ApiKeys.OFFSET_FETCH.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion((short) 7);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster, AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0")) {
+            
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singleton(offsetFetchV7)));
+            
env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE,
 env.cluster().controller(), groupSpecs.keySet()));
+            // Prepare a response to force client to attempt batched request 
creation that throws
+            // NoBatchedOffsetFetchRequestException. This triggers creation of 
non-batched requests.
+            
env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE,
 Collections.emptyMap()));
+
+            ListConsumerGroupOffsetsResult result = 
env.adminClient().listConsumerGroupOffsets(groupSpecs);
+
+            // The request handler attempts both FindCoordinator and 
OffsetFetch requests. This seems
+            // ok since we expect this scenario only during upgrades 
from versions < 3.0.0 where
+            // some upgraded brokers could handle batched FindCoordinator 
while non-upgraded coordinators
+            // rejected batched OffsetFetch requests.
+            sendFindCoordinatorResponse(env.kafkaClient(), 
env.cluster().controller());
+            sendFindCoordinatorResponse(env.kafkaClient(), 
env.cluster().controller());
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, 
Errors.NONE);
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false, 
Errors.NONE);
+
+            verifyListOffsetsForMultipleGroups(groupSpecs, result);
+        }
+    }
+
+    private Map<String, ListConsumerGroupOffsetsSpec> 
batchedListConsumerGroupOffsetsSpec() {
+        Set<TopicPartition> groupAPartitions = Collections.singleton(new 
TopicPartition("A", 1));
+        Set<TopicPartition> groupBPartitions =  Collections.singleton(new 
TopicPartition("B", 2));
+
+        ListConsumerGroupOffsetsSpec groupASpec = new 
ListConsumerGroupOffsetsSpec().topicPartitions(groupAPartitions);
+        ListConsumerGroupOffsetsSpec groupBSpec = new 
ListConsumerGroupOffsetsSpec().topicPartitions(groupBPartitions);
+        return Utils.mkMap(Utils.mkEntry("groupA", groupASpec), 
Utils.mkEntry("groupB", groupBSpec));
+    }
+
+    private void waitForRequest(MockClient mockClient, ApiKeys apiKeys) throws 
Exception {
+        TestUtils.waitForCondition(() -> {
+            ClientRequest clientRequest = mockClient.requests().peek();
+            return clientRequest != null && clientRequest.apiKey() == apiKeys;
+        }, "Failed awaiting " + apiKeys + " request");
+    }
+
+    private void sendFindCoordinatorResponse(MockClient mockClient, Node 
coordinator) throws Exception {
+        waitForRequest(mockClient, ApiKeys.FIND_COORDINATOR);
+
+        ClientRequest clientRequest = mockClient.requests().peek();
+        FindCoordinatorRequestData data = ((FindCoordinatorRequest.Builder) 
clientRequest.requestBuilder()).data();
+        mockClient.respond(prepareFindCoordinatorResponse(Errors.NONE, 
data.key(), coordinator));
+    }
+
+    private void sendOffsetFetchResponse(MockClient mockClient, Map<String, 
ListConsumerGroupOffsetsSpec> groupSpecs, boolean batched, Errors error) throws 
Exception {
+        waitForRequest(mockClient, ApiKeys.OFFSET_FETCH);
+
+        ClientRequest clientRequest = mockClient.requests().peek();
+        OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) 
clientRequest.requestBuilder()).data;
+        Map<String, Map<TopicPartition, PartitionData>> results = new 
HashMap<>();
+        Map<String, Errors> errors = new HashMap<>();
+        data.groups().forEach(group -> {
+            Map<TopicPartition, PartitionData> partitionResults = new 
HashMap<>();
+            for (TopicPartition tp : 
groupSpecs.get(group.groupId()).topicPartitions()) {
+                partitionResults.put(tp, new PartitionData(10, 
Optional.empty(), "", Errors.NONE));
+            }
+            results.put(group.groupId(), partitionResults);
+            errors.put(group.groupId(), error);
+        });
+        if (!batched) {
+            assertEquals(1, data.groups().size());
+            mockClient.respond(new OffsetFetchResponse(THROTTLE, error, 
results.values().iterator().next()));
+        } else
+            mockClient.respond(new OffsetFetchResponse(THROTTLE, errors, 
results));
+    }
+
+    private void verifyListOffsetsForMultipleGroups(Map<String, 
ListConsumerGroupOffsetsSpec> groupSpecs,
+                                                    
ListConsumerGroupOffsetsResult result) throws Exception {
+        assertEquals(groupSpecs.size(), result.all().get(10, 
TimeUnit.SECONDS).size());
+        for (Map.Entry<String, ListConsumerGroupOffsetsSpec> entry : 
groupSpecs.entrySet()) {
+            assertEquals(entry.getValue().topicPartitions(),
+                    
result.partitionsToOffsetAndMetadata(entry.getKey()).get().keySet());
+        }
+    }
+
     @Test
     public void testDeleteConsumerGroupsNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
@@ -6544,6 +6702,12 @@ public class KafkaAdminClientTest {
                     .setLogDir(logDir))));
     }
 
+    private OffsetFetchResponse offsetFetchResponse(Errors error, 
Map<TopicPartition, PartitionData> responseData) {
+        return new OffsetFetchResponse(THROTTLE,
+                                       Collections.singletonMap(GROUP_ID, 
error),
+                                       Collections.singletonMap(GROUP_ID, 
responseData));
+    }
+
     private static MemberDescription 
convertToMemberDescriptions(DescribedGroupMember member,
                                                                  
MemberAssignment assignment) {
         return new MemberDescription(member.memberId(),
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index ef858c5003..8c31c7cf69 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.admin;
 
 import 
org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.KafkaException;
@@ -583,24 +584,29 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
-    synchronized public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
-        // ignoring the groupId and assume one test would only work on one group only
+    synchronized public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options) {
+        // ignoring the group specs and assuming each test works with only one group
+        if (groupSpecs.size() != 1)
+            throw new UnsupportedOperationException("Not implemented yet");
+
+        String group = groupSpecs.keySet().iterator().next();
+        Collection<TopicPartition> topicPartitions = groupSpecs.get(group).topicPartitions();
         final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> future = new KafkaFutureImpl<>();
 
         if (listConsumerGroupOffsetsException != null) {
             future.completeExceptionally(listConsumerGroupOffsetsException);
         } else {
-            if (options.topicPartitions().isEmpty()) {
+            if (topicPartitions.isEmpty()) {
                 future.complete(committedOffsets.entrySet().stream()
                         .collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue()))));
             } else {
                 future.complete(committedOffsets.entrySet().stream()
-                        .filter(entry -> options.topicPartitions().contains(entry.getKey()))
+                        .filter(entry -> topicPartitions.contains(entry.getKey()))
                         .collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue()))));
             }
         }
 
-        return new ListConsumerGroupOffsetsResult(future);
+        return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future));
     }
 
     @Override
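
As a hedged sketch of the result shape built by hand above: the constructor appears to be package-private (tests outside org.apache.kafka.clients.admin go through AdminClientTestUtils instead), and the group id, partition, and offset below are placeholders:

    KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> future = new KafkaFutureImpl<>();
    future.complete(Collections.singletonMap(new TopicPartition("t0", 0), new OffsetAndMetadata(42L)));
    // The result keys each group's future by its CoordinatorKey.
    ListConsumerGroupOffsetsResult result = new ListConsumerGroupOffsetsResult(
            Collections.singletonMap(CoordinatorKey.byGroupId("group0"), future));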
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
index 27597ce035..95fabb3fc2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
@@ -24,52 +24,140 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.RequestAndKeys;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
 
 public class ListConsumerGroupOffsetsHandlerTest {
 
     private final LogContext logContext = new LogContext();
-    private final String groupId = "group-id";
+    private final int throttleMs = 10;
+    private final String groupZero = "group0";
+    private final String groupOne = "group1";
+    private final String groupTwo = "group2";
+    private final List<String> groups = Arrays.asList(groupZero, groupOne, groupTwo);
     private final TopicPartition t0p0 = new TopicPartition("t0", 0);
     private final TopicPartition t0p1 = new TopicPartition("t0", 1);
     private final TopicPartition t1p0 = new TopicPartition("t1", 0);
     private final TopicPartition t1p1 = new TopicPartition("t1", 1);
-    private final List<TopicPartition> tps = Arrays.asList(t0p0, t0p1, t1p0, 
t1p1);
+    private final TopicPartition t2p0 = new TopicPartition("t2", 0);
+    private final TopicPartition t2p1 = new TopicPartition("t2", 1);
+    private final TopicPartition t2p2 = new TopicPartition("t2", 2);
+    private final Map<String, ListConsumerGroupOffsetsSpec> singleRequestMap = Collections.singletonMap(groupZero,
+            new ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t0p1, t1p0, t1p1)));
+    private final Map<String, ListConsumerGroupOffsetsSpec> batchedRequestMap =
+            new HashMap<String, ListConsumerGroupOffsetsSpec>() {{
+                put(groupZero, new ListConsumerGroupOffsetsSpec().topicPartitions(singletonList(t0p0)));
+                put(groupOne, new ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, t1p1)));
+                put(groupTwo, new ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, t1p1, t2p0, t2p1, t2p2)));
+            }};
 
     @Test
     public void testBuildRequest() {
-        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
-        OffsetFetchRequest request = handler.buildBatchedRequest(1, singleton(CoordinatorKey.byGroupId(groupId))).build();
-        assertEquals(groupId, request.data().groups().get(0).groupId());
+        ListConsumerGroupOffsetsHandler handler =
+            new ListConsumerGroupOffsetsHandler(singleRequestMap, false, logContext);
+        OffsetFetchRequest request = handler.buildBatchedRequest(coordinatorKeys(groupZero)).build();
+        assertEquals(groupZero, request.data().groups().get(0).groupId());
         assertEquals(2, request.data().groups().get(0).topics().size());
         assertEquals(2, request.data().groups().get(0).topics().get(0).partitionIndexes().size());
         assertEquals(2, request.data().groups().get(0).topics().get(1).partitionIndexes().size());
     }
 
+    @Test
+    public void testBuildRequestWithMultipleGroups() {
+        Map<String, ListConsumerGroupOffsetsSpec> requestMap = new HashMap<>(this.batchedRequestMap);
+        String groupThree = "group3";
+        requestMap.put(groupThree, new ListConsumerGroupOffsetsSpec()
+                .topicPartitions(Arrays.asList(new TopicPartition("t3", 0), new TopicPartition("t3", 1))));
+
+        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(requestMap, false, logContext);
+        OffsetFetchRequest request1 = handler.buildBatchedRequest(coordinatorKeys(groupZero, groupOne, groupTwo)).build();
+        assertEquals(Utils.mkSet(groupZero, groupOne, groupTwo), requestGroups(request1));
+
+        OffsetFetchRequest request2 = handler.buildBatchedRequest(coordinatorKeys(groupThree)).build();
+        assertEquals(Utils.mkSet(groupThree), requestGroups(request2));
+
+        Map<String, ListConsumerGroupOffsetsSpec> builtRequests = new HashMap<>();
+        request1.groupIdsToPartitions().forEach((group, partitions) ->
+                builtRequests.put(group, new ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+        request2.groupIdsToPartitions().forEach((group, partitions) ->
+                builtRequests.put(group, new ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+
+        assertEquals(requestMap, builtRequests);
+        Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics = request1.groupIdsToTopics();
+
+        assertEquals(3, groupIdsToTopics.size());
+        assertEquals(1, groupIdsToTopics.get(groupZero).size());
+        assertEquals(2, groupIdsToTopics.get(groupOne).size());
+        assertEquals(3, groupIdsToTopics.get(groupTwo).size());
+
+        assertEquals(1, groupIdsToTopics.get(groupZero).get(0).partitionIndexes().size());
+        assertEquals(1, groupIdsToTopics.get(groupOne).get(0).partitionIndexes().size());
+        assertEquals(2, groupIdsToTopics.get(groupOne).get(1).partitionIndexes().size());
+        assertEquals(1, groupIdsToTopics.get(groupTwo).get(0).partitionIndexes().size());
+        assertEquals(2, groupIdsToTopics.get(groupTwo).get(1).partitionIndexes().size());
+        assertEquals(3, groupIdsToTopics.get(groupTwo).get(2).partitionIndexes().size());
+
+        groupIdsToTopics = request2.groupIdsToTopics();
+        assertEquals(1, groupIdsToTopics.size());
+        assertEquals(1, groupIdsToTopics.get(groupThree).size());
+        assertEquals(2, groupIdsToTopics.get(groupThree).get(0).partitionIndexes().size());
+    }
+
+    @Test
+    public void testBuildRequestBatchGroups() {
+        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(batchedRequestMap, false, logContext);
+        Collection<RequestAndKeys<CoordinatorKey>> requests = handler.buildRequest(1, coordinatorKeys(groupZero, groupOne, groupTwo));
+        assertEquals(1, requests.size());
+        assertEquals(Utils.mkSet(groupZero, groupOne, groupTwo), requestGroups((OffsetFetchRequest) requests.iterator().next().request.build()));
+    }
+
+    @Test
+    public void testBuildRequestDoesNotBatchGroup() {
+        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(batchedRequestMap, false, logContext);
+        // Disable batching.
+        ((CoordinatorStrategy) handler.lookupStrategy()).disableBatch();
+        Collection<RequestAndKeys<CoordinatorKey>> requests = handler.buildRequest(1, coordinatorKeys(groupZero, groupOne, groupTwo));
+        assertEquals(3, requests.size());
+        assertEquals(
+            Utils.mkSet(Utils.mkSet(groupZero), Utils.mkSet(groupOne), Utils.mkSet(groupTwo)),
+            requests.stream().map(requestAndKey -> requestGroups((OffsetFetchRequest) requestAndKey.request.build())).collect(Collectors.toSet())
+        );
+    }
+
     @Test
     public void testSuccessfulHandleResponse() {
         Map<TopicPartition, OffsetAndMetadata> expected = new HashMap<>();
         assertCompleted(handleWithError(Errors.NONE), expected);
     }
 
-
     @Test
     public void testSuccessfulHandleResponseWithOnePartitionError() {
         Map<TopicPartition, OffsetAndMetadata> expectedResult = Collections.singletonMap(t0p0, new OffsetAndMetadata(10L));
@@ -80,17 +168,62 @@ public class ListConsumerGroupOffsetsHandlerTest {
         assertCompleted(handleWithPartitionError(Errors.UNSTABLE_OFFSET_COMMIT), expectedResult);
     }
 
+    @Test
+    public void testSuccessfulHandleResponseWithOnePartitionErrorWithMultipleGroups() {
+        Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMapZero =
+            Collections.singletonMap(t0p0, new OffsetAndMetadata(10L));
+        Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMapOne =
+            Collections.singletonMap(t1p1, new OffsetAndMetadata(10L));
+        Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMapTwo =
+            Collections.singletonMap(t2p2, new OffsetAndMetadata(10L));
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult =
+            new HashMap<String, Map<TopicPartition, OffsetAndMetadata>>() {{
+                put(groupZero, offsetAndMetadataMapZero);
+                put(groupOne, offsetAndMetadataMapOne);
+                put(groupTwo, offsetAndMetadataMapTwo);
+            }};
+
+        assertCompletedForMultipleGroups(
+            handleWithPartitionErrorMultipleGroups(Errors.UNKNOWN_TOPIC_OR_PARTITION), expectedResult);
+        assertCompletedForMultipleGroups(
+            handleWithPartitionErrorMultipleGroups(Errors.TOPIC_AUTHORIZATION_FAILED), expectedResult);
+        assertCompletedForMultipleGroups(
+            handleWithPartitionErrorMultipleGroups(Errors.UNSTABLE_OFFSET_COMMIT), expectedResult);
+    }
+
+    @Test
+    public void testSuccessfulHandleResponseWithMultipleGroups() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> expected = new HashMap<>();
+        Map<String, Errors> errorMap = errorMap(groups, Errors.NONE);
+        assertCompletedForMultipleGroups(handleWithErrorWithMultipleGroups(errorMap, batchedRequestMap), expected);
+    }
+
     @Test
     public void testUnmappedHandleResponse() {
         assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
         assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
     }
 
+    @Test
+    public void testUnmappedHandleResponseWithMultipleGroups() {
+        Map<String, Errors> errorMap = new HashMap<>();
+        errorMap.put(groupZero, Errors.NOT_COORDINATOR);
+        errorMap.put(groupOne, Errors.COORDINATOR_NOT_AVAILABLE);
+        errorMap.put(groupTwo, Errors.NOT_COORDINATOR);
+        assertUnmappedWithMultipleGroups(handleWithErrorWithMultipleGroups(errorMap, batchedRequestMap));
+    }
+
     @Test
     public void testRetriableHandleResponse() {
         assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
     }
 
+    @Test
+    public void testRetriableHandleResponseWithMultipleGroups() {
+        Map<String, Errors> errorMap = errorMap(groups, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+        assertRetriable(handleWithErrorWithMultipleGroups(errorMap, batchedRequestMap));
+    }
+
     @Test
     public void testFailedHandleResponse() {
         assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
@@ -98,10 +231,50 @@ public class ListConsumerGroupOffsetsHandlerTest {
         assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID));
     }
 
+    @Test
+    public void testFailedHandleResponseWithMultipleGroups() {
+        Map<String, Errors> errorMap = new HashMap<>();
+        errorMap.put(groupZero, Errors.GROUP_AUTHORIZATION_FAILED);
+        errorMap.put(groupOne, Errors.GROUP_ID_NOT_FOUND);
+        errorMap.put(groupTwo, Errors.INVALID_GROUP_ID);
+        Map<String, Class<? extends Throwable>> groupToExceptionMap = new HashMap<>();
+        groupToExceptionMap.put(groupZero, GroupAuthorizationException.class);
+        groupToExceptionMap.put(groupOne, GroupIdNotFoundException.class);
+        groupToExceptionMap.put(groupTwo, InvalidGroupIdException.class);
+        assertFailedForMultipleGroups(groupToExceptionMap,
+            handleWithErrorWithMultipleGroups(errorMap, batchedRequestMap));
+    }
+
     private OffsetFetchResponse buildResponse(Errors error) {
-        Map<TopicPartition, PartitionData> responseData = new HashMap<>();
-        OffsetFetchResponse response = new OffsetFetchResponse(error, responseData);
-        return response;
+        return new OffsetFetchResponse(
+            throttleMs,
+            Collections.singletonMap(groupZero, error),
+            Collections.singletonMap(groupZero, new HashMap<>()));
+    }
+
+    private OffsetFetchResponse buildResponseWithMultipleGroups(
+        Map<String, Errors> errorMap,
+        Map<String, Map<TopicPartition, PartitionData>> responseData
+    ) {
+        return new OffsetFetchResponse(throttleMs, errorMap, responseData);
+    }
+
+    private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithErrorWithMultipleGroups(
+        Map<String, Errors> errorMap,
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs
+    ) {
+        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupSpecs, false, logContext);
+        Map<String, Map<TopicPartition, PartitionData>> responseData = new HashMap<>();
+        for (String group : errorMap.keySet()) {
+            responseData.put(group, new HashMap<>());
+        }
+        OffsetFetchResponse response = buildResponseWithMultipleGroups(errorMap, responseData);
+        return handler.handleResponse(new Node(1, "host", 1234),
+                errorMap.keySet()
+                        .stream()
+                        .map(CoordinatorKey::byGroupId)
+                        .collect(Collectors.toSet()),
+                response);
     }
 
     private OffsetFetchResponse buildResponseWithPartitionError(Errors error) {
@@ -110,24 +283,68 @@ public class ListConsumerGroupOffsetsHandlerTest {
         responseData.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE));
         responseData.put(t0p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
 
-        OffsetFetchResponse response = new OffsetFetchResponse(Errors.NONE, responseData);
-        return response;
+        return new OffsetFetchResponse(Errors.NONE, responseData);
+    }
+
+    private OffsetFetchResponse buildResponseWithPartitionErrorWithMultipleGroups(Errors error) {
+        Map<TopicPartition, PartitionData> responseDataZero = new HashMap<>();
+        responseDataZero.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE));
+
+        Map<TopicPartition, PartitionData> responseDataOne = new HashMap<>();
+        responseDataOne.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+        responseDataOne.put(t1p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+        responseDataOne.put(t1p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE));
+
+        Map<TopicPartition, PartitionData> responseDataTwo = new HashMap<>();
+        responseDataTwo.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+        responseDataTwo.put(t1p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+        responseDataTwo.put(t1p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+        responseDataTwo.put(t2p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+        responseDataTwo.put(t2p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error));
+        responseDataTwo.put(t2p2, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE));
+
+        Map<String, Map<TopicPartition, PartitionData>> responseData =
+            new HashMap<String, Map<TopicPartition, PartitionData>>() {{
+                put(groupZero, responseDataZero);
+                put(groupOne, responseDataOne);
+                put(groupTwo, responseDataTwo);
+            }};
+
+        Map<String, Errors> errorMap = errorMap(groups, Errors.NONE);
+        return new OffsetFetchResponse(0, errorMap, responseData);
     }
 
     private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithPartitionError(
         Errors error
     ) {
-        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
+        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(singleRequestMap,
+            false, logContext);
         OffsetFetchResponse response = buildResponseWithPartitionError(error);
-        return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
+        return handler.handleResponse(new Node(1, "host", 1234),
+            singleton(CoordinatorKey.byGroupId(groupZero)), response);
+    }
+
+    private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithPartitionErrorMultipleGroups(
+        Errors error
+    ) {
+        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(
+                batchedRequestMap, false, logContext);
+        OffsetFetchResponse response = buildResponseWithPartitionErrorWithMultipleGroups(error);
+        return handler.handleResponse(
+            new Node(1, "host", 1234),
+            coordinatorKeys(groupZero, groupOne, groupTwo),
+            response);
     }
 
     private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleWithError(
         Errors error
     ) {
-        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
+        ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(
+            singleRequestMap, false, logContext);
         OffsetFetchResponse response = buildResponse(error);
-        return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
+        return handler.handleResponse(new Node(1, "host", 1234),
+            singleton(CoordinatorKey.byGroupId(groupZero)),
+            response);
     }
 
     private void assertUnmapped(
@@ -135,11 +352,19 @@ public class ListConsumerGroupOffsetsHandlerTest {
     ) {
         assertEquals(emptySet(), result.completedKeys.keySet());
         assertEquals(emptySet(), result.failedKeys.keySet());
-        assertEquals(singletonList(CoordinatorKey.byGroupId(groupId)), result.unmappedKeys);
+        assertEquals(singletonList(CoordinatorKey.byGroupId(groupZero)), result.unmappedKeys);
+    }
+
+    private void assertUnmappedWithMultipleGroups(
+            AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result
+    ) {
+        assertEquals(emptySet(), result.completedKeys.keySet());
+        assertEquals(emptySet(), result.failedKeys.keySet());
+        assertEquals(coordinatorKeys(groupZero, groupOne, groupTwo), new HashSet<>(result.unmappedKeys));
     }
 
     private void assertRetriable(
-        AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result
+            AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result
     ) {
         assertEquals(emptySet(), result.completedKeys.keySet());
         assertEquals(emptySet(), result.failedKeys.keySet());
@@ -150,21 +375,64 @@ public class ListConsumerGroupOffsetsHandlerTest {
         AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result,
         Map<TopicPartition, OffsetAndMetadata> expected
     ) {
-        CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
+        CoordinatorKey key = CoordinatorKey.byGroupId(groupZero);
         assertEquals(emptySet(), result.failedKeys.keySet());
         assertEquals(emptyList(), result.unmappedKeys);
         assertEquals(singleton(key), result.completedKeys.keySet());
-        assertEquals(expected, result.completedKeys.get(CoordinatorKey.byGroupId(groupId)));
+        assertEquals(expected, result.completedKeys.get(key));
+    }
+
+    private void assertCompletedForMultipleGroups(
+        AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, 
OffsetAndMetadata>> result,
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> expected
+    ) {
+        assertEquals(emptySet(), result.failedKeys.keySet());
+        assertEquals(emptyList(), result.unmappedKeys);
+        for (String g : expected.keySet()) {
+            CoordinatorKey key = CoordinatorKey.byGroupId(g);
+            assertTrue(result.completedKeys.containsKey(key));
+            assertEquals(expected.get(g), result.completedKeys.get(key));
+        }
     }
 
     private void assertFailed(
         Class<? extends Throwable> expectedExceptionType,
         AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result
     ) {
-        CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
+        CoordinatorKey key = CoordinatorKey.byGroupId(groupZero);
         assertEquals(emptySet(), result.completedKeys.keySet());
         assertEquals(emptyList(), result.unmappedKeys);
         assertEquals(singleton(key), result.failedKeys.keySet());
         assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key)));
     }
+
+    private void assertFailedForMultipleGroups(
+        Map<String, Class<? extends Throwable>> groupToExceptionMap,
+        AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> result
+    ) {
+        assertEquals(emptySet(), result.completedKeys.keySet());
+        assertEquals(emptyList(), result.unmappedKeys);
+        for (String g : groupToExceptionMap.keySet()) {
+            CoordinatorKey key = CoordinatorKey.byGroupId(g);
+            assertTrue(result.failedKeys.containsKey(key));
+            assertTrue(groupToExceptionMap.get(g).isInstance(result.failedKeys.get(key)));
+        }
+    }
+
+    private Set<CoordinatorKey> coordinatorKeys(String... groups) {
+        return Stream.of(groups)
+                .map(CoordinatorKey::byGroupId)
+                .collect(Collectors.toSet());
+    }
+
+    private Set<String> requestGroups(OffsetFetchRequest request) {
+        return request.data().groups()
+                .stream()
+                .map(OffsetFetchRequestGroup::groupId)
+                .collect(Collectors.toSet());
+    }
+
+    private Map<String, Errors> errorMap(Collection<String> groups, Errors error) {
+        return groups.stream().collect(Collectors.toMap(Function.identity(), unused -> error));
+    }
 }
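
For orientation, the batched OffsetFetchResponse these tests build pairs a per-group error map with per-group partition data. A minimal sketch with placeholder names, using the same constructor as the tests above:

    Map<String, Errors> groupErrors = Collections.singletonMap("group0", Errors.NONE);
    Map<String, Map<TopicPartition, OffsetFetchResponse.PartitionData>> groupData =
            Collections.singletonMap("group0", Collections.singletonMap(
                    new TopicPartition("t0", 0),
                    new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE)));
    OffsetFetchResponse response = new OffsetFetchResponse(0, groupErrors, groupData);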
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index da3acf4983..e7f25345c6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -172,6 +172,7 @@ public class KafkaConsumerTest {
     // Set auto commit interval lower than heartbeat so we don't need to deal with
     // a concurrent heartbeat request
     private final int autoCommitIntervalMs = 500;
+    private final int throttleMs = 10;
 
     private final String groupId = "mock-group";
     private final String memberId = "memberId";
@@ -2434,7 +2435,10 @@ public class KafkaConsumerTest {
             partitionData.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue(),
                     Optional.empty(), "", error));
         }
-        return new OffsetFetchResponse(Errors.NONE, partitionData);
+        return new OffsetFetchResponse(
+            throttleMs,
+            Collections.singletonMap(groupId, Errors.NONE),
+            Collections.singletonMap(groupId, partitionData));
     }
 
     private ListOffsetsResponse listOffsetsResponse(Map<TopicPartition, Long> offsets) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index c65d33176f..db483c6c0f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -71,6 +71,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
 import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
@@ -140,6 +141,7 @@ public abstract class ConsumerCoordinatorTest {
     private final long retryBackoffMs = 100;
     private final int autoCommitIntervalMs = 2000;
     private final int requestTimeoutMs = 30000;
+    private final int throttleMs = 10;
     private final MockTime time = new MockTime();
     private GroupRebalanceConfig rebalanceConfig;
 
@@ -2872,7 +2874,7 @@ public abstract class ConsumerCoordinatorTest {
         OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
                 metadata, Errors.NONE);
 
-        client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
         Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
                 time.timer(Long.MAX_VALUE));
 
@@ -2888,7 +2890,7 @@ public abstract class ConsumerCoordinatorTest {
         OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(-1, Optional.empty(),
                 "", Errors.TOPIC_AUTHORIZATION_FAILED);
 
-        client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
         TopicAuthorizationException exception = assertThrows(TopicAuthorizationException.class, () ->
                 coordinator.fetchCommittedOffsets(singleton(t1p), time.timer(Long.MAX_VALUE)));
 
@@ -2901,7 +2903,7 @@ public abstract class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         subscriptions.assignFromUser(singleton(t1p));
-        client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
+        client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
@@ -2916,7 +2918,7 @@ public abstract class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         subscriptions.assignFromUser(singleton(t1p));
-        client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED));
+        client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED, Collections.emptyMap()));
         try {
             coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
             fail("Expected group authorization error");
@@ -2959,7 +2961,7 @@ public abstract class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         subscriptions.assignFromUser(singleton(t1p));
-        client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR));
+        client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
@@ -3435,7 +3437,11 @@ public abstract class ConsumerCoordinatorTest {
         OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
             metadata, Errors.NONE);
 
-        client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        if (upperVersion < 8) {
+            client.prepareResponse(new OffsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        } else {
+            client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        }
         if (expectThrows) {
             assertThrows(UnsupportedVersionException.class,
                 () -> coordinator.fetchCommittedOffsets(singleton(t1p), time.timer(Long.MAX_VALUE)));
@@ -3690,8 +3696,10 @@ public abstract class ConsumerCoordinatorTest {
         return new OffsetCommitResponse(responseData);
     }
 
-    private OffsetFetchResponse offsetFetchResponse(Errors topLevelError) {
-        return new OffsetFetchResponse(topLevelError, Collections.emptyMap());
+    private OffsetFetchResponse offsetFetchResponse(Errors error, Map<TopicPartition, PartitionData> responseData) {
+        return new OffsetFetchResponse(throttleMs,
+                                       singletonMap(groupId, error),
+                                       singletonMap(groupId, responseData));
     }
 
     private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset) {
@@ -3701,7 +3709,7 @@ public abstract class ConsumerCoordinatorTest {
     private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset, Optional<Integer> epoch) {
         OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset,
                 epoch, metadata, partitionLevelError);
-        return new OffsetFetchResponse(Errors.NONE, singletonMap(tp, data));
+        return offsetFetchResponse(Errors.NONE, singletonMap(tp, data));
     }
 
     private OffsetCommitCallback callback(final AtomicBoolean success) {
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 47c1d173b3..d5aee881c9 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -18,7 +18,7 @@
 package kafka.admin
 
 import java.time.{Duration, Instant}
-import java.util.Properties
+import java.util.{Collections, Properties}
 import com.fasterxml.jackson.dataformat.csv.CsvMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import kafka.utils._
@@ -753,9 +753,9 @@ object ConsumerGroupCommand extends Logging {
 
     private def getCommittedOffsets(groupId: String): Map[TopicPartition, OffsetAndMetadata] = {
       adminClient.listConsumerGroupOffsets(
-        groupId,
-        withTimeoutMs(new ListConsumerGroupOffsetsOptions)
-      ).partitionsToOffsetAndMetadata.get.asScala
+        Collections.singletonMap(groupId, new ListConsumerGroupOffsetsSpec),
+        withTimeoutMs(new ListConsumerGroupOffsetsOptions())
+      ).partitionsToOffsetAndMetadata(groupId).get().asScala
     }
 
     type GroupMetadata = immutable.Map[String, immutable.Map[TopicPartition, OffsetAndMetadata]]
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
index 76a3855a87..44b241a7ed 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
@@ -49,8 +49,8 @@ class ConsumerGroupServiceTest {
 
     when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
       .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
-    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
-      .thenReturn(listGroupOffsetsResult)
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any()))
+      .thenReturn(listGroupOffsetsResult(group))
     when(admin.listOffsets(offsetsArgMatcher, any()))
       .thenReturn(listOffsetsResult)
 
@@ -60,7 +60,7 @@ class ConsumerGroupServiceTest {
     assertEquals(topicPartitions.size, assignments.get.size)
 
     verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())
-    verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())
+    verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any())
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
@@ -112,8 +112,10 @@ class ConsumerGroupServiceTest {
     future.complete(consumerGroupDescription)
     when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
       .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, future)))
-    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
-      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any()))
+      .thenReturn(
+        AdminClientTestUtils.listConsumerGroupOffsetsResult(
+          Collections.singletonMap(group, commitedOffsets)))
     when(admin.listOffsets(
       ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
       any()
@@ -142,7 +144,7 @@ class ConsumerGroupServiceTest {
     assertEquals(expectedOffsets, returnedOffsets)
 
     verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())
-    verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())
+    verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any())
     verify(admin, times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)), any())
     verify(admin, times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)), any())
   }
@@ -192,9 +194,9 @@ class ConsumerGroupServiceTest {
     new DescribeConsumerGroupsResult(Collections.singletonMap(group, future))
   }
 
-  private def listGroupOffsetsResult: ListConsumerGroupOffsetsResult = {
+  private def listGroupOffsetsResult(groupId: String): ListConsumerGroupOffsetsResult = {
     val offsets = topicPartitions.map(_ -> new OffsetAndMetadata(100)).toMap.asJava
-    AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets)
+    AdminClientTestUtils.listConsumerGroupOffsetsResult(Map(groupId -> offsets).asJava)
   }
 
   private def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
@@ -217,4 +219,8 @@ class ConsumerGroupServiceTest {
     }.toMap
     AdminClientTestUtils.describeTopicsResult(topicDescriptions.asJava)
   }
+
+  private def listConsumerGroupOffsetsSpec: util.Map[String, ListConsumerGroupOffsetsSpec] = {
+    Collections.singletonMap(group, new ListConsumerGroupOffsetsSpec())
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 6d17e93782..82c19949e3 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -320,7 +320,7 @@ class RequestQuotaTest extends BaseRequestTest {
               )
           )
         case ApiKeys.OFFSET_FETCH =>
-          new OffsetFetchRequest.Builder("test-group", false, List(tp).asJava, false)
+          new OffsetFetchRequest.Builder(Map("test-group" -> List(tp).asJava).asJava, false, false)
 
         case ApiKeys.FIND_COORDINATOR =>
           new FindCoordinatorRequest.Builder(
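
The RequestQuotaTest change above reflects the batched OffsetFetchRequest builder. A sketch of the same construction in Java, with a placeholder topic; the two boolean arguments are presumed to keep the old builder's requireStable and throwOnFetchStableOffsetsUnsupported meaning:

    Map<String, List<TopicPartition>> groupToPartitions = Collections.singletonMap(
            "test-group", Collections.singletonList(new TopicPartition("topic", 0)));
    // false, false: do not require stable offsets, do not throw when unsupported.
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupToPartitions, false, false).build();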
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 02cfb0b49c..5240534ce7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListOffsetsOptions;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -695,11 +696,12 @@ public class StoreChangelogReader implements ChangelogReader {
 
         try {
             // those which do not have a committed offset would default to 0
-            final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions();
-            options.topicPartitions(new ArrayList<>(partitions));
-            options.requireStable(true);
-            final Map<TopicPartition, Long> committedOffsets = adminClient.listConsumerGroupOffsets(groupId, options)
-                    .partitionsToOffsetAndMetadata().get().entrySet()
+            final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions()
+                    .requireStable(true);
+            final ListConsumerGroupOffsetsSpec spec = new ListConsumerGroupOffsetsSpec()
+                    .topicPartitions(new ArrayList<>(partitions));
+            final Map<TopicPartition, Long> committedOffsets = adminClient.listConsumerGroupOffsets(Collections.singletonMap(groupId, spec), options)
+                    .partitionsToOffsetAndMetadata(groupId).get().entrySet()
                     .stream()
                     .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
 
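
A hedged sketch of the Streams-side call shape after this change, in isolation; the application id, changelog partition, and adminClient are placeholders:

    ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions()
            .requireStable(true);  // avoid offsets from still-open transactions
    ListConsumerGroupOffsetsSpec spec = new ListConsumerGroupOffsetsSpec()
            .topicPartitions(Collections.singletonList(new TopicPartition("app-changelog", 0)));
    Map<TopicPartition, OffsetAndMetadata> committed =
            adminClient.listConsumerGroupOffsets(Collections.singletonMap("app-id", spec), options)
                    .partitionsToOffsetAndMetadata("app-id")
                    .get();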
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 1961736620..fbc8d42326 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.admin.AdminClientTestUtils;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListOffsetsOptions;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.MockAdminClient;
@@ -648,12 +649,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         final AtomicBoolean functionCalled = new AtomicBoolean(false);
         final MockAdminClient adminClient = new MockAdminClient() {
             @Override
-            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
+            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, final ListConsumerGroupOffsetsOptions options) {
                 if (functionCalled.get()) {
-                    return super.listConsumerGroupOffsets(groupId, options);
+                    return super.listConsumerGroupOffsets(groupSpecs, options);
                 } else {
                     functionCalled.set(true);
-                    return AdminClientTestUtils.listConsumerGroupOffsetsResult(new TimeoutException("KABOOM!"));
+                    return AdminClientTestUtils.listConsumerGroupOffsetsResult(groupSpecs.keySet().iterator().next(), new TimeoutException("KABOOM!"));
                 }
                 }
             }
         };
@@ -708,7 +709,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
 
         final MockAdminClient adminClient = new MockAdminClient() {
             @Override
-            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
+            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, final ListConsumerGroupOffsetsOptions options) {
                 throw kaboom;
             }
         };
@@ -790,7 +791,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
 
         final MockAdminClient adminClient = new MockAdminClient() {
             @Override
-            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
+            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, final ListConsumerGroupOffsetsOptions options) {
                 throw new AssertionError("Should not try to fetch committed offsets");
             }
         };
