This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 471a5e0  KAFKA-12234: Implement request/response for offsetFetch 
batching (KIP-709) (#10962)
471a5e0 is described below

commit 471a5e0e9c107e94031f368abe7672b142eca476
Author: Sanjana Kaundinya <[email protected]>
AuthorDate: Wed Jul 7 03:55:00 2021 -0700

    KAFKA-12234: Implement request/response for offsetFetch batching (KIP-709) 
(#10962)
    
    This implements the request and response portion of KIP-709. It updates the 
OffsetFetch request and response to support fetching offsets for multiple 
consumer groups at a time. If the broker does not support the new OffsetFetch 
version, clients can revert to the previous behaviour and use a request for 
each coordinator.
    
    Reviewers: Rajini Sivaram <[email protected]>, Konstantine 
Karantasis <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../internals/ListConsumerGroupOffsetsHandler.java |  10 +-
 .../consumer/internals/ConsumerCoordinator.java    |  26 +-
 .../kafka/common/requests/OffsetFetchRequest.java  | 170 ++++++++++-
 .../kafka/common/requests/OffsetFetchResponse.java | 131 +++++++-
 .../common/message/OffsetFetchRequest.json         |  26 +-
 .../common/message/OffsetFetchResponse.json        |  49 ++-
 .../ListConsumerGroupOffsetsHandlerTest.java       |   8 +-
 .../apache/kafka/common/message/MessageTest.java   | 242 ++++++++++++++-
 .../common/requests/OffsetFetchRequestTest.java    | 194 +++++++++---
 .../common/requests/OffsetFetchResponseTest.java   | 331 +++++++++++++++++----
 .../kafka/common/requests/RequestResponseTest.java | 147 +++++++--
 core/src/main/scala/kafka/server/KafkaApis.scala   | 173 +++++++----
 .../kafka/api/AuthorizerIntegrationTest.scala      | 215 ++++++++++++-
 .../unit/kafka/server/OffsetFetchRequestTest.scala | 237 +++++++++++++++
 15 files changed, 1701 insertions(+), 260 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index cfa7272..0b1ccb0 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -85,7 +85,7 @@
             
files="clients[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
 
     <suppress checks="NPathComplexity"
-            files="MessageTest.java"/>
+            files="MessageTest.java|OffsetFetchRequest.java"/>
 
     <!-- Clients tests -->
     <suppress checks="ClassDataAbstractionCoupling"
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
index 4439bc3..240516d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
@@ -87,12 +87,14 @@ public class ListConsumerGroupOffsetsHandler implements 
AdminApiHandler<Coordina
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
         List<CoordinatorKey> unmapped = new ArrayList<>();
 
-        if (response.error() != Errors.NONE) {
-            handleError(groupId, response.error(), failed, unmapped);
+        Errors responseError = response.groupLevelError(groupId.idValue);
+        if (responseError != Errors.NONE) {
+            handleError(groupId, responseError, failed, unmapped);
         } else {
             final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = 
new HashMap<>();
-            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> 
entry :
-                response.responseData().entrySet()) {
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> 
partitionDataMap =
+                response.partitionDataMap(groupId.idValue);
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> 
entry : partitionDataMap.entrySet()) {
                 final TopicPartition topicPartition = entry.getKey();
                 OffsetFetchResponse.PartitionData partitionData = 
entry.getValue();
                 final Errors error = partitionData.error;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 39f4520..68cf8a9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1308,29 +1308,31 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
         @Override
         public void handle(OffsetFetchResponse response, 
RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
-            if (response.hasError()) {
-                Errors error = response.error();
-                log.debug("Offset fetch failed: {}", error.message());
+            Errors responseError = 
response.groupLevelError(rebalanceConfig.groupId);
+            if (responseError != Errors.NONE) {
+                log.debug("Offset fetch failed: {}", responseError.message());
 
-                if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                     // just retry
-                    future.raise(error);
-                } else if (error == Errors.NOT_COORDINATOR) {
+                    future.raise(responseError);
+                } else if (responseError == Errors.NOT_COORDINATOR) {
                     // re-discover the coordinator and retry
-                    markCoordinatorUnknown(error);
-                    future.raise(error);
-                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+                    markCoordinatorUnknown(responseError);
+                    future.raise(responseError);
+                } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) 
{
                     
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
                 } else {
-                    future.raise(new KafkaException("Unexpected error in fetch 
offset response: " + error.message()));
+                    future.raise(new KafkaException("Unexpected error in fetch 
offset response: " + responseError.message()));
                 }
                 return;
             }
 
             Set<String> unauthorizedTopics = null;
-            Map<TopicPartition, OffsetAndMetadata> offsets = new 
HashMap<>(response.responseData().size());
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responseData =
+                response.partitionDataMap(rebalanceConfig.groupId);
+            Map<TopicPartition, OffsetAndMetadata> offsets = new 
HashMap<>(responseData.size());
             Set<TopicPartition> unstableTxnOffsetTopicPartitions = new 
HashSet<>();
-            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> 
entry : response.responseData().entrySet()) {
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> 
entry : responseData.entrySet()) {
                 TopicPartition tp = entry.getKey();
                 OffsetFetchResponse.PartitionData partitionData = 
entry.getValue();
                 if (partitionData.hasError()) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index c35d479..c5c094a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.common.requests;
 
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
+import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
 import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopic;
+import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
@@ -38,6 +43,7 @@ public class OffsetFetchRequest extends AbstractRequest {
     private static final Logger log = 
LoggerFactory.getLogger(OffsetFetchRequest.class);
 
     private static final List<OffsetFetchRequestTopic> ALL_TOPIC_PARTITIONS = 
null;
+    private static final List<OffsetFetchRequestTopics> 
ALL_TOPIC_PARTITIONS_BATCH = null;
     private final OffsetFetchRequestData data;
 
     public static class Builder extends 
AbstractRequest.Builder<OffsetFetchRequest> {
@@ -78,26 +84,107 @@ public class OffsetFetchRequest extends AbstractRequest {
             return this.data.topics() == ALL_TOPIC_PARTITIONS;
         }
 
+        public Builder(Map<String, List<TopicPartition>> 
groupIdToTopicPartitionMap,
+                       boolean requireStable,
+                       boolean throwOnFetchStableOffsetsUnsupported) {
+            super(ApiKeys.OFFSET_FETCH);
+
+            List<OffsetFetchRequestGroup> groups = new ArrayList<>();
+            for (Entry<String, List<TopicPartition>> entry : 
groupIdToTopicPartitionMap.entrySet()) {
+                String groupName = entry.getKey();
+                List<TopicPartition> tpList = entry.getValue();
+                final List<OffsetFetchRequestTopics> topics;
+                if (tpList != null) {
+                    Map<String, OffsetFetchRequestTopics> 
offsetFetchRequestTopicMap =
+                        new HashMap<>();
+                    for (TopicPartition topicPartition : tpList) {
+                        String topicName = topicPartition.topic();
+                        OffsetFetchRequestTopics topic = 
offsetFetchRequestTopicMap.getOrDefault(
+                            topicName, new 
OffsetFetchRequestTopics().setName(topicName));
+                        
topic.partitionIndexes().add(topicPartition.partition());
+                        offsetFetchRequestTopicMap.put(topicName, topic);
+                    }
+                    topics = new 
ArrayList<>(offsetFetchRequestTopicMap.values());
+                } else {
+                    topics = ALL_TOPIC_PARTITIONS_BATCH;
+                }
+                groups.add(new OffsetFetchRequestGroup()
+                    .setGroupId(groupName)
+                    .setTopics(topics));
+            }
+            this.data = new OffsetFetchRequestData()
+                .setGroups(groups)
+                .setRequireStable(requireStable);
+            this.throwOnFetchStableOffsetsUnsupported = 
throwOnFetchStableOffsetsUnsupported;
+        }
+
         @Override
         public OffsetFetchRequest build(short version) {
             if (isAllTopicPartitions() && version < 2) {
                 throw new UnsupportedVersionException("The broker only 
supports OffsetFetchRequest " +
                     "v" + version + ", but we need v2 or newer to request all 
topic partitions.");
             }
-
+            if (data.groups().size() > 1 && version < 8) {
+                throw new NoBatchedOffsetFetchRequestException("Broker does 
not support"
+                    + " batching groups for fetch offset request on version " 
+ version);
+            }
             if (data.requireStable() && version < 7) {
                 if (throwOnFetchStableOffsetsUnsupported) {
                     throw new UnsupportedVersionException("Broker unexpectedly 
" +
                         "doesn't support requireStable flag on version " + 
version);
                 } else {
                     log.trace("Fallback the requireStable flag to false as 
broker " +
-                                  "only supports OffsetFetchRequest version 
{}. Need " +
-                                  "v7 or newer to enable this feature", 
version);
-
-                    return new 
OffsetFetchRequest(data.setRequireStable(false), version);
+                        "only supports OffsetFetchRequest version {}. Need " +
+                        "v7 or newer to enable this feature", version);
+                    data.setRequireStable(false);
+                }
+            }
+            // convert data to use the appropriate version since version 8 
uses different format
+            if (version < 8) {
+                OffsetFetchRequestData oldDataFormat = null;
+                if (!data.groups().isEmpty()) {
+                    OffsetFetchRequestGroup group = data.groups().get(0);
+                    String groupName = group.groupId();
+                    List<OffsetFetchRequestTopics> topics = group.topics();
+                    List<OffsetFetchRequestTopic> oldFormatTopics = null;
+                    if (topics != null) {
+                        oldFormatTopics = topics
+                            .stream()
+                            .map(t ->
+                                new OffsetFetchRequestTopic()
+                                    .setName(t.name())
+                                    .setPartitionIndexes(t.partitionIndexes()))
+                            .collect(Collectors.toList());
+                    }
+                    oldDataFormat = new OffsetFetchRequestData()
+                        .setGroupId(groupName)
+                        .setTopics(oldFormatTopics)
+                        .setRequireStable(data.requireStable());
+                }
+                return new OffsetFetchRequest(oldDataFormat == null ? data : 
oldDataFormat, version);
+            } else {
+                if (data.groups().isEmpty()) {
+                    String groupName = data.groupId();
+                    List<OffsetFetchRequestTopic> oldFormatTopics = 
data.topics();
+                    List<OffsetFetchRequestTopics> topics = null;
+                    if (oldFormatTopics != null) {
+                        topics = oldFormatTopics
+                            .stream()
+                            .map(t -> new OffsetFetchRequestTopics()
+                                .setName(t.name())
+                                .setPartitionIndexes(t.partitionIndexes()))
+                            .collect(Collectors.toList());
+                    }
+                    OffsetFetchRequestData convertedDataFormat =
+                        new OffsetFetchRequestData()
+                            .setGroups(Collections.singletonList(
+                                new OffsetFetchRequestGroup()
+                                    .setGroupId(groupName)
+                                    .setTopics(topics)))
+                            .setRequireStable(data.requireStable());
+                    return new OffsetFetchRequest(convertedDataFormat, 
version);
                 }
             }
-
             return new OffsetFetchRequest(data, version);
         }
 
@@ -107,6 +194,18 @@ public class OffsetFetchRequest extends AbstractRequest {
         }
     }
 
+    /**
+     * Indicates that it is not possible to fetch consumer groups in batches 
with FetchOffset.
+     * Instead consumer groups' offsets must be fetched one by one.
+     */
+    public static class NoBatchedOffsetFetchRequestException extends 
UnsupportedVersionException {
+        private static final long serialVersionUID = 1L;
+
+        public NoBatchedOffsetFetchRequestException(String message) {
+            super(message);
+        }
+    }
+
     public List<TopicPartition> partitions() {
         if (isAllPartitions()) {
             return null;
@@ -128,6 +227,37 @@ public class OffsetFetchRequest extends AbstractRequest {
         return data.requireStable();
     }
 
+    public Map<String, List<TopicPartition>> groupIdsToPartitions() {
+        Map<String, List<TopicPartition>> groupIdsToPartitions = new 
HashMap<>();
+        for (OffsetFetchRequestGroup group : data.groups()) {
+            List<TopicPartition> tpList = null;
+            if (group.topics() != ALL_TOPIC_PARTITIONS_BATCH) {
+                tpList = new ArrayList<>();
+                for (OffsetFetchRequestTopics topic : group.topics()) {
+                    for (Integer partitionIndex : topic.partitionIndexes()) {
+                        tpList.add(new TopicPartition(topic.name(), 
partitionIndex));
+                    }
+                }
+            }
+            groupIdsToPartitions.put(group.groupId(), tpList);
+        }
+        return groupIdsToPartitions;
+    }
+
+    public Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics() {
+        Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics =
+            new HashMap<>(data.groups().size());
+        data.groups().forEach(g -> groupIdsToTopics.put(g.groupId(), 
g.topics()));
+        return groupIdsToTopics;
+    }
+
+    public List<String> groupIds() {
+        return data.groups()
+            .stream()
+            .map(OffsetFetchRequestGroup::groupId)
+            .collect(Collectors.toList());
+    }
+
     private OffsetFetchRequest(OffsetFetchRequestData data, short version) {
         super(ApiKeys.OFFSET_FETCH, version);
         this.data = data;
@@ -152,13 +282,23 @@ public class OffsetFetchRequest extends AbstractRequest {
                         new TopicPartition(topic.name(), partitionIndex), 
partitionError);
                 }
             }
+            return new OffsetFetchResponse(error, responsePartitions);
         }
-
-        if (version() >= 3) {
-            return new OffsetFetchResponse(throttleTimeMs, error, 
responsePartitions);
-        } else {
+        if (version() == 2) {
             return new OffsetFetchResponse(error, responsePartitions);
         }
+        if (version() >= 3 && version() < 8) {
+            return new OffsetFetchResponse(throttleTimeMs, error, 
responsePartitions);
+        }
+        List<String> groupIds = groupIds();
+        Map<String, Errors> errorsMap = new HashMap<>(groupIds.size());
+        Map<String, Map<TopicPartition, OffsetFetchResponse.PartitionData>> 
partitionMap =
+            new HashMap<>(groupIds.size());
+        for (String g : groupIds) {
+            errorsMap.put(g, error);
+            partitionMap.put(g, responsePartitions);
+        }
+        return new OffsetFetchResponse(throttleTimeMs, errorsMap, 
partitionMap);
     }
 
     @Override
@@ -174,6 +314,16 @@ public class OffsetFetchRequest extends AbstractRequest {
         return data.topics() == ALL_TOPIC_PARTITIONS;
     }
 
+    public boolean isAllPartitionsForGroup(String groupId) {
+        OffsetFetchRequestGroup group = data
+            .groups()
+            .stream()
+            .filter(g -> g.groupId().equals(groupId))
+            .collect(Collectors.toList())
+            .get(0);
+        return group.topics() == ALL_TOPIC_PARTITIONS_BATCH;
+    }
+
     @Override
     public OffsetFetchRequestData data() {
         return data;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 594eb0e..213182e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.common.requests;
 
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
+import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
 import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
+import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartitions;
 import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopic;
+import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
@@ -65,6 +70,7 @@ public class OffsetFetchResponse extends AbstractResponse {
 
     private final OffsetFetchResponseData data;
     private final Errors error;
+    private final Map<String, Errors> groupLevelErrors = new HashMap<>();
 
     public static final class PartitionData {
         public final long offset;
@@ -113,8 +119,14 @@ public class OffsetFetchResponse extends AbstractResponse {
         }
     }
 
+    public OffsetFetchResponse(OffsetFetchResponseData data) {
+        super(ApiKeys.OFFSET_FETCH);
+        this.data = data;
+        this.error = null;
+    }
+
     /**
-     * Constructor for all versions without throttle time.
+     * Constructor without throttle time.
      * @param error Potential coordinator or group level error code (for api 
version 2 and later)
      * @param responseData Fetched offset information grouped by 
topic-partition
      */
@@ -123,7 +135,7 @@ public class OffsetFetchResponse extends AbstractResponse {
     }
 
     /**
-     * Constructor with throttle time
+     * Constructor with throttle time for version 0 to 7
      * @param throttleTimeMs The time in milliseconds that this response was 
throttled
      * @param error Potential coordinator or group level error code (for api 
version 2 and later)
      * @param responseData Fetched offset information grouped by 
topic-partition
@@ -154,6 +166,48 @@ public class OffsetFetchResponse extends AbstractResponse {
         this.error = error;
     }
 
+    /**
+     * Constructor with throttle time for version 8 and above.
+     * @param throttleTimeMs The time in milliseconds that this response was 
throttled
+     * @param errors Potential coordinator or group level error code
+     * @param responseData Fetched offset information grouped by 
topic-partition and by group
+     */
+    public OffsetFetchResponse(int throttleTimeMs,
+                               Map<String, Errors> errors, Map<String,
+                               Map<TopicPartition, PartitionData>> 
responseData) {
+        super(ApiKeys.OFFSET_FETCH);
+        List<OffsetFetchResponseGroup> groupList = new ArrayList<>();
+        for (Entry<String, Map<TopicPartition, PartitionData>> entry : 
responseData.entrySet()) {
+            String groupName = entry.getKey();
+            Map<TopicPartition, PartitionData> partitionDataMap = 
entry.getValue();
+            Map<String, OffsetFetchResponseTopics> 
offsetFetchResponseTopicsMap = new HashMap<>();
+            for (Entry<TopicPartition, PartitionData> partitionEntry : 
partitionDataMap.entrySet()) {
+                String topicName = partitionEntry.getKey().topic();
+                OffsetFetchResponseTopics topic =
+                    offsetFetchResponseTopicsMap.getOrDefault(topicName,
+                        new OffsetFetchResponseTopics().setName(topicName));
+                PartitionData partitionData = partitionEntry.getValue();
+                topic.partitions().add(new OffsetFetchResponsePartitions()
+                    .setPartitionIndex(partitionEntry.getKey().partition())
+                    .setErrorCode(partitionData.error.code())
+                    .setCommittedOffset(partitionData.offset)
+                    .setCommittedLeaderEpoch(
+                        
partitionData.leaderEpoch.orElse(NO_PARTITION_LEADER_EPOCH))
+                    .setMetadata(partitionData.metadata));
+                offsetFetchResponseTopicsMap.put(topicName, topic);
+            }
+            groupList.add(new OffsetFetchResponseGroup()
+                .setGroupId(groupName)
+                .setTopics(new 
ArrayList<>(offsetFetchResponseTopicsMap.values()))
+                .setErrorCode(errors.get(groupName).code()));
+            groupLevelErrors.put(groupName, errors.get(groupName));
+        }
+        this.data = new OffsetFetchResponseData()
+            .setGroups(groupList)
+            .setThrottleTimeMs(throttleTimeMs);
+        this.error = null;
+    }
+
     public OffsetFetchResponse(OffsetFetchResponseData data, short version) {
         super(ApiKeys.OFFSET_FETCH);
         this.data = data;
@@ -161,7 +215,17 @@ public class OffsetFetchResponse extends AbstractResponse {
         // for older versions there is no top-level error in the response and 
all errors are partition errors,
         // so if there is a group or coordinator error at the partition level 
use that as the top-level error.
         // this way clients can depend on the top-level error regardless of 
the offset fetch version.
-        this.error = version >= 2 ? Errors.forCode(data.errorCode()) : 
topLevelError(data);
+        // we return the error differently starting with version 8, so we will 
only populate the
+        // error field if we are between version 2 and 7. if we are in version 
8 or greater, then
+        // we will populate the map of group id to error codes.
+        if (version < 8) {
+            this.error = version >= 2 ? Errors.forCode(data.errorCode()) : 
topLevelError(data);
+        } else {
+            for (OffsetFetchResponseGroup group : data.groups()) {
+                this.groupLevelErrors.put(group.groupId(), 
Errors.forCode(group.errorCode()));
+            }
+            this.error = null;
+        }
     }
 
     private static Errors topLevelError(OffsetFetchResponseData data) {
@@ -185,21 +249,46 @@ public class OffsetFetchResponse extends AbstractResponse 
{
         return error != Errors.NONE;
     }
 
+    public boolean groupHasError(String groupId) {
+        return groupLevelErrors.get(groupId) != Errors.NONE;
+    }
+
     public Errors error() {
         return error;
     }
 
+    public Errors groupLevelError(String groupId) {
+        if (error != null) {
+            return error;
+        }
+        return groupLevelErrors.get(groupId);
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> counts = new HashMap<>();
-        updateErrorCounts(counts, error);
-        data.topics().forEach(topic ->
-                topic.partitions().forEach(partition ->
+        if (!groupLevelErrors.isEmpty()) {
+            // built response with v8 or above
+            for (Map.Entry<String, Errors> entry : 
groupLevelErrors.entrySet()) {
+                updateErrorCounts(counts, entry.getValue());
+            }
+            for (OffsetFetchResponseGroup group : data.groups()) {
+                group.topics().forEach(topic ->
+                    topic.partitions().forEach(partition ->
                         updateErrorCounts(counts, 
Errors.forCode(partition.errorCode()))));
+            }
+        } else {
+            // built response with v0-v7
+            updateErrorCounts(counts, error);
+            data.topics().forEach(topic ->
+                topic.partitions().forEach(partition ->
+                    updateErrorCounts(counts, 
Errors.forCode(partition.errorCode()))));
+        }
         return counts;
     }
 
-    public Map<TopicPartition, PartitionData> responseData() {
+    // package-private for testing purposes
+    Map<TopicPartition, PartitionData> responseDataV0ToV7() {
         Map<TopicPartition, PartitionData> responseData = new HashMap<>();
         for (OffsetFetchResponseTopic topic : data.topics()) {
             for (OffsetFetchResponsePartition partition : topic.partitions()) {
@@ -214,6 +303,34 @@ public class OffsetFetchResponse extends AbstractResponse {
         return responseData;
     }
 
+    private Map<TopicPartition, PartitionData> buildResponseData(String 
groupId) {
+        Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+        OffsetFetchResponseGroup group = data
+            .groups()
+            .stream()
+            .filter(g -> g.groupId().equals(groupId))
+            .collect(Collectors.toList())
+            .get(0);
+        for (OffsetFetchResponseTopics topic : group.topics()) {
+            for (OffsetFetchResponsePartitions partition : topic.partitions()) 
{
+                responseData.put(new TopicPartition(topic.name(), 
partition.partitionIndex()),
+                    new PartitionData(partition.committedOffset(),
+                        
RequestUtils.getLeaderEpoch(partition.committedLeaderEpoch()),
+                        partition.metadata(),
+                        Errors.forCode(partition.errorCode()))
+                );
+            }
+        }
+        return responseData;
+    }
+
+    public Map<TopicPartition, PartitionData> partitionDataMap(String groupId) 
{
+        if (groupLevelErrors.isEmpty()) {
+            return responseDataV0ToV7();
+        }
+        return buildResponseData(groupId);
+    }
+
     public static OffsetFetchResponse parse(ByteBuffer buffer, short version) {
         return new OffsetFetchResponse(new OffsetFetchResponseData(new 
ByteBufferAccessor(buffer), version), version);
     }
diff --git a/clients/src/main/resources/common/message/OffsetFetchRequest.json 
b/clients/src/main/resources/common/message/OffsetFetchRequest.json
index d4a4d5f..8f3c414 100644
--- a/clients/src/main/resources/common/message/OffsetFetchRequest.json
+++ b/clients/src/main/resources/common/message/OffsetFetchRequest.json
@@ -31,19 +31,33 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 is adding the require stable flag.
-  "validVersions": "0-7",
+  //
+  // Version 8 is adding support for fetching offsets for multiple groups at a 
time
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
+    { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": 
"groupId",
       "about": "The group to fetch offsets for." },
-    { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", 
"nullableVersions": "2+",
+    { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": 
"0-7", "nullableVersions": "2-7",
       "about": "Each topic we would like to fetch offsets for, or null to 
fetch offsets for all topics.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+      { "name": "Name", "type": "string", "versions": "0-7", "entityType": 
"topicName",
         "about": "The topic name."},
-      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
+      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7",
         "about": "The partition indexes we would like to fetch offsets for." }
     ]},
+    { "name": "Groups", "type": "[]OffsetFetchRequestGroup", "versions": "8+",
+      "about": "Each group we would like to fetch offsets for", "fields": [
+      { "name": "groupId", "type": "string", "versions": "8+", "entityType": 
"groupId",
+        "about": "The group ID."},
+      { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": 
"8+", "nullableVersions": "8+",
+        "about": "Each topic we would like to fetch offsets for, or null to 
fetch offsets for all topics.", "fields": [
+        { "name": "Name", "type": "string", "versions": "8+", "entityType": 
"topicName",
+          "about": "The topic name."},
+        { "name": "PartitionIndexes", "type": "[]int32", "versions": "8+",
+          "about": "The partition indexes we would like to fetch offsets for." 
}
+      ]}
+    ]},
     {"name": "RequireStable", "type": "bool", "versions": "7+", "default": 
"false",
-     "about": "Whether broker should hold on returning unstable offsets but 
set a retriable error code for the partition."}
+      "about": "Whether broker should hold on returning unstable offsets but 
set a retriable error code for the partitions."}
   ]
 }
diff --git a/clients/src/main/resources/common/message/OffsetFetchResponse.json 
b/clients/src/main/resources/common/message/OffsetFetchResponse.json
index b977701..dfad60e 100644
--- a/clients/src/main/resources/common/message/OffsetFetchResponse.json
+++ b/clients/src/main/resources/common/message/OffsetFetchResponse.json
@@ -30,30 +30,57 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 adds pending offset commit as new error response on partition 
level.
-  "validVersions": "0-7",
+  //
+  // Version 8 is adding support for fetching offsets for multiple groups
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", 
"ignorable": true,
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
-    { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": 
"0+", 
+    { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": 
"0-7",
       "about": "The responses per topic.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+      { "name": "Name", "type": "string", "versions": "0-7", "entityType": 
"topicName",
         "about": "The topic name." },
-      { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", 
"versions": "0+",
+      { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", 
"versions": "0-7",
         "about": "The responses per partition", "fields": [
-        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+        { "name": "PartitionIndex", "type": "int32", "versions": "0-7",
           "about": "The partition index." },
-        { "name": "CommittedOffset", "type": "int64", "versions": "0+",
+        { "name": "CommittedOffset", "type": "int64", "versions": "0-7",
           "about": "The committed message offset." },
-        { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", 
"default": "-1",
+        { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5-7", 
"default": "-1",
           "ignorable": true, "about": "The leader epoch." },
-        { "name": "Metadata", "type": "string", "versions": "0+", 
"nullableVersions": "0+",
+        { "name": "Metadata", "type": "string", "versions": "0-7", 
"nullableVersions": "0-7",
           "about": "The partition metadata." },
-        { "name": "ErrorCode", "type": "int16", "versions": "0+",
+        { "name": "ErrorCode", "type": "int16", "versions": "0-7",
           "about": "The error code, or 0 if there was no error." }
       ]}
     ]},
-    { "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", 
"ignorable": true,
-      "about": "The top-level error code, or 0 if there was no error." }
+    { "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", 
"ignorable": true,
+      "about": "The top-level error code, or 0 if there was no error." },
+    {"name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
+      "about": "The responses per group id.", "fields": [
+      { "name": "groupId", "type": "string", "versions": "8+", "entityType": 
"groupId",
+        "about": "The group ID." },
+      { "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": 
"8+",
+        "about": "The responses per topic.", "fields": [
+        { "name": "Name", "type": "string", "versions": "8+", "entityType": 
"topicName",
+          "about": "The topic name." },
+        { "name": "Partitions", "type": "[]OffsetFetchResponsePartitions", 
"versions": "8+",
+          "about": "The responses per partition", "fields": [
+          { "name": "PartitionIndex", "type": "int32", "versions": "8+",
+            "about": "The partition index." },
+          { "name": "CommittedOffset", "type": "int64", "versions": "8+",
+            "about": "The committed message offset." },
+          { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "8+", 
"default": "-1",
+            "ignorable": true, "about": "The leader epoch." },
+          { "name": "Metadata", "type": "string", "versions": "8+", 
"nullableVersions": "8+",
+            "about": "The partition metadata." },
+          { "name": "ErrorCode", "type": "int16", "versions": "8+",
+            "about": "The partition-level error code, or 0 if there was no 
error." }
+        ]}
+      ]},
+      { "name": "ErrorCode", "type": "int16", "versions": "8+", "default": "0",
+        "about": "The group-level error code, or 0 if there was no error." }
+    ]}
   ]
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
index 5c98940..b461ea3 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
@@ -55,10 +55,10 @@ public class ListConsumerGroupOffsetsHandlerTest {
     public void testBuildRequest() {
         ListConsumerGroupOffsetsHandler handler = new 
ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
         OffsetFetchRequest request = handler.buildRequest(1, 
singleton(CoordinatorKey.byGroupId(groupId))).build();
-        assertEquals(groupId, request.data().groupId());
-        assertEquals(2, request.data().topics().size());
-        assertEquals(2, 
request.data().topics().get(0).partitionIndexes().size());
-        assertEquals(2, 
request.data().topics().get(1).partitionIndexes().size());
+        assertEquals(groupId, request.data().groups().get(0).groupId());
+        assertEquals(2, request.data().groups().get(0).topics().size());
+        assertEquals(2, 
request.data().groups().get(0).topics().get(0).partitionIndexes().size());
+        assertEquals(2, 
request.data().groups().get(0).topics().get(1).partitionIndexes().size());
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java 
b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index e191ad6..aa00c24 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -38,9 +38,14 @@ import 
org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitReque
 import 
org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
 import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
 import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
+import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
 import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopic;
+import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
+import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
 import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
+import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartitions;
 import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopic;
+import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopics;
 import 
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
 import 
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
 import 
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
@@ -75,6 +80,7 @@ public final class MessageTest {
 
     private final String memberId = "memberId";
     private final String instanceId = "instanceId";
+    private final List<Integer> listOfVersionsNonBatchOffsetFetch = 
Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
 
     @Test
     public void testAddOffsetsToTxnVersions() throws Exception {
@@ -607,7 +613,7 @@ public final class MessageTest {
     }
 
     @Test
-    public void testOffsetFetchVersions() throws Exception {
+    public void testOffsetFetchV0ToV7() throws Exception {
         String groupId = "groupId";
         String topicName = "topic";
 
@@ -615,11 +621,11 @@ public final class MessageTest {
             new OffsetFetchRequestTopic()
                 .setName(topicName)
                 .setPartitionIndexes(Collections.singletonList(5)));
-        testAllMessageRoundTrips(new OffsetFetchRequestData()
+        testAllMessageRoundTripsOffsetFetchV0ToV7(new OffsetFetchRequestData()
                                      .setTopics(new ArrayList<>())
                                      .setGroupId(groupId));
 
-        testAllMessageRoundTrips(new OffsetFetchRequestData()
+        testAllMessageRoundTripsOffsetFetchV0ToV7(new OffsetFetchRequestData()
                                      .setGroupId(groupId)
                                      .setTopics(topics));
 
@@ -632,18 +638,18 @@ public final class MessageTest {
                                                        .setTopics(topics)
                                                        .setRequireStable(true);
 
-        for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
-            final short finalVersion = version;
+        for (int version : listOfVersionsNonBatchOffsetFetch) {
+            final short finalVersion = (short) version;
             if (version < 2) {
-                assertThrows(NullPointerException.class, () -> 
testAllMessageRoundTripsFromVersion(finalVersion, allPartitionData));
+                assertThrows(NullPointerException.class, () -> 
testAllMessageRoundTripsOffsetFetchFromVersionV0ToV7(finalVersion, 
allPartitionData));
             } else {
-                testAllMessageRoundTripsFromVersion(version, allPartitionData);
+                testAllMessageRoundTripsOffsetFetchFromVersionV0ToV7((short) 
version, allPartitionData);
             }
 
             if (version < 7) {
-                assertThrows(UnsupportedVersionException.class, () -> 
testAllMessageRoundTripsFromVersion(finalVersion, requireStableData));
+                assertThrows(UnsupportedVersionException.class, () -> 
testAllMessageRoundTripsOffsetFetchFromVersionV0ToV7(finalVersion, 
requireStableData));
             } else {
-                testAllMessageRoundTripsFromVersion(finalVersion, 
requireStableData);
+                
testAllMessageRoundTripsOffsetFetchFromVersionV0ToV7(finalVersion, 
requireStableData);
             }
         }
 
@@ -661,7 +667,7 @@ public final class MessageTest {
                                       
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())))))
                       .setErrorCode(Errors.NOT_COORDINATOR.code())
                       .setThrottleTimeMs(10);
-        for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+        for (int version : listOfVersionsNonBatchOffsetFetch) {
             OffsetFetchResponseData responseData = response.get();
             if (version <= 1) {
                 responseData.setErrorCode(Errors.NONE.code());
@@ -675,7 +681,221 @@ public final class MessageTest {
                 
responseData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1);
             }
 
-            testAllMessageRoundTripsFromVersion(version, responseData);
+            testAllMessageRoundTripsOffsetFetchFromVersionV0ToV7((short) 
version, responseData);
+        }
+    }
+
+    private void testAllMessageRoundTripsOffsetFetchV0ToV7(Message message) 
throws Exception {
+        testDuplication(message);
+        
testAllMessageRoundTripsOffsetFetchFromVersionV0ToV7(message.lowestSupportedVersion(),
 message);
+    }
+
+    private void testAllMessageRoundTripsOffsetFetchFromVersionV0ToV7(short 
fromVersion,
+        Message message) throws Exception {
+        for (short version = fromVersion; version <= 7; version++) {
+            testEquivalentMessageRoundTrip(version, message);
+        }
+    }
+
+    @Test
+    public void testOffsetFetchV8AndAboveSingleGroup() throws Exception {
+        String groupId = "groupId";
+        String topicName = "topic";
+
+        List<OffsetFetchRequestTopics> topic = Collections.singletonList(
+            new OffsetFetchRequestTopics()
+                .setName(topicName)
+                .setPartitionIndexes(Collections.singletonList(5)));
+
+        OffsetFetchRequestData allPartitionData = new OffsetFetchRequestData()
+            .setGroups(Collections.singletonList(
+                new OffsetFetchRequestGroup()
+                    .setGroupId(groupId)
+                    .setTopics(null)));
+
+        OffsetFetchRequestData specifiedPartitionData = new 
OffsetFetchRequestData()
+            .setGroups(Collections.singletonList(
+                new OffsetFetchRequestGroup()
+                    .setGroupId(groupId)
+                    .setTopics(topic)))
+            .setRequireStable(true);
+
+        testAllMessageRoundTripsOffsetFetchV8AndAbove(allPartitionData);
+        testAllMessageRoundTripsOffsetFetchV8AndAbove(specifiedPartitionData);
+
+        for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+            if (version >= 8) {
+                
testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, 
specifiedPartitionData);
+                
testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, 
allPartitionData);
+            }
+        }
+
+        Supplier<OffsetFetchResponseData> response =
+            () -> new OffsetFetchResponseData()
+                .setGroups(Collections.singletonList(
+                    new OffsetFetchResponseGroup()
+                        .setGroupId(groupId)
+                        .setTopics(Collections.singletonList(
+                            new OffsetFetchResponseTopics()
+                                .setPartitions(Collections.singletonList(
+                                    new OffsetFetchResponsePartitions()
+                                        .setPartitionIndex(5)
+                                        .setMetadata(null)
+                                        .setCommittedOffset(100)
+                                        .setCommittedLeaderEpoch(3)
+                                        
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())))))
+                        .setErrorCode(Errors.NOT_COORDINATOR.code())))
+                .setThrottleTimeMs(10);
+        for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+            if (version >= 8) {
+                OffsetFetchResponseData responseData = response.get();
+                
testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, responseData);
+            }
+        }
+    }
+
+    @Test
+    public void testOffsetFetchV8AndAbove() throws Exception {
+        String groupOne = "group1";
+        String groupTwo = "group2";
+        String groupThree = "group3";
+        String groupFour = "group4";
+        String groupFive = "group5";
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String topic3 = "topic3";
+
+        OffsetFetchRequestTopics topicOne = new OffsetFetchRequestTopics()
+            .setName(topic1)
+            .setPartitionIndexes(Collections.singletonList(5));
+        OffsetFetchRequestTopics topicTwo = new OffsetFetchRequestTopics()
+            .setName(topic2)
+            .setPartitionIndexes(Collections.singletonList(10));
+        OffsetFetchRequestTopics topicThree = new OffsetFetchRequestTopics()
+            .setName(topic3)
+            .setPartitionIndexes(Collections.singletonList(15));
+
+        List<OffsetFetchRequestTopics> groupOneTopics = 
singletonList(topicOne);
+        OffsetFetchRequestGroup group1 =
+            new OffsetFetchRequestGroup()
+                .setGroupId(groupOne)
+                .setTopics(groupOneTopics);
+
+        List<OffsetFetchRequestTopics> groupTwoTopics = 
Arrays.asList(topicOne, topicTwo);
+        OffsetFetchRequestGroup group2 =
+            new OffsetFetchRequestGroup()
+                .setGroupId(groupTwo)
+                .setTopics(groupTwoTopics);
+
+        List<OffsetFetchRequestTopics> groupThreeTopics = 
Arrays.asList(topicOne, topicTwo, topicThree);
+        OffsetFetchRequestGroup group3 =
+            new OffsetFetchRequestGroup()
+                .setGroupId(groupThree)
+                .setTopics(groupThreeTopics);
+
+        OffsetFetchRequestGroup group4 =
+            new OffsetFetchRequestGroup()
+                .setGroupId(groupFour)
+                .setTopics(null);
+
+        OffsetFetchRequestGroup group5 =
+            new OffsetFetchRequestGroup()
+                .setGroupId(groupFive)
+                .setTopics(null);
+
+        OffsetFetchRequestData requestData = new OffsetFetchRequestData()
+            .setGroups(Arrays.asList(group1, group2, group3, group4, group5))
+            .setRequireStable(true);
+
+        testAllMessageRoundTripsOffsetFetchV8AndAbove(requestData);
+
+        
testAllMessageRoundTripsOffsetFetchV8AndAbove(requestData.setRequireStable(false));
+
+
+        for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+            if (version >= 8) {
+                
testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, requestData);
+            }
+        }
+
+        OffsetFetchResponseTopics responseTopic1 =
+            new OffsetFetchResponseTopics()
+                .setName(topic1)
+                .setPartitions(Collections.singletonList(
+                    new OffsetFetchResponsePartitions()
+                        .setPartitionIndex(5)
+                        .setMetadata(null)
+                        .setCommittedOffset(100)
+                        .setCommittedLeaderEpoch(3)
+                        
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())));
+        OffsetFetchResponseTopics responseTopic2 =
+            new OffsetFetchResponseTopics()
+                .setName(topic2)
+                .setPartitions(Collections.singletonList(
+                    new OffsetFetchResponsePartitions()
+                        .setPartitionIndex(10)
+                        .setMetadata("foo")
+                        .setCommittedOffset(200)
+                        .setCommittedLeaderEpoch(2)
+                        
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code())));
+        OffsetFetchResponseTopics responseTopic3 =
+            new OffsetFetchResponseTopics()
+                .setName(topic3)
+                .setPartitions(Collections.singletonList(
+                    new OffsetFetchResponsePartitions()
+                        .setPartitionIndex(15)
+                        .setMetadata("bar")
+                        .setCommittedOffset(300)
+                        .setCommittedLeaderEpoch(1)
+                        
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())));
+
+        OffsetFetchResponseGroup responseGroup1 =
+            new OffsetFetchResponseGroup()
+                .setGroupId(groupOne)
+                .setTopics(Collections.singletonList(responseTopic1))
+                .setErrorCode(Errors.NOT_COORDINATOR.code());
+        OffsetFetchResponseGroup responseGroup2 =
+            new OffsetFetchResponseGroup()
+                .setGroupId(groupTwo)
+                .setTopics(Arrays.asList(responseTopic1, responseTopic2))
+                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+        OffsetFetchResponseGroup responseGroup3 =
+            new OffsetFetchResponseGroup()
+                .setGroupId(groupThree)
+                .setTopics(Arrays.asList(responseTopic1, responseTopic2, 
responseTopic3))
+                .setErrorCode(Errors.NONE.code());
+        OffsetFetchResponseGroup responseGroup4 =
+            new OffsetFetchResponseGroup()
+                .setGroupId(groupFour)
+                .setTopics(Arrays.asList(responseTopic1, responseTopic2, 
responseTopic3))
+                .setErrorCode(Errors.NONE.code());
+        OffsetFetchResponseGroup responseGroup5 =
+            new OffsetFetchResponseGroup()
+                .setGroupId(groupFive)
+                .setTopics(Arrays.asList(responseTopic1, responseTopic2, 
responseTopic3))
+                .setErrorCode(Errors.NONE.code());
+
+        Supplier<OffsetFetchResponseData> response =
+            () -> new OffsetFetchResponseData()
+                .setGroups(Arrays.asList(responseGroup1, responseGroup2, 
responseGroup3,
+                    responseGroup4, responseGroup5))
+                .setThrottleTimeMs(10);
+        for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+            if (version >= 8) {
+                OffsetFetchResponseData responseData = response.get();
+                
testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, responseData);
+            }
+        }
+    }
+
+    private void testAllMessageRoundTripsOffsetFetchV8AndAbove(Message 
message) throws Exception {
+        testDuplication(message);
+        testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove((short) 8, 
message);
+    }
+
+    private void 
testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(short fromVersion, 
Message message) throws Exception {
+        for (short version = fromVersion; version <= 
message.highestSupportedVersion(); version++) {
+            testEquivalentMessageRoundTrip(version, message);
         }
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
index ddb2cd9..37076d0 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
@@ -18,10 +18,12 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.OffsetFetchRequest.Builder;
+import 
org.apache.kafka.common.requests.OffsetFetchRequest.NoBatchedOffsetFetchRequestException;
 import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -44,25 +46,24 @@ public class OffsetFetchRequestTest {
     private final int partitionOne = 1;
     private final String topicTwo = "topic2";
     private final int partitionTwo = 2;
-    private final String groupId = "groupId";
+    private final String topicThree = "topic3";
+    private final String group1 = "group1";
+    private final String group2 = "group2";
+    private final String group3 = "group3";
+    private final String group4 = "group4";
+    private final String group5 = "group5";
+    private List<String> groups = Arrays.asList(group1, group2, group3, 
group4, group5);
+
+    private final List<Integer> listOfVersionsNonBatchOffsetFetch = 
Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
+
 
     private OffsetFetchRequest.Builder builder;
-    private List<TopicPartition> partitions;
-
-    @BeforeEach
-    public void setUp() {
-        partitions = Arrays.asList(new TopicPartition(topicOne, partitionOne),
-                                   new TopicPartition(topicTwo, partitionTwo));
-        builder = new OffsetFetchRequest.Builder(
-            groupId,
-            false,
-            partitions,
-            false);
-    }
 
     @Test
     public void testConstructor() {
-        assertFalse(builder.isAllTopicPartitions());
+        List<TopicPartition> partitions = Arrays.asList(
+            new TopicPartition(topicOne, partitionOne),
+            new TopicPartition(topicTwo, partitionTwo));
         int throttleTimeMs = 10;
 
         Map<TopicPartition, PartitionData> expectedData = new HashMap<>();
@@ -76,60 +77,157 @@ public class OffsetFetchRequestTest {
         }
 
         for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
-            OffsetFetchRequest request = builder.build(version);
-            assertFalse(request.isAllPartitions());
-            assertEquals(groupId, request.groupId());
-            assertEquals(partitions, request.partitions());
-
-            OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
-            assertEquals(Errors.NONE, response.error());
-            assertFalse(response.hasError());
-            assertEquals(Collections.singletonMap(Errors.NONE, version <= 
(short) 1 ? 3 : 1), response.errorCounts(),
-                "Incorrect error count for version " + version);
-
-            if (version <= 1) {
-                assertEquals(expectedData, response.responseData());
+            if (version < 8) {
+                builder = new OffsetFetchRequest.Builder(
+                    group1,
+                    false,
+                    partitions,
+                    false);
+                assertFalse(builder.isAllTopicPartitions());
+                OffsetFetchRequest request = builder.build(version);
+                assertFalse(request.isAllPartitions());
+                assertEquals(group1, request.groupId());
+                assertEquals(partitions, request.partitions());
+
+                OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
+                assertEquals(Errors.NONE, response.error());
+                assertFalse(response.hasError());
+                assertEquals(Collections.singletonMap(Errors.NONE, version <= 
(short) 1 ? 3 : 1), response.errorCounts(),
+                    "Incorrect error count for version " + version);
+
+                if (version <= 1) {
+                    assertEquals(expectedData, response.responseDataV0ToV7());
+                }
+
+                if (version >= 3) {
+                    assertEquals(throttleTimeMs, response.throttleTimeMs());
+                } else {
+                    assertEquals(DEFAULT_THROTTLE_TIME, 
response.throttleTimeMs());
+                }
+            } else {
+                builder = new Builder(Collections.singletonMap(group1, 
partitions), false, false);
+                OffsetFetchRequest request = builder.build(version);
+                Map<String, List<TopicPartition>> groupToPartitionMap =
+                    request.groupIdsToPartitions();
+                Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
+                    request.groupIdsToTopics();
+                assertFalse(request.isAllPartitionsForGroup(group1));
+                assertTrue(groupToPartitionMap.containsKey(group1) && 
groupToTopicMap.containsKey(
+                    group1));
+                assertEquals(partitions, groupToPartitionMap.get(group1));
+                OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
+                assertEquals(Errors.NONE, response.groupLevelError(group1));
+                assertFalse(response.groupHasError(group1));
+                assertEquals(Collections.singletonMap(Errors.NONE, 1), 
response.errorCounts(),
+                    "Incorrect error count for version " + version);
+                assertEquals(throttleTimeMs, response.throttleTimeMs());
             }
+        }
+    }
+
+    @Test
+    public void testConstructorWithMultipleGroups() {
+        List<TopicPartition> topic1Partitions = Arrays.asList(
+            new TopicPartition(topicOne, partitionOne),
+            new TopicPartition(topicOne, partitionTwo));
+        List<TopicPartition> topic2Partitions = Arrays.asList(
+            new TopicPartition(topicTwo, partitionOne),
+            new TopicPartition(topicTwo, partitionTwo));
+        List<TopicPartition> topic3Partitions = Arrays.asList(
+            new TopicPartition(topicThree, partitionOne),
+            new TopicPartition(topicThree, partitionTwo));
+        Map<String, List<TopicPartition>> groupToTp = new HashMap<>();
+        groupToTp.put(group1, topic1Partitions);
+        groupToTp.put(group2, topic2Partitions);
+        groupToTp.put(group3, topic3Partitions);
+        groupToTp.put(group4, null);
+        groupToTp.put(group5, null);
+        int throttleTimeMs = 10;
 
-            if (version >= 3) {
+        for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+            if (version >= 8) {
+                builder = new Builder(groupToTp, false, false);
+                OffsetFetchRequest request = builder.build(version);
+                Map<String, List<TopicPartition>> groupToPartitionMap =
+                    request.groupIdsToPartitions();
+                Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
+                    request.groupIdsToTopics();
+                assertEquals(groupToTp.keySet(), groupToTopicMap.keySet());
+                assertEquals(groupToTp.keySet(), groupToPartitionMap.keySet());
+                assertFalse(request.isAllPartitionsForGroup(group1));
+                assertFalse(request.isAllPartitionsForGroup(group2));
+                assertFalse(request.isAllPartitionsForGroup(group3));
+                assertTrue(request.isAllPartitionsForGroup(group4));
+                assertTrue(request.isAllPartitionsForGroup(group5));
+                OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
+                for (String group : groups) {
+                    assertEquals(Errors.NONE, response.groupLevelError(group));
+                    assertFalse(response.groupHasError(group));
+                }
+                assertEquals(Collections.singletonMap(Errors.NONE, 5), 
response.errorCounts(),
+                    "Incorrect error count for version " + version);
                 assertEquals(throttleTimeMs, response.throttleTimeMs());
-            } else {
-                assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
             }
         }
     }
 
     @Test
+    public void testBuildThrowForUnsupportedBatchRequest() {
+        for (int version : listOfVersionsNonBatchOffsetFetch) {
+            Map<String, List<TopicPartition>> groupPartitionMap = new 
HashMap<>();
+            groupPartitionMap.put(group1, null);
+            groupPartitionMap.put(group2, null);
+            builder = new Builder(groupPartitionMap, true, false);
+            final short finalVersion = (short) version;
+            assertThrows(NoBatchedOffsetFetchRequestException.class, () -> 
builder.build(finalVersion));
+        }
+    }
+
+    @Test
     public void testConstructorFailForUnsupportedRequireStable() {
         for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
-            // The builder needs to be initialized every cycle as the internal 
data `requireStable` flag is flipped.
-            builder = new OffsetFetchRequest.Builder(groupId, true, null, 
false);
-            final short finalVersion = version;
-            if (version < 2) {
-                assertThrows(UnsupportedVersionException.class, () -> 
builder.build(finalVersion));
-            } else {
-                OffsetFetchRequest request = builder.build(finalVersion);
-                assertEquals(groupId, request.groupId());
-                assertNull(request.partitions());
-                assertTrue(request.isAllPartitions());
-                if (version < 7) {
-                    assertFalse(request.requireStable());
+            if (version < 8) {
+                // The builder needs to be initialized every cycle as the 
internal data `requireStable` flag is flipped.
+                builder = new OffsetFetchRequest.Builder(group1, true, null, 
false);
+                final short finalVersion = version;
+                if (version < 2) {
+                    assertThrows(UnsupportedVersionException.class, () -> 
builder.build(finalVersion));
                 } else {
-                    assertTrue(request.requireStable());
+                    OffsetFetchRequest request = builder.build(finalVersion);
+                    assertEquals(group1, request.groupId());
+                    assertNull(request.partitions());
+                    assertTrue(request.isAllPartitions());
+                    if (version < 7) {
+                        assertFalse(request.requireStable());
+                    } else {
+                        assertTrue(request.requireStable());
+                    }
                 }
+            } else {
+                builder = new Builder(Collections.singletonMap(group1, null), 
true, false);
+                OffsetFetchRequest request = builder.build(version);
+                Map<String, List<TopicPartition>> groupToPartitionMap =
+                    request.groupIdsToPartitions();
+                Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
+                    request.groupIdsToTopics();
+                assertTrue(groupToPartitionMap.containsKey(group1) && 
groupToTopicMap.containsKey(
+                    group1));
+                assertNull(groupToPartitionMap.get(group1));
+                assertTrue(request.isAllPartitionsForGroup(group1));
+                assertTrue(request.requireStable());
             }
         }
     }
 
     @Test
     public void testBuildThrowForUnsupportedRequireStable() {
-        for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
-            builder = new OffsetFetchRequest.Builder(groupId, true, null, 
true);
+        for (int version : listOfVersionsNonBatchOffsetFetch) {
+            builder = new OffsetFetchRequest.Builder(group1, true, null, true);
             if (version < 7) {
-                final short finalVersion = version;
+                final short finalVersion = (short) version;
                 assertThrows(UnsupportedVersionException.class, () -> 
builder.build(finalVersion));
             } else {
-                OffsetFetchRequest request = builder.build(version);
+                OffsetFetchRequest request = builder.build((short) version);
                 assertTrue(request.requireStable());
             }
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
index e202fc2..c73ea2a 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
@@ -18,8 +18,11 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
+import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
 import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
+import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartitions;
 import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopic;
+import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
@@ -44,12 +47,19 @@ public class OffsetFetchResponseTest {
     private final int offset = 100;
     private final String metadata = "metadata";
 
+    private final String groupOne = "group1";
+    private final String groupTwo = "group2";
+    private final String groupThree = "group3";
     private final String topicOne = "topic1";
     private final int partitionOne = 1;
     private final Optional<Integer> leaderEpochOne = Optional.of(1);
     private final String topicTwo = "topic2";
     private final int partitionTwo = 2;
     private final Optional<Integer> leaderEpochTwo = Optional.of(2);
+    private final String topicThree = "topic3";
+    private final int partitionThree = 3;
+    private final Optional<Integer> leaderEpochThree = Optional.of(3);
+
 
     private Map<TopicPartition, PartitionData> partitionDataMap;
 
@@ -72,99 +82,228 @@ public class OffsetFetchResponseTest {
 
     @Test
     public void testConstructor() {
-        OffsetFetchResponse response = new OffsetFetchResponse(throttleTimeMs, 
Errors.NOT_COORDINATOR, partitionDataMap);
-        assertEquals(Errors.NOT_COORDINATOR, response.error());
-        assertEquals(3, response.errorCounts().size());
-        assertEquals(Utils.mkMap(Utils.mkEntry(Errors.NOT_COORDINATOR, 1),
-                Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1),
-                Utils.mkEntry(Errors.UNKNOWN_TOPIC_OR_PARTITION, 1)),
-                response.errorCounts());
-
-        assertEquals(throttleTimeMs, response.throttleTimeMs());
-
-        Map<TopicPartition, PartitionData> responseData = 
response.responseData();
-        assertEquals(partitionDataMap, responseData);
-        responseData.forEach(
-            (tp, data) -> assertTrue(data.hasError())
-        );
+        for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+            if (version < 8) {
+                OffsetFetchResponse response = new 
OffsetFetchResponse(throttleTimeMs, Errors.NOT_COORDINATOR, partitionDataMap);
+                assertEquals(Errors.NOT_COORDINATOR, response.error());
+                assertEquals(3, response.errorCounts().size());
+                assertEquals(Utils.mkMap(Utils.mkEntry(Errors.NOT_COORDINATOR, 
1),
+                    Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1),
+                    Utils.mkEntry(Errors.UNKNOWN_TOPIC_OR_PARTITION, 1)),
+                    response.errorCounts());
+
+                assertEquals(throttleTimeMs, response.throttleTimeMs());
+
+                Map<TopicPartition, PartitionData> responseData = 
response.responseDataV0ToV7();
+                assertEquals(partitionDataMap, responseData);
+                responseData.forEach((tp, data) -> 
assertTrue(data.hasError()));
+            } else {
+                OffsetFetchResponse response = new OffsetFetchResponse(
+                    throttleTimeMs,
+                    Collections.singletonMap(groupOne, Errors.NOT_COORDINATOR),
+                    Collections.singletonMap(groupOne, partitionDataMap));
+                assertEquals(Errors.NOT_COORDINATOR, 
response.groupLevelError(groupOne));
+                assertEquals(3, response.errorCounts().size());
+                assertEquals(Utils.mkMap(Utils.mkEntry(Errors.NOT_COORDINATOR, 
1),
+                    Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1),
+                    Utils.mkEntry(Errors.UNKNOWN_TOPIC_OR_PARTITION, 1)),
+                    response.errorCounts());
+
+                assertEquals(throttleTimeMs, response.throttleTimeMs());
+
+                Map<TopicPartition, PartitionData> responseData = 
response.partitionDataMap(groupOne);
+                assertEquals(partitionDataMap, responseData);
+                responseData.forEach((tp, data) -> 
assertTrue(data.hasError()));
+            }
+        }
     }
 
-    /**
-     * Test behavior changes over the versions. Refer to 
resources.common.messages.OffsetFetchResponse.json
-     */
     @Test
-    public void testStructBuild() {
-        partitionDataMap.put(new TopicPartition(topicTwo, partitionTwo), new 
PartitionData(
+    public void testConstructorWithMultipleGroups() {
+        Map<String, Map<TopicPartition, PartitionData>> responseData = new 
HashMap<>();
+        Map<String, Errors> errorMap = new HashMap<>();
+        Map<TopicPartition, PartitionData> pd1 = new HashMap<>();
+        Map<TopicPartition, PartitionData> pd2 = new HashMap<>();
+        Map<TopicPartition, PartitionData> pd3 = new HashMap<>();
+        pd1.put(new TopicPartition(topicOne, partitionOne), new PartitionData(
+            offset,
+            leaderEpochOne,
+            metadata,
+            Errors.TOPIC_AUTHORIZATION_FAILED));
+        pd2.put(new TopicPartition(topicTwo, partitionTwo), new PartitionData(
             offset,
             leaderEpochTwo,
             metadata,
-            Errors.GROUP_AUTHORIZATION_FAILED
-        ));
+            Errors.UNKNOWN_TOPIC_OR_PARTITION));
+        pd3.put(new TopicPartition(topicThree, partitionThree), new 
PartitionData(
+            offset,
+            leaderEpochThree,
+            metadata,
+            Errors.NONE));
+        responseData.put(groupOne, pd1);
+        responseData.put(groupTwo, pd2);
+        responseData.put(groupThree, pd3);
+        errorMap.put(groupOne, Errors.NOT_COORDINATOR);
+        errorMap.put(groupTwo, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+        errorMap.put(groupThree, Errors.NONE);
+        for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+            if (version >= 8) {
+                OffsetFetchResponse response = new OffsetFetchResponse(
+                    throttleTimeMs, errorMap, responseData);
+
+                assertEquals(Errors.NOT_COORDINATOR, 
response.groupLevelError(groupOne));
+                assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, 
response.groupLevelError(groupTwo));
+                assertEquals(Errors.NONE, 
response.groupLevelError(groupThree));
+                assertTrue(response.groupHasError(groupOne));
+                assertTrue(response.groupHasError(groupTwo));
+                assertFalse(response.groupHasError(groupThree));
+                assertEquals(5, response.errorCounts().size());
+                assertEquals(Utils.mkMap(Utils.mkEntry(Errors.NOT_COORDINATOR, 
1),
+                    Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1),
+                    Utils.mkEntry(Errors.UNKNOWN_TOPIC_OR_PARTITION, 1),
+                    Utils.mkEntry(Errors.COORDINATOR_LOAD_IN_PROGRESS, 1),
+                    Utils.mkEntry(Errors.NONE, 2)),
+                    response.errorCounts());
 
-        OffsetFetchResponse latestResponse = new 
OffsetFetchResponse(throttleTimeMs, Errors.NONE, partitionDataMap);
+                assertEquals(throttleTimeMs, response.throttleTimeMs());
 
+                Map<TopicPartition, PartitionData> responseData1 = 
response.partitionDataMap(groupOne);
+                assertEquals(pd1, responseData1);
+                responseData1.forEach((tp, data) -> 
assertTrue(data.hasError()));
+                Map<TopicPartition, PartitionData> responseData2 = 
response.partitionDataMap(groupTwo);
+                assertEquals(pd2, responseData2);
+                responseData2.forEach((tp, data) -> 
assertTrue(data.hasError()));
+                Map<TopicPartition, PartitionData> responseData3 = 
response.partitionDataMap(groupThree);
+                assertEquals(pd3, responseData3);
+                responseData3.forEach((tp, data) -> 
assertFalse(data.hasError()));
+            }
+        }
+    }
+
+    /**
+     * Test behavior changes over the versions. Refer to 
resources.common.messages.OffsetFetchResponse.json
+     */
+    @Test
+    public void testStructBuild() {
         for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
-            OffsetFetchResponseData data = new OffsetFetchResponseData(
-                new ByteBufferAccessor(latestResponse.serialize(version)), 
version);
+            if (version < 8) {
+                partitionDataMap.put(new TopicPartition(topicTwo, 
partitionTwo), new PartitionData(
+                    offset,
+                    leaderEpochTwo,
+                    metadata,
+                    Errors.GROUP_AUTHORIZATION_FAILED
+                ));
 
-            OffsetFetchResponse oldResponse = new OffsetFetchResponse(data, 
version);
+                OffsetFetchResponse latestResponse = new 
OffsetFetchResponse(throttleTimeMs, Errors.NONE, partitionDataMap);
+                OffsetFetchResponseData data = new OffsetFetchResponseData(
+                    new ByteBufferAccessor(latestResponse.serialize(version)), 
version);
 
-            if (version <= 1) {
-                assertEquals(Errors.NONE.code(), data.errorCode());
+                OffsetFetchResponse oldResponse = new 
OffsetFetchResponse(data, version);
 
-                // Partition level error populated in older versions.
-                assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, 
oldResponse.error());
-                
assertEquals(Utils.mkMap(Utils.mkEntry(Errors.GROUP_AUTHORIZATION_FAILED, 2),
-                        Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1)), 
oldResponse.errorCounts());
+                if (version <= 1) {
+                    assertEquals(Errors.NONE.code(), data.errorCode());
 
-            } else {
-                assertEquals(Errors.NONE.code(), data.errorCode());
+                    // Partition level error populated in older versions.
+                    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, 
oldResponse.error());
+                    
assertEquals(Utils.mkMap(Utils.mkEntry(Errors.GROUP_AUTHORIZATION_FAILED, 2),
+                        Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1)),
+                        oldResponse.errorCounts());
+                } else {
+                    assertEquals(Errors.NONE.code(), data.errorCode());
 
-                assertEquals(Errors.NONE, oldResponse.error());
-                assertEquals(Utils.mkMap(
+                    assertEquals(Errors.NONE, oldResponse.error());
+                    assertEquals(Utils.mkMap(
                         Utils.mkEntry(Errors.NONE, 1),
                         Utils.mkEntry(Errors.GROUP_AUTHORIZATION_FAILED, 1),
-                        Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1)), 
oldResponse.errorCounts());
-            }
+                        Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1)),
+                        oldResponse.errorCounts());
+                }
+
+                if (version <= 2) {
+                    assertEquals(DEFAULT_THROTTLE_TIME, 
oldResponse.throttleTimeMs());
+                } else {
+                    assertEquals(throttleTimeMs, oldResponse.throttleTimeMs());
+                }
 
-            if (version <= 2) {
-                assertEquals(DEFAULT_THROTTLE_TIME, 
oldResponse.throttleTimeMs());
+                Map<TopicPartition, PartitionData> expectedDataMap = new 
HashMap<>();
+                for (Map.Entry<TopicPartition, PartitionData> entry : 
partitionDataMap.entrySet()) {
+                    PartitionData partitionData = entry.getValue();
+                    expectedDataMap.put(entry.getKey(), new PartitionData(
+                        partitionData.offset,
+                        version <= 4 ? Optional.empty() : 
partitionData.leaderEpoch,
+                        partitionData.metadata,
+                        partitionData.error
+                    ));
+                }
+
+                Map<TopicPartition, PartitionData> responseData = 
oldResponse.responseDataV0ToV7();
+                assertEquals(expectedDataMap, responseData);
+
+                responseData.forEach((tp, rdata) -> 
assertTrue(rdata.hasError()));
             } else {
+                partitionDataMap.put(new TopicPartition(topicTwo, 
partitionTwo), new PartitionData(
+                    offset,
+                    leaderEpochTwo,
+                    metadata,
+                    Errors.GROUP_AUTHORIZATION_FAILED));
+                OffsetFetchResponse latestResponse = new OffsetFetchResponse(
+                    throttleTimeMs,
+                    Collections.singletonMap(groupOne, Errors.NONE),
+                    Collections.singletonMap(groupOne, partitionDataMap));
+                OffsetFetchResponseData data = new OffsetFetchResponseData(
+                    new ByteBufferAccessor(latestResponse.serialize(version)), 
version);
+                OffsetFetchResponse oldResponse = new 
OffsetFetchResponse(data, version);
+                assertEquals(Errors.NONE.code(), 
data.groups().get(0).errorCode());
+
+                assertEquals(Errors.NONE, 
oldResponse.groupLevelError(groupOne));
+                assertEquals(Utils.mkMap(
+                    Utils.mkEntry(Errors.NONE, 1),
+                    Utils.mkEntry(Errors.GROUP_AUTHORIZATION_FAILED, 1),
+                    Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1)),
+                    oldResponse.errorCounts());
                 assertEquals(throttleTimeMs, oldResponse.throttleTimeMs());
-            }
 
-            Map<TopicPartition, PartitionData> expectedDataMap = new 
HashMap<>();
-            for (Map.Entry<TopicPartition, PartitionData> entry : 
partitionDataMap.entrySet()) {
-                PartitionData partitionData = entry.getValue();
-                expectedDataMap.put(entry.getKey(), new PartitionData(
-                    partitionData.offset,
-                    version <= 4 ? Optional.empty() : 
partitionData.leaderEpoch,
-                    partitionData.metadata,
-                    partitionData.error
-                ));
-            }
+                Map<TopicPartition, PartitionData> expectedDataMap = new 
HashMap<>();
+                for (Map.Entry<TopicPartition, PartitionData> entry : 
partitionDataMap.entrySet()) {
+                    PartitionData partitionData = entry.getValue();
+                    expectedDataMap.put(entry.getKey(), new PartitionData(
+                        partitionData.offset,
+                        partitionData.leaderEpoch,
+                        partitionData.metadata,
+                        partitionData.error
+                    ));
+                }
 
-            Map<TopicPartition, PartitionData> responseData = 
oldResponse.responseData();
-            assertEquals(expectedDataMap, responseData);
+                Map<TopicPartition, PartitionData> responseData = 
oldResponse.partitionDataMap(groupOne);
+                assertEquals(expectedDataMap, responseData);
 
-            responseData.forEach((tp, rdata) -> assertTrue(rdata.hasError()));
+                responseData.forEach((tp, rdata) -> 
assertTrue(rdata.hasError()));
+            }
         }
     }
 
     @Test
     public void testShouldThrottle() {
-        OffsetFetchResponse response = new OffsetFetchResponse(throttleTimeMs, 
Errors.NONE, partitionDataMap);
         for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
-            if (version >= 4) {
-                assertTrue(response.shouldClientThrottle(version));
+            if (version < 8) {
+                OffsetFetchResponse response = new 
OffsetFetchResponse(throttleTimeMs, Errors.NONE, partitionDataMap);
+                if (version >= 4) {
+                    assertTrue(response.shouldClientThrottle(version));
+                } else {
+                    assertFalse(response.shouldClientThrottle(version));
+                }
             } else {
-                assertFalse(response.shouldClientThrottle(version));
+                OffsetFetchResponse response = new OffsetFetchResponse(
+                    throttleTimeMs,
+                    Collections.singletonMap(groupOne, Errors.NOT_COORDINATOR),
+                    Collections.singletonMap(groupOne, partitionDataMap));
+                assertTrue(response.shouldClientThrottle(version));
             }
         }
     }
 
     @Test
-    public void testNullableMetadata() {
+    public void testNullableMetadataV0ToV7() {
         PartitionData pd = new PartitionData(
             offset,
             leaderEpochOne,
@@ -196,7 +335,43 @@ public class OffsetFetchResponseTest {
     }
 
     @Test
-    public void testUseDefaultLeaderEpoch() {
+    public void testNullableMetadataV8AndAbove() {
+        PartitionData pd = new PartitionData(
+            offset,
+            leaderEpochOne,
+            null,
+            Errors.UNKNOWN_TOPIC_OR_PARTITION);
+        // test PartitionData.equals with null metadata
+        assertEquals(pd, pd);
+        partitionDataMap.clear();
+        partitionDataMap.put(new TopicPartition(topicOne, partitionOne), pd);
+
+        OffsetFetchResponse response = new OffsetFetchResponse(
+            throttleTimeMs,
+            Collections.singletonMap(groupOne, 
Errors.GROUP_AUTHORIZATION_FAILED),
+            Collections.singletonMap(groupOne, partitionDataMap));
+        OffsetFetchResponseData expectedData =
+            new OffsetFetchResponseData()
+                .setGroups(Collections.singletonList(
+                    new OffsetFetchResponseGroup()
+                        .setGroupId(groupOne)
+                        .setTopics(Collections.singletonList(
+                            new OffsetFetchResponseTopics()
+                                .setName(topicOne)
+                                .setPartitions(Collections.singletonList(
+                                    new OffsetFetchResponsePartitions()
+                                        .setPartitionIndex(partitionOne)
+                                        .setCommittedOffset(offset)
+                                        
.setCommittedLeaderEpoch(leaderEpochOne.orElse(-1))
+                                        
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                                        .setMetadata(null)))))
+                        
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())))
+                .setThrottleTimeMs(throttleTimeMs);
+        assertEquals(expectedData, response.data());
+    }
+
+    @Test
+    public void testUseDefaultLeaderEpochV0ToV7() {
         final Optional<Integer> emptyLeaderEpoch = Optional.empty();
         partitionDataMap.clear();
 
@@ -227,4 +402,40 @@ public class OffsetFetchResponseTest {
                 );
         assertEquals(expectedData, response.data());
     }
+
+    @Test
+    public void testUseDefaultLeaderEpochV8() {
+        final Optional<Integer> emptyLeaderEpoch = Optional.empty();
+        partitionDataMap.clear();
+
+        partitionDataMap.put(new TopicPartition(topicOne, partitionOne),
+            new PartitionData(
+                offset,
+                emptyLeaderEpoch,
+                metadata,
+                Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        );
+        OffsetFetchResponse response = new OffsetFetchResponse(
+            throttleTimeMs,
+            Collections.singletonMap(groupOne, Errors.NOT_COORDINATOR),
+            Collections.singletonMap(groupOne, partitionDataMap));
+        OffsetFetchResponseData expectedData =
+            new OffsetFetchResponseData()
+                .setGroups(Collections.singletonList(
+                    new OffsetFetchResponseGroup()
+                        .setGroupId(groupOne)
+                        .setTopics(Collections.singletonList(
+                            new OffsetFetchResponseTopics()
+                                .setName(topicOne)
+                                .setPartitions(Collections.singletonList(
+                                    new OffsetFetchResponsePartitions()
+                                        .setPartitionIndex(partitionOne)
+                                        .setCommittedOffset(offset)
+                                        
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+                                        
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                                        .setMetadata(metadata)))))
+                        .setErrorCode(Errors.NOT_COORDINATOR.code())))
+                .setThrottleTimeMs(throttleTimeMs);
+        assertEquals(expectedData, response.data());
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 4d8e69e..887d15c 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -233,6 +233,7 @@ import static 
org.apache.kafka.common.protocol.ApiKeys.JOIN_GROUP;
 import static org.apache.kafka.common.protocol.ApiKeys.LEADER_AND_ISR;
 import static org.apache.kafka.common.protocol.ApiKeys.LIST_GROUPS;
 import static org.apache.kafka.common.protocol.ApiKeys.LIST_OFFSETS;
+import static org.apache.kafka.common.protocol.ApiKeys.OFFSET_FETCH;
 import static org.apache.kafka.common.protocol.ApiKeys.STOP_REPLICA;
 import static org.apache.kafka.common.protocol.ApiKeys.SYNC_GROUP;
 import static 
org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
@@ -311,21 +312,30 @@ public class RequestResponseTest {
         checkErrorResponse(createMetadataRequest(3, 
Collections.singletonList("topic1")), unknownServerException, true);
         checkResponse(createMetadataResponse(), 4, true);
         checkErrorResponse(createMetadataRequest(4, 
Collections.singletonList("topic1")), unknownServerException, true);
-        checkRequest(createOffsetFetchRequestForAllPartition("group1", false), 
true);
-        checkRequest(createOffsetFetchRequestForAllPartition("group1", true), 
true);
-        checkErrorResponse(createOffsetFetchRequestForAllPartition("group1", 
false), new NotCoordinatorException("Not Coordinator"), true);
-        checkErrorResponse(createOffsetFetchRequestForAllPartition("group1", 
true), new NotCoordinatorException("Not Coordinator"), true);
         checkRequest(createOffsetFetchRequest(0, false), true);
         checkRequest(createOffsetFetchRequest(1, false), true);
         checkRequest(createOffsetFetchRequest(2, false), true);
         checkRequest(createOffsetFetchRequest(7, true), true);
-        checkRequest(createOffsetFetchRequestForAllPartition("group1", false), 
true);
-        checkRequest(createOffsetFetchRequestForAllPartition("group1", true), 
true);
+        checkRequest(createOffsetFetchRequest(8, true), true);
+        checkRequest(createOffsetFetchRequestWithMultipleGroups(8, true), 
true);
+        checkRequest(createOffsetFetchRequestWithMultipleGroups(8, false), 
true);
+        checkRequest(createOffsetFetchRequestForAllPartition(7, true), true);
+        checkRequest(createOffsetFetchRequestForAllPartition(8, true), true);
         checkErrorResponse(createOffsetFetchRequest(0, false), 
unknownServerException, true);
         checkErrorResponse(createOffsetFetchRequest(1, false), 
unknownServerException, true);
         checkErrorResponse(createOffsetFetchRequest(2, false), 
unknownServerException, true);
         checkErrorResponse(createOffsetFetchRequest(7, true), 
unknownServerException, true);
-        checkResponse(createOffsetFetchResponse(), 0, true);
+        checkErrorResponse(createOffsetFetchRequest(8, true), 
unknownServerException, true);
+        checkErrorResponse(createOffsetFetchRequestWithMultipleGroups(8, 
true), unknownServerException, true);
+        checkErrorResponse(createOffsetFetchRequestForAllPartition(7, true),
+            new NotCoordinatorException("Not Coordinator"), true);
+        checkErrorResponse(createOffsetFetchRequestForAllPartition(8, true),
+            new NotCoordinatorException("Not Coordinator"), true);
+        checkErrorResponse(createOffsetFetchRequestWithMultipleGroups(8, true),
+            new NotCoordinatorException("Not Coordinator"), true);
+        checkResponse(createOffsetFetchResponse(0), 0, true);
+        checkResponse(createOffsetFetchResponse(7), 7, true);
+        checkResponse(createOffsetFetchResponse(8), 8, true);
         checkRequest(createProduceRequest(2), true);
         checkErrorResponse(createProduceRequest(2), unknownServerException, 
true);
         checkRequest(createProduceRequest(3), true);
@@ -1051,18 +1061,51 @@ public class RequestResponseTest {
     }
 
     @Test
-    public void testOffsetFetchRequestBuilderToString() {
+    public void testOffsetFetchRequestBuilderToStringV0ToV7() {
         List<Boolean> stableFlags = Arrays.asList(true, false);
         for (Boolean requireStable : stableFlags) {
-            String allTopicPartitionsString = new 
OffsetFetchRequest.Builder("someGroup", requireStable, null, false).toString();
-
-            assertTrue(allTopicPartitionsString.contains("groupId='someGroup', 
topics=null, requireStable="
-                                                             + 
requireStable.toString()));
+            String allTopicPartitionsString = new 
OffsetFetchRequest.Builder("someGroup",
+                requireStable,
+                null,
+                false)
+                .toString();
+
+            assertTrue(allTopicPartitionsString.contains("groupId='someGroup', 
topics=null,"
+                + " groups=[], requireStable=" + requireStable));
             String string = new OffsetFetchRequest.Builder("group1",
-                requireStable, Collections.singletonList(new 
TopicPartition("test11", 1)), false).toString();
+                requireStable,
+                Collections.singletonList(
+                    new TopicPartition("test11", 1)),
+                false)
+                .toString();
             assertTrue(string.contains("test11"));
             assertTrue(string.contains("group1"));
-            assertTrue(string.contains("requireStable=" + 
requireStable.toString()));
+            assertTrue(string.contains("requireStable=" + requireStable));
+        }
+    }
+
+    @Test
+    public void testOffsetFetchRequestBuilderToStringV8AndAbove() {
+        List<Boolean> stableFlags = Arrays.asList(true, false);
+        for (Boolean requireStable : stableFlags) {
+            String allTopicPartitionsString = new OffsetFetchRequest.Builder(
+                Collections.singletonMap("someGroup", null),
+                requireStable,
+                false)
+                .toString();
+            
assertTrue(allTopicPartitionsString.contains("groups=[OffsetFetchRequestGroup"
+                + "(groupId='someGroup', topics=null)], requireStable=" + 
requireStable));
+
+            String subsetTopicPartitionsString = new 
OffsetFetchRequest.Builder(
+                Collections.singletonMap(
+                    "group1",
+                    Collections.singletonList(new TopicPartition("test11", 
1))),
+                requireStable,
+                false)
+                .toString();
+            assertTrue(subsetTopicPartitionsString.contains("test11"));
+            assertTrue(subsetTopicPartitionsString.contains("group1"));
+            assertTrue(subsetTopicPartitionsString.contains("requireStable=" + 
requireStable));
         }
     }
 
@@ -1603,21 +1646,81 @@ public class RequestResponseTest {
     }
 
     private OffsetFetchRequest createOffsetFetchRequest(int version, boolean 
requireStable) {
-        return new OffsetFetchRequest.Builder("group1", requireStable, 
Collections.singletonList(new TopicPartition("test11", 1)), false)
+        if (version < 8) {
+            return new OffsetFetchRequest.Builder(
+                "group1",
+                requireStable,
+                Collections.singletonList(new TopicPartition("test11", 1)),
+                false)
                 .build((short) version);
+        }
+        return new OffsetFetchRequest.Builder(
+            Collections.singletonMap(
+                "group1",
+                Collections.singletonList(new TopicPartition("test11", 1))),
+            requireStable,
+            false)
+            .build((short) version);
+    }
+
+    private OffsetFetchRequest createOffsetFetchRequestWithMultipleGroups(int 
version,
+        boolean requireStable) {
+        Map<String, List<TopicPartition>> groupToPartitionMap = new 
HashMap<>();
+        List<TopicPartition> topic1 = singletonList(
+            new TopicPartition("topic1", 0));
+        List<TopicPartition> topic2 = Arrays.asList(
+            new TopicPartition("topic1", 0),
+            new TopicPartition("topic2", 0),
+            new TopicPartition("topic2", 1));
+        List<TopicPartition> topic3 = Arrays.asList(
+            new TopicPartition("topic1", 0),
+            new TopicPartition("topic2", 0),
+            new TopicPartition("topic2", 1),
+            new TopicPartition("topic3", 0),
+            new TopicPartition("topic3", 1),
+            new TopicPartition("topic3", 2));
+        groupToPartitionMap.put("group1", topic1);
+        groupToPartitionMap.put("group2", topic2);
+        groupToPartitionMap.put("group3", topic3);
+        groupToPartitionMap.put("group4", null);
+        groupToPartitionMap.put("group5", null);
+
+        return new OffsetFetchRequest.Builder(
+            groupToPartitionMap,
+            requireStable,
+            false
+        ).build((short) version);
     }
 
-    private OffsetFetchRequest createOffsetFetchRequestForAllPartition(String 
groupId, boolean requireStable) {
-        return new OffsetFetchRequest.Builder(groupId, requireStable, null, 
false).build();
+    private OffsetFetchRequest createOffsetFetchRequestForAllPartition(int 
version, boolean requireStable) {
+        if (version < 8) {
+            return new OffsetFetchRequest.Builder(
+                "group1",
+                requireStable,
+                null,
+                false)
+                .build((short) version);
+        }
+        return new OffsetFetchRequest.Builder(
+            Collections.singletonMap(
+                "group1", null),
+            requireStable,
+            false)
+            .build((short) version);
     }
 
-    private OffsetFetchResponse createOffsetFetchResponse() {
+    private OffsetFetchResponse createOffsetFetchResponse(int version) {
         Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = 
new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new 
OffsetFetchResponse.PartitionData(
-                100L, Optional.empty(), "", Errors.NONE));
+            100L, Optional.empty(), "", Errors.NONE));
         responseData.put(new TopicPartition("test", 1), new 
OffsetFetchResponse.PartitionData(
-                100L, Optional.of(10), null, Errors.NONE));
-        return new OffsetFetchResponse(Errors.NONE, responseData);
+            100L, Optional.of(10), null, Errors.NONE));
+        if (version < 8) {
+            return new OffsetFetchResponse(Errors.NONE, responseData);
+        }
+        int throttleMs = 10;
+        return new OffsetFetchResponse(throttleMs, 
Collections.singletonMap("group1", Errors.NONE),
+            Collections.singletonMap("group1", responseData));
     }
 
     @SuppressWarnings("deprecation")
@@ -2864,7 +2967,7 @@ public class RequestResponseTest {
         assertEquals(Integer.valueOf(3), 
createMetadataResponse().errorCounts().get(Errors.NONE));
         assertEquals(Integer.valueOf(1), 
createOffsetCommitResponse().errorCounts().get(Errors.NONE));
         assertEquals(Integer.valueOf(2), 
createOffsetDeleteResponse().errorCounts().get(Errors.NONE));
-        assertEquals(Integer.valueOf(3), 
createOffsetFetchResponse().errorCounts().get(Errors.NONE));
+        assertEquals(Integer.valueOf(3), 
createOffsetFetchResponse(OFFSET_FETCH.latestVersion()).errorCounts().get(Errors.NONE));
         assertEquals(Integer.valueOf(1), 
createProduceResponse().errorCounts().get(Errors.NONE));
         assertEquals(Integer.valueOf(1), 
createRenewTokenResponse().errorCounts().get(Errors.NONE));
         assertEquals(Integer.valueOf(1), 
createSaslAuthenticateResponse().errorCounts().get(Errors.NONE));
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7f7b506..6bf1a0e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -61,6 +61,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
 import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
@@ -415,7 +416,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         new OffsetCommitResponse(requestThrottleMs, 
combinedCommitStatus.asJava))
     }
 
-    // reject the request if not authorized to the group
+      // reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, 
offsetCommitRequest.data.groupId)) {
       val error = Errors.GROUP_AUTHORIZATION_FAILED
       val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
@@ -1254,74 +1255,102 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Handle an offset fetch request
    */
   def handleOffsetFetchRequest(request: RequestChannel.Request): Unit = {
+    val version = request.header.apiVersion
+    if (version == 0) {
+      // reading offsets from ZK
+      handleOffsetFetchRequestV0(request)
+    } else if (version >= 1 && version <= 7) {
+      // reading offsets from Kafka
+      handleOffsetFetchRequestBetweenV1AndV7(request)
+    } else {
+      // batching offset reads for multiple groups starts with version 8 and 
greater
+      handleOffsetFetchRequestV8AndAbove(request)
+    }
+  }
+
+  private def handleOffsetFetchRequestV0(request: RequestChannel.Request): 
Unit = {
     val header = request.header
     val offsetFetchRequest = request.body[OffsetFetchRequest]
 
-    def partitionByAuthorized(seq: Seq[TopicPartition]): (Seq[TopicPartition], 
Seq[TopicPartition]) =
-      authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
seq)(_.topic)
-
     def createResponse(requestThrottleMs: Int): AbstractResponse = {
       val offsetFetchResponse =
-        // reject the request if not authorized to the group
+      // reject the request if not authorized to the group
         if (!authHelper.authorize(request.context, DESCRIBE, GROUP, 
offsetFetchRequest.groupId))
           offsetFetchRequest.getErrorResponse(requestThrottleMs, 
Errors.GROUP_AUTHORIZATION_FAILED)
         else {
-          if (header.apiVersion == 0) {
-            val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset fetch 
requests"))
-            val (authorizedPartitions, unauthorizedPartitions) = 
partitionByAuthorized(
-              offsetFetchRequest.partitions.asScala)
-
-            // version 0 reads offsets from ZK
-            val authorizedPartitionData = authorizedPartitions.map { 
topicPartition =>
-              try {
-                if (!metadataCache.contains(topicPartition))
-                  (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
-                else {
-                  val payloadOpt = 
zkSupport.zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
-                  payloadOpt match {
-                    case Some(payload) =>
-                      (topicPartition, new 
OffsetFetchResponse.PartitionData(payload.toLong,
-                        Optional.empty(), OffsetFetchResponse.NO_METADATA, 
Errors.NONE))
-                    case None =>
-                      (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
-                  }
-                }
-              } catch {
-                case e: Throwable =>
-                  (topicPartition, new 
OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
-                    Optional.empty(), OffsetFetchResponse.NO_METADATA, 
Errors.forException(e)))
-              }
-            }.toMap
+          val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset fetch 
requests"))
+          val (authorizedPartitions, unauthorizedPartitions) = 
partitionByAuthorized(
+            offsetFetchRequest.partitions.asScala, request.context)
 
-            val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> 
OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
-            new OffsetFetchResponse(requestThrottleMs, Errors.NONE, 
(authorizedPartitionData ++ unauthorizedPartitionData).asJava)
-          } else {
-            // versions 1 and above read offsets from Kafka
-            if (offsetFetchRequest.isAllPartitions) {
-              val (error, allPartitionData) = 
groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId, 
offsetFetchRequest.requireStable)
-              if (error != Errors.NONE)
-                offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
-              else {
-                // clients are not allowed to see offsets for topics that are 
not authorized for Describe
-                val (authorizedPartitionData, _) = 
authHelper.partitionMapByAuthorized(request.context,
-                  DESCRIBE, TOPIC, allPartitionData)(_.topic)
-                new OffsetFetchResponse(requestThrottleMs, Errors.NONE, 
authorizedPartitionData.asJava)
-              }
-            } else {
-              val (authorizedPartitions, unauthorizedPartitions) = 
partitionByAuthorized(
-                offsetFetchRequest.partitions.asScala)
-              val (error, authorizedPartitionData) = 
groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
-                offsetFetchRequest.requireStable, Some(authorizedPartitions))
-              if (error != Errors.NONE)
-                offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
+          // version 0 reads offsets from ZK
+          val authorizedPartitionData = authorizedPartitions.map { 
topicPartition =>
+            try {
+              if (!metadataCache.contains(topicPartition))
+                (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
               else {
-                val unauthorizedPartitionData = unauthorizedPartitions.map(_ 
-> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
-                new OffsetFetchResponse(requestThrottleMs, Errors.NONE, 
(authorizedPartitionData ++ unauthorizedPartitionData).asJava)
+                val payloadOpt = 
zkSupport.zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
+                payloadOpt match {
+                  case Some(payload) =>
+                    (topicPartition, new 
OffsetFetchResponse.PartitionData(payload.toLong,
+                      Optional.empty(), OffsetFetchResponse.NO_METADATA, 
Errors.NONE))
+                  case None =>
+                    (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
+                }
               }
+            } catch {
+              case e: Throwable =>
+                (topicPartition, new 
OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+                  Optional.empty(), OffsetFetchResponse.NO_METADATA, 
Errors.forException(e)))
             }
-          }
+          }.toMap
+
+          val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> 
OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
+          new OffsetFetchResponse(requestThrottleMs, Errors.NONE, 
(authorizedPartitionData ++ unauthorizedPartitionData).asJava)
         }
+      trace(s"Sending offset fetch response $offsetFetchResponse for 
correlation id ${header.correlationId} to client ${header.clientId}.")
+      offsetFetchResponse
+    }
+    requestHelper.sendResponseMaybeThrottle(request, createResponse)
+  }
 
+  private def handleOffsetFetchRequestBetweenV1AndV7(request: 
RequestChannel.Request): Unit = {
+    val header = request.header
+    val offsetFetchRequest = request.body[OffsetFetchRequest]
+    val groupId = offsetFetchRequest.groupId()
+    val (error, partitionData) = fetchOffsets(groupId, 
offsetFetchRequest.isAllPartitions,
+      offsetFetchRequest.requireStable, offsetFetchRequest.partitions, 
request.context)
+    def createResponse(requestThrottleMs: Int): AbstractResponse = {
+      val offsetFetchResponse =
+        if (error != Errors.NONE) {
+          offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
+        } else {
+          new OffsetFetchResponse(requestThrottleMs, Errors.NONE, 
partitionData.asJava)
+        }
+      trace(s"Sending offset fetch response $offsetFetchResponse for 
correlation id ${header.correlationId} to client ${header.clientId}.")
+      offsetFetchResponse
+    }
+    requestHelper.sendResponseMaybeThrottle(request, createResponse)
+  }
+
+  private def handleOffsetFetchRequestV8AndAbove(request: 
RequestChannel.Request): Unit = {
+    val header = request.header
+    val offsetFetchRequest = request.body[OffsetFetchRequest]
+    val groupIds = offsetFetchRequest.groupIds().asScala
+    val groupToErrorMap =  mutable.Map.empty[String, Errors]
+    val groupToPartitionData =  mutable.Map.empty[String, 
util.Map[TopicPartition, PartitionData]]
+    val groupToTopicPartitions = offsetFetchRequest.groupIdsToPartitions()
+    groupIds.foreach(g => {
+      val (error, partitionData) = fetchOffsets(g,
+        offsetFetchRequest.isAllPartitionsForGroup(g),
+        offsetFetchRequest.requireStable(),
+        groupToTopicPartitions.get(g), request.context)
+      groupToErrorMap += (g -> error)
+      groupToPartitionData += (g -> partitionData.asJava)
+    })
+
+    def createResponse(requestThrottleMs: Int): AbstractResponse = {
+      val offsetFetchResponse = new OffsetFetchResponse(requestThrottleMs,
+        groupToErrorMap.asJava, groupToPartitionData.asJava)
       trace(s"Sending offset fetch response $offsetFetchResponse for 
correlation id ${header.correlationId} to client ${header.clientId}.")
       offsetFetchResponse
     }
@@ -1329,6 +1358,40 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestHelper.sendResponseMaybeThrottle(request, createResponse)
   }
 
+  private def fetchOffsets(groupId: String, isAllPartitions: Boolean, 
requireStable: Boolean,
+                           partitions: util.List[TopicPartition], context: 
RequestContext): (Errors, Map[TopicPartition, 
OffsetFetchResponse.PartitionData]) = {
+    if (!authHelper.authorize(context, DESCRIBE, GROUP, groupId)) {
+      (Errors.GROUP_AUTHORIZATION_FAILED, Map.empty)
+    } else {
+      if (isAllPartitions) {
+        val (error, allPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable)
+        if (error != Errors.NONE) {
+          (error, allPartitionData)
+        } else {
+          // clients are not allowed to see offsets for topics that are not 
authorized for Describe
+          val (authorizedPartitionData, _) = 
authHelper.partitionMapByAuthorized(context,
+            DESCRIBE, TOPIC, allPartitionData)(_.topic)
+          (Errors.NONE, authorizedPartitionData)
+        }
+      } else {
+        val (authorizedPartitions, unauthorizedPartitions) = 
partitionByAuthorized(
+          partitions.asScala, context)
+        val (error, authorizedPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId,
+          requireStable, Some(authorizedPartitions))
+        if (error != Errors.NONE) {
+          (error, authorizedPartitionData)
+        } else {
+          val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> 
OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
+          (Errors.NONE, authorizedPartitionData ++ unauthorizedPartitionData)
+        }
+      }
+    }
+  }
+
+  private def partitionByAuthorized(seq: Seq[TopicPartition], context: 
RequestContext):
+  (Seq[TopicPartition], Seq[TopicPartition]) =
+    authHelper.partitionSeqByAuthorized(context, DESCRIBE, TOPIC, seq)(_.topic)
+
   def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
     if (version < 4) {
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index bbdb94b..fd5f12c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -49,6 +49,7 @@ import 
org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartiti
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
RecordBatch, SimpleRecord}
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.apache.kafka.common.resource.ResourceType._
@@ -62,6 +63,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.Collections.singletonList
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.collection.mutable.Buffer
@@ -175,7 +177,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     }),
     ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => 
Errors.forCode(
       resp.data.topics().get(0).partitions().get(0).errorCode)),
-    ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => 
resp.error),
+    ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => 
resp.groupLevelError(group)),
     ApiKeys.FIND_COORDINATOR -> ((resp: FindCoordinatorResponse) => {
       Errors.forCode(resp.data.coordinators.asScala.find(g => group == 
g.key).head.errorCode)
     }),
@@ -378,10 +380,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     OffsetsForLeaderEpochRequest.Builder.forConsumer(epochs).build()
   }
 
-  private def createOffsetFetchRequest = {
+  private def createOffsetFetchRequest: OffsetFetchRequest = {
     new requests.OffsetFetchRequest.Builder(group, false, List(tp).asJava, 
false).build()
   }
 
+  private def createOffsetFetchRequestAllPartitions: OffsetFetchRequest = {
+    new requests.OffsetFetchRequest.Builder(group, false, null, false).build()
+  }
+
+  private def createOffsetFetchRequest(groupToPartitionMap: util.Map[String, 
util.List[TopicPartition]]): OffsetFetchRequest = {
+    new requests.OffsetFetchRequest.Builder(groupToPartitionMap, false, 
false).build()
+  }
+
   private def createFindCoordinatorRequest = {
     new FindCoordinatorRequest.Builder(
         new FindCoordinatorRequestData()
@@ -1341,7 +1351,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test
-  def testFetchAllOffsetsTopicAuthorization(): Unit = {
+  def testOffsetFetchAllTopicPartitionsAuthorization(): Unit = {
     createTopic(topic)
 
     val offset = 15L
@@ -1358,17 +1368,204 @@ class AuthorizerIntegrationTest extends 
BaseRequestTest {
     // note there's only one broker, so no need to lookup the group coordinator
 
     // without describe permission on the topic, we shouldn't be able to fetch 
offsets
-    val offsetFetchRequest = new requests.OffsetFetchRequest.Builder(group, 
false, null, false).build()
+    val offsetFetchRequest = createOffsetFetchRequestAllPartitions
     var offsetFetchResponse = 
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
-    assertEquals(Errors.NONE, offsetFetchResponse.error)
-    assertTrue(offsetFetchResponse.responseData.isEmpty)
+    assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group))
+    assertTrue(offsetFetchResponse.partitionDataMap(group).isEmpty)
 
     // now add describe permission on the topic and verify that the offset can 
be fetched
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, DESCRIBE, ALLOW)), topicResource)
     offsetFetchResponse = 
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
-    assertEquals(Errors.NONE, offsetFetchResponse.error)
-    assertTrue(offsetFetchResponse.responseData.containsKey(tp))
-    assertEquals(offset, offsetFetchResponse.responseData.get(tp).offset)
+    assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group))
+    assertTrue(offsetFetchResponse.partitionDataMap(group).containsKey(tp))
+    assertEquals(offset, 
offsetFetchResponse.partitionDataMap(group).get(tp).offset)
+  }
+
+  @Test
+  def testOffsetFetchMultipleGroupsAuthorization(): Unit = {
+    val groups: Seq[String] = (1 to 5).map(i => s"group$i")
+    val groupResources = groups.map(group => new ResourcePattern(GROUP, group, 
LITERAL))
+    val topics: Seq[String] = (1 to 3).map(i => s"topic$i")
+    val topicResources = topics.map(topic => new ResourcePattern(TOPIC, topic, 
LITERAL))
+
+    val topic1List = singletonList(new TopicPartition(topics(0), 0))
+    val topic1And2List = util.Arrays.asList(
+      new TopicPartition(topics(0), 0),
+      new TopicPartition(topics(1), 0),
+      new TopicPartition(topics(1), 1))
+    val allTopicsList = util.Arrays.asList(
+      new TopicPartition(topics(0), 0),
+      new TopicPartition(topics(1), 0),
+      new TopicPartition(topics(1), 1),
+      new TopicPartition(topics(2), 0),
+      new TopicPartition(topics(2), 1),
+      new TopicPartition(topics(2), 2))
+
+    // create group to partition map to build batched offsetFetch request
+    val groupToPartitionMap = new util.HashMap[String, 
util.List[TopicPartition]]()
+    groupToPartitionMap.put(groups(0), topic1List)
+    groupToPartitionMap.put(groups(1), topic1And2List)
+    groupToPartitionMap.put(groups(2), allTopicsList)
+    groupToPartitionMap.put(groups(3), null)
+    groupToPartitionMap.put(groups(4), null)
+
+    createTopic(topics(0))
+    createTopic(topics(1), numPartitions = 2)
+    createTopic(topics(2), numPartitions = 3)
+    groupResources.foreach(r => {
+      addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, READ, ALLOW)), r)
+    })
+    topicResources.foreach(t => {
+      addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, READ, ALLOW)), t)
+    })
+
+    val offset = 15L
+    val leaderEpoch: Optional[Integer] = Optional.of(1)
+    val metadata = "metadata"
+    val topicOneOffsets = topic1List.asScala.map {
+      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+    }.toMap.asJava
+    val topicOneAndTwoOffsets = topic1And2List.asScala.map {
+      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+    }.toMap.asJava
+    val allTopicOffsets = allTopicsList.asScala.map {
+      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+    }.toMap.asJava
+
+    // create 5 consumers to commit offsets so we can fetch them later
+
+    def commitOffsets(tpList: util.List[TopicPartition],
+                      offsets: util.Map[TopicPartition, OffsetAndMetadata]): 
Unit = {
+      val consumer = createConsumer()
+      consumer.assign(tpList)
+      consumer.commitSync(offsets)
+      consumer.close()
+    }
+
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(0))
+    commitOffsets(topic1List, topicOneOffsets)
+
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(1))
+    commitOffsets(topic1And2List, topicOneAndTwoOffsets)
+
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(2))
+    commitOffsets(allTopicsList, allTopicOffsets)
+
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(3))
+    commitOffsets(allTopicsList, allTopicOffsets)
+
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(4))
+    commitOffsets(allTopicsList, allTopicOffsets)
+
+    removeAllClientAcls()
+
+    def verifyPartitionData(partitionData: OffsetFetchResponse.PartitionData): 
Unit = {
+      assertTrue(!partitionData.hasError)
+      assertEquals(offset, partitionData.offset)
+      assertEquals(metadata, partitionData.metadata)
+      assertEquals(leaderEpoch.get(), partitionData.leaderEpoch.get())
+    }
+
+    def verifyResponse(groupLevelResponse: Errors,
+                       partitionData: util.Map[TopicPartition, PartitionData],
+                       topicList: util.List[TopicPartition]): Unit = {
+      assertEquals(Errors.NONE, groupLevelResponse)
+      assertTrue(partitionData.size() == topicList.size())
+      topicList.forEach(t => verifyPartitionData(partitionData.get(t)))
+    }
+
+    // test handling partial errors, where one group is fully authorized, some 
groups don't have
+    // the right topic authorizations, and some groups have no authorization
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, READ, ALLOW)), groupResources(0))
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, READ, ALLOW)), groupResources(1))
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, READ, ALLOW)), groupResources(3))
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, DESCRIBE, ALLOW)), topicResources(0))
+    val offsetFetchRequest = createOffsetFetchRequest(groupToPartitionMap)
+    var offsetFetchResponse = 
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
+    offsetFetchResponse.data().groups().forEach(g =>
+      g.groupId() match {
+        case "group1" =>
+          verifyResponse(offsetFetchResponse.groupLevelError(groups(0)), 
offsetFetchResponse
+            .partitionDataMap(groups(0)), topic1List)
+        case "group2" =>
+          assertEquals(Errors.NONE, 
offsetFetchResponse.groupLevelError(groups(1)))
+          val group2Response = offsetFetchResponse.partitionDataMap(groups(1))
+          assertTrue(group2Response.size() == 3)
+          assertTrue(group2Response.keySet().containsAll(topic1And2List))
+          verifyPartitionData(group2Response.get(topic1And2List.get(0)))
+          assertTrue(group2Response.get(topic1And2List.get(1)).hasError)
+          assertTrue(group2Response.get(topic1And2List.get(2)).hasError)
+          assertEquals(OffsetFetchResponse.UNAUTHORIZED_PARTITION, 
group2Response.get(topic1And2List.get(1)))
+          assertEquals(OffsetFetchResponse.UNAUTHORIZED_PARTITION, 
group2Response.get(topic1And2List.get(2)))
+        case "group3" =>
+          assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, 
offsetFetchResponse.groupLevelError(groups(2)))
+          assertTrue(offsetFetchResponse.partitionDataMap(groups(2)).size() == 
0)
+        case "group4" =>
+          verifyResponse(offsetFetchResponse.groupLevelError(groups(3)), 
offsetFetchResponse
+            .partitionDataMap(groups(3)), topic1List)
+        case "group5" =>
+          assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, 
offsetFetchResponse.groupLevelError(groups(4)))
+          assertTrue(offsetFetchResponse.partitionDataMap(groups(4)).size() == 
0)
+      })
+
+    // test that after adding some of the ACLs, we get no group level 
authorization errors, but
+    // still get topic level authorization errors for topics we don't have 
ACLs for
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, READ, ALLOW)), groupResources(2))
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, READ, ALLOW)), groupResources(4))
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, DESCRIBE, ALLOW)), topicResources(1))
+    offsetFetchResponse = 
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
+    offsetFetchResponse.data().groups().forEach(g =>
+      g.groupId() match {
+        case "group1" =>
+          verifyResponse(offsetFetchResponse.groupLevelError(groups(0)), 
offsetFetchResponse
+            .partitionDataMap(groups(0)), topic1List)
+        case "group2" =>
+          verifyResponse(offsetFetchResponse.groupLevelError(groups(1)), 
offsetFetchResponse
+            .partitionDataMap(groups(1)), topic1And2List)
+        case "group3" =>
+          assertEquals(Errors.NONE, 
offsetFetchResponse.groupLevelError(groups(2)))
+          val group3Response = offsetFetchResponse.partitionDataMap(groups(2))
+          assertTrue(group3Response.size() == 6)
+          assertTrue(group3Response.keySet().containsAll(allTopicsList))
+          verifyPartitionData(group3Response.get(allTopicsList.get(0)))
+          verifyPartitionData(group3Response.get(allTopicsList.get(1)))
+          verifyPartitionData(group3Response.get(allTopicsList.get(2)))
+          assertTrue(group3Response.get(allTopicsList.get(3)).hasError)
+          assertTrue(group3Response.get(allTopicsList.get(4)).hasError)
+          assertTrue(group3Response.get(allTopicsList.get(5)).hasError)
+          assertEquals(OffsetFetchResponse.UNAUTHORIZED_PARTITION, 
group3Response.get(allTopicsList.get(3)))
+          assertEquals(OffsetFetchResponse.UNAUTHORIZED_PARTITION, 
group3Response.get(allTopicsList.get(4)))
+          assertEquals(OffsetFetchResponse.UNAUTHORIZED_PARTITION, 
group3Response.get(allTopicsList.get(5)))
+        case "group4" =>
+          verifyResponse(offsetFetchResponse.groupLevelError(groups(3)), 
offsetFetchResponse
+            .partitionDataMap(groups(3)), topic1And2List)
+        case "group5" =>
+          verifyResponse(offsetFetchResponse.groupLevelError(groups(4)), 
offsetFetchResponse
+            .partitionDataMap(groups(4)), topic1And2List)
+      })
+
+    // test that after adding all necessary ACLs, we get no partition level or 
group level errors
+    // from the offsetFetch response
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, DESCRIBE, ALLOW)), topicResources(2))
+    offsetFetchResponse = 
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
+    offsetFetchResponse.data().groups().forEach(g =>
+      g.groupId() match {
+        case "group1" =>
+          verifyResponse(offsetFetchResponse.groupLevelError(groups(0)), 
offsetFetchResponse
+            .partitionDataMap(groups(0)), topic1List)
+        case "group2" =>
+          verifyResponse(offsetFetchResponse.groupLevelError(groups(1)), 
offsetFetchResponse
+            .partitionDataMap(groups(1)), topic1And2List)
+        case "group3" =>
+          verifyResponse(offsetFetchResponse.groupLevelError(groups(2)), 
offsetFetchResponse
+            .partitionDataMap(groups(2)), allTopicsList)
+        case "group4" =>
+          verifyResponse(offsetFetchResponse.groupLevelError(groups(3)), 
offsetFetchResponse
+            .partitionDataMap(groups(3)), allTopicsList)
+        case "group5" =>
+          verifyResponse(offsetFetchResponse.groupLevelError(groups(4)), 
offsetFetchResponse
+            .partitionDataMap(groups(4)), allTopicsList)
+      })
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
new file mode 100644
index 0000000..ea5064b
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
+import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, 
OffsetFetchResponse}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+import java.util
+import java.util.Collections.singletonList
+import scala.jdk.CollectionConverters._
+import java.util.{Optional, Properties}
+
+class OffsetFetchRequestTest extends BaseRequestTest {
+
+  override def brokerCount: Int = 1
+
+  val brokerId: Integer = 0
+  val offset = 15L
+  val leaderEpoch: Optional[Integer] = Optional.of(3)
+  val metadata = "metadata"
+  val topic = "topic"
+  val groupId = "groupId"
+  val groups: Seq[String] = (1 to 5).map(i => s"group$i")
+
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
+    properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+    properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
+    properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
+    properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
+    properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
+  }
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    doSetup(createOffsetsTopic = false)
+
+    TestUtils.createOffsetsTopic(zkClient, servers)
+  }
+
+  @Test
+  def testOffsetFetchRequestSingleGroup(): Unit = {
+    createTopic(topic)
+
+    val tpList = singletonList(new TopicPartition(topic, 0))
+    val topicOffsets = tpList.asScala.map{
+      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+    }.toMap.asJava
+
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+    commitOffsets(tpList, topicOffsets)
+
+    // testing from version 1 onward since version 0 read offsets from ZK
+    for (version <- 1 to ApiKeys.OFFSET_FETCH.latestVersion()) {
+      if (version < 8) {
+        val request =
+          if (version < 7) {
+            new OffsetFetchRequest.Builder(
+              groupId, false, tpList, false)
+              .build(version.asInstanceOf[Short])
+          } else {
+            new OffsetFetchRequest.Builder(
+              groupId, false, tpList, true)
+              .build(version.asInstanceOf[Short])
+          }
+        val response = connectAndReceive[OffsetFetchResponse](request)
+        val topicData = response.data().topics().get(0)
+        val partitionData = topicData.partitions().get(0)
+        if (version < 3) {
+          assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, 
response.throttleTimeMs())
+        }
+        verifySingleGroupResponse(version.asInstanceOf[Short],
+          response.error().code(), partitionData.errorCode(), topicData.name(),
+          partitionData.partitionIndex(), partitionData.committedOffset(),
+          partitionData.committedLeaderEpoch(), partitionData.metadata())
+      } else {
+        val request = new OffsetFetchRequest.Builder(
+          Map(groupId -> tpList).asJava, false, false)
+          .build(version.asInstanceOf[Short])
+        val response = connectAndReceive[OffsetFetchResponse](request)
+        val groupData = response.data().groups().get(0)
+        val topicData = groupData.topics().get(0)
+        val partitionData = topicData.partitions().get(0)
+        verifySingleGroupResponse(version.asInstanceOf[Short],
+          groupData.errorCode(), partitionData.errorCode(), topicData.name(),
+          partitionData.partitionIndex(), partitionData.committedOffset(),
+          partitionData.committedLeaderEpoch(), partitionData.metadata())
+      }
+    }
+  }
+
+  @Test
+  def testOffsetFetchRequestWithMultipleGroups(): Unit = {
+
+    val topic1 = "topic1"
+    val topic1List = singletonList(new TopicPartition(topic1, 0))
+    val topic2 = "topic2"
+    val topic1And2List = util.Arrays.asList(
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic2, 0),
+      new TopicPartition(topic2, 1))
+    val topic3 = "topic3"
+    val allTopicsList = util.Arrays.asList(
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic2, 0),
+      new TopicPartition(topic2, 1),
+      new TopicPartition(topic3, 0),
+      new TopicPartition(topic3, 1),
+      new TopicPartition(topic3, 2))
+
+    // create group to partition map to build batched offsetFetch request
+    val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] =
+      new util.HashMap[String, util.List[TopicPartition]]()
+    groupToPartitionMap.put(groups(0), topic1List)
+    groupToPartitionMap.put(groups(1), topic1And2List)
+    groupToPartitionMap.put(groups(2), allTopicsList)
+    groupToPartitionMap.put(groups(3), null)
+    groupToPartitionMap.put(groups(4), null)
+
+    createTopic(topic1)
+    createTopic(topic2, numPartitions = 2)
+    createTopic(topic3, numPartitions = 3)
+
+    val topicOneOffsets = topic1List.asScala.map{
+      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+    }.toMap.asJava
+    val topicOneAndTwoOffsets = topic1And2List.asScala.map{
+      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+    }.toMap.asJava
+    val allTopicOffsets = allTopicsList.asScala.map{
+      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+    }.toMap.asJava
+
+    // create 5 consumers to commit offsets so we can fetch them later
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(0))
+    commitOffsets(topic1List, topicOneOffsets)
+
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(1))
+    commitOffsets(topic1And2List, topicOneAndTwoOffsets)
+
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(2))
+    commitOffsets(allTopicsList, allTopicOffsets)
+
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(3))
+    commitOffsets(allTopicsList, allTopicOffsets)
+
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(4))
+    commitOffsets(allTopicsList, allTopicOffsets)
+
+    for (version <- 8 to ApiKeys.OFFSET_FETCH.latestVersion()) {
+      val request =  new OffsetFetchRequest.Builder(groupToPartitionMap, 
false, false)
+        .build(version.asInstanceOf[Short])
+      val response = connectAndReceive[OffsetFetchResponse](request)
+      response.data().groups().forEach(g =>
+        g.groupId() match {
+          case "group1" =>
+            verifyResponse(response.groupLevelError(groups(0)),
+              response.partitionDataMap(groups(0)), topic1List)
+          case "group2" =>
+            verifyResponse(response.groupLevelError(groups(1)),
+              response.partitionDataMap(groups(1)), topic1And2List)
+          case "group3" =>
+            verifyResponse(response.groupLevelError(groups(2)),
+              response.partitionDataMap(groups(2)), allTopicsList)
+          case "group4" =>
+            verifyResponse(response.groupLevelError(groups(3)),
+              response.partitionDataMap(groups(3)), allTopicsList)
+          case "group5" =>
+            verifyResponse(response.groupLevelError(groups(4)),
+              response.partitionDataMap(groups(4)), allTopicsList)
+        })
+    }
+  }
+
+  private def verifySingleGroupResponse(version: Short,
+                                        responseError: Short,
+                                        partitionError: Short,
+                                        topicName: String,
+                                        partitionIndex: Integer,
+                                        committedOffset: Long,
+                                        committedLeaderEpoch: Integer,
+                                        partitionMetadata: String): Unit = {
+    assertEquals(Errors.NONE.code(), responseError)
+    assertEquals(topic, topicName)
+    assertEquals(0, partitionIndex)
+    assertEquals(offset, committedOffset)
+    if (version >= 5) {
+      assertEquals(leaderEpoch.get(), committedLeaderEpoch)
+    }
+    assertEquals(metadata, partitionMetadata)
+    assertEquals(Errors.NONE.code(), partitionError)
+  }
+
+  private def verifyPartitionData(partitionData: 
OffsetFetchResponse.PartitionData): Unit = {
+    assertTrue(!partitionData.hasError)
+    assertEquals(offset, partitionData.offset)
+    assertEquals(metadata, partitionData.metadata)
+    assertEquals(leaderEpoch.get(), partitionData.leaderEpoch.get())
+  }
+
+  private def verifyResponse(groupLevelResponse: Errors,
+                             partitionData: util.Map[TopicPartition, 
PartitionData],
+                             topicList: util.List[TopicPartition]): Unit = {
+    assertEquals(Errors.NONE, groupLevelResponse)
+    assertTrue(partitionData.size() == topicList.size())
+    topicList.forEach(t => verifyPartitionData(partitionData.get(t)))
+  }
+
+  private def commitOffsets(tpList: util.List[TopicPartition],
+                            offsets: util.Map[TopicPartition, 
OffsetAndMetadata]): Unit = {
+    val consumer = createConsumer()
+    consumer.assign(tpList)
+    consumer.commitSync(offsets)
+    consumer.close()
+  }
+}

Reply via email to