This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new be194f5dbae MINOR: Simplify OffsetFetchRequest (#19572)
be194f5dbae is described below

commit be194f5dbaeabf7c8992312fd0b8bcf596a1284a
Author: David Jacot <david.ja...@gmail.com>
AuthorDate: Sun Apr 27 18:58:30 2025 +0200

    MINOR: Simplify OffsetFetchRequest (#19572)
    
    While working on https://github.com/apache/kafka/pull/19515, I came to
    the conclusion that the OffsetFetchRequest is quite messy and overall
    too complicated. This patch rationalizes the constructors.
    OffsetFetchRequest now has a single constructor accepting the
    OffsetFetchRequestData. This will also simplify adding the topic ids.
    All the changes are mechanical, replacing data structures with others.
    
    Reviewers: PoAn Yang <pay...@apache.org>, TengYao Chi 
<frankvi...@apache.org>, Lianet Magran <lmagr...@confluent.io>, Chia-Ping Tsai 
<chia7...@gmail.com>
---
 .../internals/ListConsumerGroupOffsetsHandler.java |  36 +-
 .../consumer/internals/CommitRequestManager.java   |  39 +-
 .../consumer/internals/ConsumerCoordinator.java    |  23 +-
 .../kafka/common/requests/OffsetFetchRequest.java  | 215 +++--------
 .../kafka/clients/admin/KafkaAdminClientTest.java  |   6 +-
 .../common/requests/OffsetFetchRequestTest.java    | 395 +++++++++++----------
 .../kafka/common/requests/RequestResponseTest.java | 167 ++++-----
 .../kafka/api/AuthorizerIntegrationTest.scala      |  46 ++-
 .../server/GroupCoordinatorBaseRequestTest.scala   |  42 ++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 154 +++++---
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  14 +-
 11 files changed, 607 insertions(+), 530 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
index 4c0e3db9254..36ff25ebb79 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
@@ -20,6 +20,7 @@ import 
org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
@@ -86,15 +87,32 @@ public class ListConsumerGroupOffsetsHandler implements 
AdminApiHandler<Coordina
     }
 
     public OffsetFetchRequest.Builder buildBatchedRequest(Set<CoordinatorKey> 
groupIds) {
-        // Create a map that only contains the consumer groups owned by the 
coordinator.
-        Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions 
= new HashMap<>(groupIds.size());
-        groupIds.forEach(g -> {
-            ListConsumerGroupOffsetsSpec spec = groupSpecs.get(g.idValue);
-            List<TopicPartition> partitions = spec.topicPartitions() != null ? 
new ArrayList<>(spec.topicPartitions()) : null;
-            coordinatorGroupIdToTopicPartitions.put(g.idValue, partitions);
-        });
-
-        return new 
OffsetFetchRequest.Builder(coordinatorGroupIdToTopicPartitions, requireStable, 
false);
+        // Create a request that only contains the consumer groups owned by 
the coordinator.
+        return new OffsetFetchRequest.Builder(
+            new OffsetFetchRequestData()
+                .setRequireStable(requireStable)
+                .setGroups(groupIds.stream().map(groupId -> {
+                    ListConsumerGroupOffsetsSpec spec = 
groupSpecs.get(groupId.idValue);
+
+                    List<OffsetFetchRequestData.OffsetFetchRequestTopics> 
topics = null;
+                    if (spec.topicPartitions() != null) {
+                        topics = spec.topicPartitions().stream()
+                            
.collect(Collectors.groupingBy(TopicPartition::topic))
+                            .entrySet()
+                            .stream()
+                            .map(entry -> new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                                .setName(entry.getKey())
+                                .setPartitionIndexes(entry.getValue().stream()
+                                    .map(TopicPartition::partition)
+                                    .collect(Collectors.toList())))
+                            .collect(Collectors.toList());
+                    }
+                    return new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                        .setGroupId(groupId.idValue)
+                        .setTopics(topics);
+                }).collect(Collectors.toList())),
+            false
+        );
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 62d1fe3a866..4b22a5711b3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -34,6 +34,7 @@ import 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnstableOffsetCommitException;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
@@ -970,21 +971,37 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         }
 
         public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
+            List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = 
requestedPartitions.stream()
+                .collect(Collectors.groupingBy(TopicPartition::topic))
+                .entrySet()
+                .stream()
+                .map(entry -> new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName(entry.getKey())
+                    .setPartitionIndexes(entry.getValue().stream()
+                        .map(TopicPartition::partition)
+                        .collect(Collectors.toList())))
+                .collect(Collectors.toList());
 
-            OffsetFetchRequest.Builder builder = memberInfo.memberEpoch.
-                map(epoch -> new OffsetFetchRequest.Builder(
-                    groupId,
-                    memberInfo.memberId,
-                    epoch,
-                    true,
-                    new ArrayList<>(this.requestedPartitions),
-                    throwOnFetchStableOffsetUnsupported))
+            OffsetFetchRequest.Builder builder = memberInfo.memberEpoch
+                .map(epoch -> new OffsetFetchRequest.Builder(
+                    new OffsetFetchRequestData()
+                        .setRequireStable(true)
+                        .setGroups(List.of(
+                            new 
OffsetFetchRequestData.OffsetFetchRequestGroup()
+                                .setGroupId(groupId)
+                                .setMemberId(memberInfo.memberId)
+                                .setMemberEpoch(epoch)
+                                .setTopics(topics))),
+                            throwOnFetchStableOffsetUnsupported))
                 // Building request without passing member ID/epoch to leave 
the logic to choose
                 // default values when not present on the request builder.
                 .orElseGet(() -> new OffsetFetchRequest.Builder(
-                    groupId,
-                    true,
-                    new ArrayList<>(this.requestedPartitions),
+                    new OffsetFetchRequestData()
+                        .setRequireStable(true)
+                        .setGroups(List.of(
+                            new 
OffsetFetchRequestData.OffsetFetchRequestGroup()
+                                .setGroupId(groupId)
+                                .setTopics(topics))),
                     throwOnFetchStableOffsetUnsupported));
             return buildRequestWithResponseHandling(builder);
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 1cba10ef15d..8829654c7a8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -49,6 +49,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -1477,9 +1478,27 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
             return RequestFuture.coordinatorNotAvailable();
 
         log.debug("Fetching committed offsets for partitions: {}", partitions);
+
         // construct the request
-        OffsetFetchRequest.Builder requestBuilder =
-            new OffsetFetchRequest.Builder(this.rebalanceConfig.groupId, true, 
new ArrayList<>(partitions), throwOnFetchStableOffsetsUnsupported);
+        List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = 
partitions.stream()
+            .collect(Collectors.groupingBy(TopicPartition::topic))
+            .entrySet()
+            .stream()
+            .map(entry -> new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName(entry.getKey())
+                .setPartitionIndexes(entry.getValue().stream()
+                    .map(TopicPartition::partition)
+                    .collect(Collectors.toList())))
+            .collect(Collectors.toList());
+
+        OffsetFetchRequest.Builder requestBuilder = new 
OffsetFetchRequest.Builder(
+            new OffsetFetchRequestData()
+                .setRequireStable(true)
+                .setGroups(List.of(
+                    new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                        .setGroupId(this.rebalanceConfig.groupId)
+                        .setTopics(topics))),
+            throwOnFetchStableOffsetsUnsupported);
 
         // send the request with a callback
         return client.send(coordinator, requestBuilder)
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 907cba953fb..2a26e7940d3 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -22,9 +22,11 @@ import 
org.apache.kafka.common.message.OffsetFetchRequestData;
 import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
 import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopic;
 import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.RecordBatch;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,113 +36,35 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class OffsetFetchRequest extends AbstractRequest {
 
     private static final Logger log = 
LoggerFactory.getLogger(OffsetFetchRequest.class);
+    private static final short TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION = 2;
+    private static final short REQUIRE_STABLE_OFFSET_MIN_VERSION = 7;
+    private static final short BATCH_MIN_VERSION = 8;
 
-    private static final List<OffsetFetchRequestTopic> ALL_TOPIC_PARTITIONS = 
null;
-    private static final List<OffsetFetchRequestTopics> 
ALL_TOPIC_PARTITIONS_BATCH = null;
     private final OffsetFetchRequestData data;
 
     public static class Builder extends 
AbstractRequest.Builder<OffsetFetchRequest> {
 
-        public final OffsetFetchRequestData data;
+        private final OffsetFetchRequestData data;
         private final boolean throwOnFetchStableOffsetsUnsupported;
 
-        public Builder(String groupId,
-                       boolean requireStable,
-                       List<TopicPartition> partitions,
-                       boolean throwOnFetchStableOffsetsUnsupported) {
-            this(
-                groupId,
-                null,
-                -1,
-                requireStable,
-                partitions,
-                throwOnFetchStableOffsetsUnsupported
-            );
-        }
-
-        public Builder(String groupId,
-                       String memberId,
-                       int memberEpoch,
-                       boolean requireStable,
-                       List<TopicPartition> partitions,
-                       boolean throwOnFetchStableOffsetsUnsupported) {
+        public Builder(OffsetFetchRequestData data, boolean 
throwOnFetchStableOffsetsUnsupported) {
             super(ApiKeys.OFFSET_FETCH);
-
-            OffsetFetchRequestData.OffsetFetchRequestGroup group =
-                new OffsetFetchRequestData.OffsetFetchRequestGroup()
-                    .setGroupId(groupId)
-                    .setMemberId(memberId)
-                    .setMemberEpoch(memberEpoch);
-
-            if (partitions != null) {
-                Map<String, OffsetFetchRequestTopics> 
offsetFetchRequestTopicMap = new HashMap<>();
-                for (TopicPartition topicPartition : partitions) {
-                    String topicName = topicPartition.topic();
-                    OffsetFetchRequestTopics topic = 
offsetFetchRequestTopicMap.getOrDefault(
-                        topicName, new 
OffsetFetchRequestTopics().setName(topicName));
-                    topic.partitionIndexes().add(topicPartition.partition());
-                    offsetFetchRequestTopicMap.put(topicName, topic);
-                }
-                group.setTopics(new 
ArrayList<>(offsetFetchRequestTopicMap.values()));
-            } else {
-                // If passed in partition list is null, it is requesting 
offsets for all topic partitions.
-                group.setTopics(ALL_TOPIC_PARTITIONS_BATCH);
-            }
-
-            this.data = new OffsetFetchRequestData()
-                .setRequireStable(requireStable)
-                .setGroups(Collections.singletonList(group));
-            this.throwOnFetchStableOffsetsUnsupported = 
throwOnFetchStableOffsetsUnsupported;
-        }
-
-        public Builder(Map<String, List<TopicPartition>> 
groupIdToTopicPartitionMap,
-                       boolean requireStable,
-                       boolean throwOnFetchStableOffsetsUnsupported) {
-            super(ApiKeys.OFFSET_FETCH);
-
-            List<OffsetFetchRequestGroup> groups = new ArrayList<>();
-            for (Entry<String, List<TopicPartition>> entry : 
groupIdToTopicPartitionMap.entrySet()) {
-                String groupName = entry.getKey();
-                List<TopicPartition> tpList = entry.getValue();
-                final List<OffsetFetchRequestTopics> topics;
-                if (tpList != null) {
-                    Map<String, OffsetFetchRequestTopics> 
offsetFetchRequestTopicMap =
-                        new HashMap<>();
-                    for (TopicPartition topicPartition : tpList) {
-                        String topicName = topicPartition.topic();
-                        OffsetFetchRequestTopics topic = 
offsetFetchRequestTopicMap.getOrDefault(
-                            topicName, new 
OffsetFetchRequestTopics().setName(topicName));
-                        
topic.partitionIndexes().add(topicPartition.partition());
-                        offsetFetchRequestTopicMap.put(topicName, topic);
-                    }
-                    topics = new 
ArrayList<>(offsetFetchRequestTopicMap.values());
-                } else {
-                    topics = ALL_TOPIC_PARTITIONS_BATCH;
-                }
-                groups.add(new OffsetFetchRequestGroup()
-                    .setGroupId(groupName)
-                    .setTopics(topics));
-            }
-            this.data = new OffsetFetchRequestData()
-                .setGroups(groups)
-                .setRequireStable(requireStable);
+            this.data = data;
             this.throwOnFetchStableOffsetsUnsupported = 
throwOnFetchStableOffsetsUnsupported;
         }
 
         @Override
         public OffsetFetchRequest build(short version) {
-            if (data.groups().size() > 1 && version < 8) {
+            if (data.groups().size() > 1 && version < BATCH_MIN_VERSION) {
                 throw new NoBatchedOffsetFetchRequestException("Broker does 
not support"
                     + " batching groups for fetch offset request on version " 
+ version);
             }
-            if (data.requireStable() && version < 7) {
+            if (data.requireStable() && version < 
REQUIRE_STABLE_OFFSET_MIN_VERSION) {
                 if (throwOnFetchStableOffsetsUnsupported) {
                     throw new UnsupportedVersionException("Broker unexpectedly 
" +
                         "doesn't support requireStable flag on version " + 
version);
@@ -152,7 +76,7 @@ public class OffsetFetchRequest extends AbstractRequest {
                 }
             }
             // convert data to use the appropriate version since version 8 
uses different format
-            if (version < 8) {
+            if (version < BATCH_MIN_VERSION) {
                 OffsetFetchRequestData normalizedData;
                 if (!data.groups().isEmpty()) {
                     OffsetFetchRequestGroup group = data.groups().get(0);
@@ -175,7 +99,7 @@ public class OffsetFetchRequest extends AbstractRequest {
                 } else {
                     normalizedData = data;
                 }
-                if (normalizedData.topics() == null && version < 2) {
+                if (normalizedData.topics() == null && version < 
TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION) {
                     throw new UnsupportedVersionException("The broker only 
supports OffsetFetchRequest " +
                         "v" + version + ", but we need v2 or newer to request 
all topic partitions.");
                 }
@@ -202,19 +126,6 @@ public class OffsetFetchRequest extends AbstractRequest {
         }
     }
 
-    public List<TopicPartition> partitions() {
-        if (isAllPartitions()) {
-            return null;
-        }
-        List<TopicPartition> partitions = new ArrayList<>();
-        for (OffsetFetchRequestTopic topic : data.topics()) {
-            for (Integer partitionIndex : topic.partitionIndexes()) {
-                partitions.add(new TopicPartition(topic.name(), 
partitionIndex));
-            }
-        }
-        return partitions;
-    }
-
     public String groupId() {
         return data.groupId();
     }
@@ -224,7 +135,7 @@ public class OffsetFetchRequest extends AbstractRequest {
     }
 
     public List<OffsetFetchRequestData.OffsetFetchRequestGroup> groups() {
-        if (version() >= 8) {
+        if (version() >= BATCH_MIN_VERSION) {
             return data.groups();
         } else {
             OffsetFetchRequestData.OffsetFetchRequestGroup group =
@@ -253,7 +164,7 @@ public class OffsetFetchRequest extends AbstractRequest {
         Map<String, List<TopicPartition>> groupIdsToPartitions = new 
HashMap<>();
         for (OffsetFetchRequestGroup group : data.groups()) {
             List<TopicPartition> tpList = null;
-            if (group.topics() != ALL_TOPIC_PARTITIONS_BATCH) {
+            if (group.topics() != null) {
                 tpList = new ArrayList<>();
                 for (OffsetFetchRequestTopics topic : group.topics()) {
                     for (Integer partitionIndex : topic.partitionIndexes()) {
@@ -285,67 +196,59 @@ public class OffsetFetchRequest extends AbstractRequest {
         this.data = data;
     }
 
-    public OffsetFetchResponse getErrorResponse(Errors error) {
-        return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, error);
-    }
-
-    public OffsetFetchResponse getErrorResponse(int throttleTimeMs, Errors 
error) {
-        Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responsePartitions = new HashMap<>();
-        if (version() < 2) {
-            OffsetFetchResponse.PartitionData partitionError = new 
OffsetFetchResponse.PartitionData(
-                    OffsetFetchResponse.INVALID_OFFSET,
-                    Optional.empty(),
-                    OffsetFetchResponse.NO_METADATA,
-                    error);
-
-            for (OffsetFetchRequestTopic topic : this.data.topics()) {
-                for (int partitionIndex : topic.partitionIndexes()) {
-                    responsePartitions.put(
-                        new TopicPartition(topic.name(), partitionIndex), 
partitionError);
-                }
-            }
-            return new OffsetFetchResponse(error, responsePartitions);
-        }
-        if (version() == 2) {
-            return new OffsetFetchResponse(error, responsePartitions);
-        }
-        if (version() >= 3 && version() < 8) {
-            return new OffsetFetchResponse(throttleTimeMs, error, 
responsePartitions);
-        }
-        List<String> groupIds = groupIds();
-        Map<String, Errors> errorsMap = new HashMap<>(groupIds.size());
-        Map<String, Map<TopicPartition, OffsetFetchResponse.PartitionData>> 
partitionMap =
-            new HashMap<>(groupIds.size());
-        for (String g : groupIds) {
-            errorsMap.put(g, error);
-            partitionMap.put(g, responsePartitions);
-        }
-        return new OffsetFetchResponse(throttleTimeMs, errorsMap, 
partitionMap);
-    }
-
     @Override
     public OffsetFetchResponse getErrorResponse(int throttleTimeMs, Throwable 
e) {
-        return getErrorResponse(throttleTimeMs, Errors.forException(e));
+        Errors error = Errors.forException(e);
+
+        if (version() < TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION) {
+            // The response does not support top level error so we return each
+            // partition with the error.
+            return new OffsetFetchResponse(
+                new OffsetFetchResponseData()
+                    .setThrottleTimeMs(throttleTimeMs)
+                    .setTopics(data.topics().stream().map(topic ->
+                        new OffsetFetchResponseData.OffsetFetchResponseTopic()
+                            .setName(topic.name())
+                            
.setPartitions(topic.partitionIndexes().stream().map(partition ->
+                                new 
OffsetFetchResponseData.OffsetFetchResponsePartition()
+                                    .setPartitionIndex(partition)
+                                    .setErrorCode(error.code())
+                                    
.setCommittedOffset(OffsetFetchResponse.INVALID_OFFSET)
+                                    
.setMetadata(OffsetFetchResponse.NO_METADATA)
+                                    
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+                            ).collect(Collectors.toList()))
+                    ).collect(Collectors.toList())),
+                version()
+            );
+        } else if (version() < BATCH_MIN_VERSION) {
+            // The response does not support multiple groups but it does 
support
+            // top level error.
+            return new OffsetFetchResponse(
+                new OffsetFetchResponseData()
+                    .setThrottleTimeMs(throttleTimeMs)
+                    .setErrorCode(error.code()),
+                version()
+            );
+        } else {
+            // The response does support multiple groups so we provide a top 
level
+            // error per group.
+            return new OffsetFetchResponse(
+                new OffsetFetchResponseData()
+                    .setThrottleTimeMs(throttleTimeMs)
+                    .setGroups(data.groups().stream().map(group ->
+                        new OffsetFetchResponseData.OffsetFetchResponseGroup()
+                            .setGroupId(group.groupId())
+                            .setErrorCode(error.code())
+                    ).collect(Collectors.toList())),
+                version()
+            );
+        }
     }
 
     public static OffsetFetchRequest parse(Readable readable, short version) {
         return new OffsetFetchRequest(new OffsetFetchRequestData(readable, 
version), version);
     }
 
-    public boolean isAllPartitions() {
-        return data.topics() == ALL_TOPIC_PARTITIONS;
-    }
-
-    public boolean isAllPartitionsForGroup(String groupId) {
-        OffsetFetchRequestGroup group = data
-            .groups()
-            .stream()
-            .filter(g -> g.groupId().equals(groupId))
-            .collect(Collectors.toList())
-            .get(0);
-        return group.topics() == ALL_TOPIC_PARTITIONS_BATCH;
-    }
-
     @Override
     public OffsetFetchRequestData data() {
         return data;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0fa00dbea20..319a91d7407 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -4396,7 +4396,7 @@ public class KafkaAdminClientTest {
             ClientRequest clientRequest = mockClient.requests().peek();
             assertNotNull(clientRequest);
             assertEquals(300, clientRequest.requestTimeoutMs());
-            OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) 
clientRequest.requestBuilder()).data;
+            OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) 
clientRequest.requestBuilder()).build().data();
             assertTrue(data.requireStable());
             assertEquals(Collections.singletonList(GROUP_ID),
                     
data.groups().stream().map(OffsetFetchRequestGroup::groupId).collect(Collectors.toList()));
@@ -4791,7 +4791,7 @@ public class KafkaAdminClientTest {
         waitForRequest(mockClient, ApiKeys.OFFSET_FETCH);
 
         ClientRequest clientRequest = mockClient.requests().peek();
-        OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) 
clientRequest.requestBuilder()).data;
+        OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) 
clientRequest.requestBuilder()).build().data();
         Map<String, Map<TopicPartition, PartitionData>> results = new 
HashMap<>();
         Map<String, Errors> errors = new HashMap<>();
         data.groups().forEach(group -> {
@@ -4813,7 +4813,7 @@ public class KafkaAdminClientTest {
         waitForRequest(mockClient, ApiKeys.OFFSET_FETCH);
 
         ClientRequest clientRequest = mockClient.requests().peek();
-        OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) 
clientRequest.requestBuilder()).data;
+        OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) 
clientRequest.requestBuilder()).build().data();
         Map<String, Map<TopicPartition, PartitionData>> results = new 
HashMap<>();
         Map<String, Errors> errors = new HashMap<>();
         data.groups().forEach(group -> {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
index 1098925e42a..c4671ac1588 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
@@ -16,220 +16,239 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
-import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.OffsetFetchRequest.Builder;
-import 
org.apache.kafka.common.requests.OffsetFetchRequest.NoBatchedOffsetFetchRequestException;
-import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 
-import static 
org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class OffsetFetchRequestTest {
-
-    private final String topicOne = "topic1";
-    private final int partitionOne = 1;
-    private final String topicTwo = "topic2";
-    private final int partitionTwo = 2;
-    private final String topicThree = "topic3";
-    private final String group1 = "group1";
-    private final String group2 = "group2";
-    private final String group3 = "group3";
-    private final String group4 = "group4";
-    private final String group5 = "group5";
-    private final List<String> groups = Arrays.asList(group1, group2, group3, 
group4, group5);
-
-    private final List<Integer> listOfVersionsNonBatchOffsetFetch = 
Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
-
-
-    private OffsetFetchRequest.Builder builder;
-
-    @Test
-    public void testConstructor() {
-        List<TopicPartition> partitions = Arrays.asList(
-            new TopicPartition(topicOne, partitionOne),
-            new TopicPartition(topicTwo, partitionTwo));
-        int throttleTimeMs = 10;
-
-        Map<TopicPartition, PartitionData> expectedData = new HashMap<>();
-        for (TopicPartition partition : partitions) {
-            expectedData.put(partition, new PartitionData(
-                OffsetFetchResponse.INVALID_OFFSET,
-                Optional.empty(),
-                OffsetFetchResponse.NO_METADATA,
-                Errors.NONE
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+    public void testWithMultipleGroups(short version) {
+        var data = new OffsetFetchRequestData()
+            .setGroups(List.of(
+                new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                    .setGroupId("grp1")
+                    .setTopics(List.of(
+                        new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                            .setName("foo")
+                            .setPartitionIndexes(List.of(0, 1, 2))
+                    )),
+                new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                    .setGroupId("grp2")
+                    .setTopics(List.of(
+                        new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                            .setName("bar")
+                            .setPartitionIndexes(List.of(0, 1, 2))
+                    ))
             ));
+        var builder = new OffsetFetchRequest.Builder(data, false);
+
+        if (version < 8) {
+            
assertThrows(OffsetFetchRequest.NoBatchedOffsetFetchRequestException.class, () 
-> builder.build(version));
+        } else {
+            assertEquals(data, builder.build(version).data());
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+    public void testThrowOnFetchStableOffsetsUnsupported(short version) {
+        var builder = new OffsetFetchRequest.Builder(
+            new OffsetFetchRequestData()
+                .setRequireStable(true)
+                .setGroups(List.of(
+                    new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                        .setGroupId("grp1")
+                        .setTopics(List.of(
+                            new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                                .setName("foo")
+                                .setPartitionIndexes(List.of(0, 1, 2))
+                        ))
+                )),
+            true
+        );
+
+        if (version < 7) {
+            assertThrows(UnsupportedVersionException.class, () -> 
builder.build(version));
+        } else {
+            builder.build(version);
         }
+    }
 
-        for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
-            if (version < 8) {
-                builder = new OffsetFetchRequest.Builder(
-                    group1,
-                    false,
-                    partitions,
-                    false);
-                OffsetFetchRequest request = builder.build(version);
-                assertFalse(request.isAllPartitions());
-                assertEquals(group1, request.groupId());
-                assertEquals(partitions, request.partitions());
-
-                OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
-                assertEquals(Errors.NONE, response.error());
-                assertFalse(response.hasError());
-                assertEquals(Collections.singletonMap(Errors.NONE, version <= 
(short) 1 ? 3 : 1), response.errorCounts(),
-                    "Incorrect error count for version " + version);
-
-                if (version <= 1) {
-                    assertEquals(expectedData, response.responseDataV0ToV7());
-                }
-
-                if (version >= 3) {
-                    assertEquals(throttleTimeMs, response.throttleTimeMs());
-                } else {
-                    assertEquals(DEFAULT_THROTTLE_TIME, 
response.throttleTimeMs());
-                }
-            } else {
-                builder = new Builder(Collections.singletonMap(group1, 
partitions), false, false);
-                OffsetFetchRequest request = builder.build(version);
-                Map<String, List<TopicPartition>> groupToPartitionMap =
-                    request.groupIdsToPartitions();
-                Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
-                    request.groupIdsToTopics();
-                assertFalse(request.isAllPartitionsForGroup(group1));
-                assertTrue(groupToPartitionMap.containsKey(group1) && 
groupToTopicMap.containsKey(
-                    group1));
-                assertEquals(partitions, groupToPartitionMap.get(group1));
-                OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
-                assertEquals(Errors.NONE, response.groupLevelError(group1));
-                assertFalse(response.groupHasError(group1));
-                assertEquals(Collections.singletonMap(Errors.NONE, 1), 
response.errorCounts(),
-                    "Incorrect error count for version " + version);
-                assertEquals(throttleTimeMs, response.throttleTimeMs());
-            }
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+    public void testSingleGroup(short version) {
+        var data = new OffsetFetchRequestData()
+            .setGroups(List.of(
+                new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                    .setGroupId("grp1")
+                    .setTopics(List.of(
+                        new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                            .setName("foo")
+                            .setPartitionIndexes(List.of(0, 1, 2))
+                    ))
+            ));
+        var builder = new OffsetFetchRequest.Builder(data, false);
+
+        if (version < 8) {
+            var expectedRequest = new OffsetFetchRequestData()
+                .setGroupId("grp1")
+                .setTopics(List.of(
+                    new OffsetFetchRequestData.OffsetFetchRequestTopic()
+                        .setName("foo")
+                        .setPartitionIndexes(List.of(0, 1, 2))
+                ));
+            assertEquals(expectedRequest, builder.build(version).data());
+        } else {
+            assertEquals(data, builder.build(version).data());
         }
     }
 
-    @Test
-    public void testConstructorWithMultipleGroups() {
-        List<TopicPartition> topic1Partitions = Arrays.asList(
-            new TopicPartition(topicOne, partitionOne),
-            new TopicPartition(topicOne, partitionTwo));
-        List<TopicPartition> topic2Partitions = Arrays.asList(
-            new TopicPartition(topicTwo, partitionOne),
-            new TopicPartition(topicTwo, partitionTwo));
-        List<TopicPartition> topic3Partitions = Arrays.asList(
-            new TopicPartition(topicThree, partitionOne),
-            new TopicPartition(topicThree, partitionTwo));
-        Map<String, List<TopicPartition>> groupToTp = new HashMap<>();
-        groupToTp.put(group1, topic1Partitions);
-        groupToTp.put(group2, topic2Partitions);
-        groupToTp.put(group3, topic3Partitions);
-        groupToTp.put(group4, null);
-        groupToTp.put(group5, null);
-        int throttleTimeMs = 10;
-
-        for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
-            if (version >= 8) {
-                builder = new Builder(groupToTp, false, false);
-                OffsetFetchRequest request = builder.build(version);
-                Map<String, List<TopicPartition>> groupToPartitionMap =
-                    request.groupIdsToPartitions();
-                Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
-                    request.groupIdsToTopics();
-                assertEquals(groupToTp.keySet(), groupToTopicMap.keySet());
-                assertEquals(groupToTp.keySet(), groupToPartitionMap.keySet());
-                assertFalse(request.isAllPartitionsForGroup(group1));
-                assertFalse(request.isAllPartitionsForGroup(group2));
-                assertFalse(request.isAllPartitionsForGroup(group3));
-                assertTrue(request.isAllPartitionsForGroup(group4));
-                assertTrue(request.isAllPartitionsForGroup(group5));
-                OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
-                for (String group : groups) {
-                    assertEquals(Errors.NONE, response.groupLevelError(group));
-                    assertFalse(response.groupHasError(group));
-                }
-                assertEquals(Collections.singletonMap(Errors.NONE, 5), 
response.errorCounts(),
-                    "Incorrect error count for version " + version);
-                assertEquals(throttleTimeMs, response.throttleTimeMs());
-            }
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+    public void testSingleGroupWithAllTopics(short version) {
+        var data = new OffsetFetchRequestData()
+            .setGroups(List.of(
+                new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                    .setGroupId("grp1")
+                    .setTopics(null)
+            ));
+        var builder = new OffsetFetchRequest.Builder(data, false);
+
+        if (version < 2) {
+            assertThrows(UnsupportedVersionException.class, () -> 
builder.build(version));
+        } else if (version < 8) {
+            var expectedRequest = new OffsetFetchRequestData()
+                .setGroupId("grp1")
+                .setTopics(null);
+            assertEquals(expectedRequest, builder.build(version).data());
+        } else {
+            assertEquals(data, builder.build(version).data());
         }
     }
 
-    @Test
-    public void testBuildThrowForUnsupportedBatchRequest() {
-        for (int version : listOfVersionsNonBatchOffsetFetch) {
-            Map<String, List<TopicPartition>> groupPartitionMap = new 
HashMap<>();
-            groupPartitionMap.put(group1, null);
-            groupPartitionMap.put(group2, null);
-            builder = new Builder(groupPartitionMap, true, false);
-            final short finalVersion = (short) version;
-            assertThrows(NoBatchedOffsetFetchRequestException.class, () -> 
builder.build(finalVersion));
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+    public void testGetErrorResponse(short version) {
+        var request = new OffsetFetchRequest.Builder(
+            new OffsetFetchRequestData()
+                .setGroups(List.of(
+                    new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                        .setGroupId("grp1")
+                        .setTopics(List.of(
+                            new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                                .setName("foo")
+                                .setPartitionIndexes(List.of(0, 1))
+                        ))
+                )),
+            false
+        ).build(version);
+
+        if (version < 2) {
+            var expectedResponse = new OffsetFetchResponseData()
+                .setThrottleTimeMs(1000)
+                .setTopics(List.of(
+                    new OffsetFetchResponseData.OffsetFetchResponseTopic()
+                        .setName("foo")
+                        .setPartitions(List.of(
+                            new 
OffsetFetchResponseData.OffsetFetchResponsePartition()
+                                .setPartitionIndex(0)
+                                .setErrorCode(Errors.INVALID_GROUP_ID.code())
+                                
.setCommittedOffset(OffsetFetchResponse.INVALID_OFFSET)
+                                .setMetadata(OffsetFetchResponse.NO_METADATA)
+                                
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH),
+                            new 
OffsetFetchResponseData.OffsetFetchResponsePartition()
+                                .setPartitionIndex(1)
+                                .setErrorCode(Errors.INVALID_GROUP_ID.code())
+                                
.setCommittedOffset(OffsetFetchResponse.INVALID_OFFSET)
+                                .setMetadata(OffsetFetchResponse.NO_METADATA)
+                                
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+                        ))
+                ));
+            assertEquals(expectedResponse, request.getErrorResponse(1000, 
Errors.INVALID_GROUP_ID.exception()).data());
+        } else if (version < 8) {
+            var expectedResponse = new OffsetFetchResponseData()
+                .setThrottleTimeMs(1000)
+                .setErrorCode(Errors.INVALID_GROUP_ID.code());
+            assertEquals(expectedResponse, request.getErrorResponse(1000, 
Errors.INVALID_GROUP_ID.exception()).data());
+        } else {
+            var expectedResponse = new OffsetFetchResponseData()
+                .setThrottleTimeMs(1000)
+                .setGroups(List.of(
+                    new OffsetFetchResponseData.OffsetFetchResponseGroup()
+                        .setGroupId("grp1")
+                        .setErrorCode(Errors.INVALID_GROUP_ID.code())
+                ));
+            assertEquals(expectedResponse, request.getErrorResponse(1000, 
Errors.INVALID_GROUP_ID.exception()).data());
         }
     }
 
-    @Test
-    public void testConstructorFailForUnsupportedRequireStable() {
-        for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
-            if (version < 8) {
-                // The builder needs to be initialized every cycle as the 
internal data `requireStable` flag is flipped.
-                builder = new OffsetFetchRequest.Builder(group1, true, null, 
false);
-                final short finalVersion = version;
-                if (version < 2) {
-                    assertThrows(UnsupportedVersionException.class, () -> 
builder.build(finalVersion));
-                } else {
-                    OffsetFetchRequest request = builder.build(finalVersion);
-                    assertEquals(group1, request.groupId());
-                    assertNull(request.partitions());
-                    assertTrue(request.isAllPartitions());
-                    if (version < 7) {
-                        assertFalse(request.requireStable());
-                    } else {
-                        assertTrue(request.requireStable());
-                    }
-                }
-            } else {
-                builder = new Builder(Collections.singletonMap(group1, null), 
true, false);
-                OffsetFetchRequest request = builder.build(version);
-                Map<String, List<TopicPartition>> groupToPartitionMap =
-                    request.groupIdsToPartitions();
-                Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
-                    request.groupIdsToTopics();
-                assertTrue(groupToPartitionMap.containsKey(group1) && 
groupToTopicMap.containsKey(
-                    group1));
-                assertNull(groupToPartitionMap.get(group1));
-                assertTrue(request.isAllPartitionsForGroup(group1));
-                assertTrue(request.requireStable());
-            }
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+    public void testGroups(short version) {
+        var request = new OffsetFetchRequest.Builder(
+            new OffsetFetchRequestData()
+                .setGroups(List.of(
+                    new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                        .setGroupId("grp1")
+                        .setTopics(List.of(
+                            new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                                .setName("foo")
+                                .setPartitionIndexes(List.of(0, 1, 2))
+                        ))
+                )),
+            false
+        ).build(version);
+
+        if (version < 8) {
+            var expectedGroups = List.of(
+                new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                    .setGroupId("grp1")
+                    .setTopics(List.of(
+                        new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                            .setName("foo")
+                            .setPartitionIndexes(List.of(0, 1, 2))
+                    ))
+            );
+            assertEquals(expectedGroups, request.groups());
+        } else {
+            assertEquals(request.data().groups(), request.groups());
         }
     }
 
-    @Test
-    public void testBuildThrowForUnsupportedRequireStable() {
-        for (int version : listOfVersionsNonBatchOffsetFetch) {
-            builder = new OffsetFetchRequest.Builder(group1, true, null, true);
-            if (version < 7) {
-                final short finalVersion = (short) version;
-                assertThrows(UnsupportedVersionException.class, () -> 
builder.build(finalVersion));
-            } else {
-                OffsetFetchRequest request = builder.build((short) version);
-                assertTrue(request.requireStable());
-            }
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH, fromVersion = 2)
+    public void testGroupsWithAllTopics(short version) {
+        var request = new OffsetFetchRequest.Builder(
+            new OffsetFetchRequestData()
+                .setGroups(List.of(
+                    new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                        .setGroupId("grp1")
+                        .setTopics(null)
+                )),
+            false
+        ).build(version);
+
+        if (version < 8) {
+            var expectedGroups = List.of(
+                new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                    .setGroupId("grp1")
+                    .setTopics(null)
+            );
+            assertEquals(expectedGroups, request.groups());
+        } else {
+            assertEquals(request.data().groups(), request.groups());
         }
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index ce379f512f0..4a1b2d7f9b9 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -199,6 +199,7 @@ import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
 import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection;
 import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
 import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic;
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection;
@@ -276,8 +277,6 @@ import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
@@ -772,55 +771,6 @@ public class RequestResponseTest {
         assertThrows(IllegalArgumentException.class, () -> 
createTopicsRequest.serializeWithHeader(requestHeader));
     }
 
-    @Test
-    public void testOffsetFetchRequestBuilderToStringV0ToV7() {
-        List<Boolean> stableFlags = asList(true, false);
-        for (Boolean requireStable : stableFlags) {
-            String allTopicPartitionsString = new OffsetFetchRequest.Builder(
-                "someGroup",
-                requireStable,
-                null,
-                false
-            ).toString();
-
-            assertTrue(allTopicPartitionsString.contains("groupId='', 
topics=[],"
-                + " groups=[OffsetFetchRequestGroup(groupId='someGroup', 
memberId=null, memberEpoch=-1, topics=null)], requireStable=" + requireStable));
-            String string = new OffsetFetchRequest.Builder(
-                "group1",
-                requireStable,
-                singletonList(
-                    new TopicPartition("test11", 1)),
-                false
-            ).toString();
-            assertTrue(string.contains("test11"));
-            assertTrue(string.contains("group1"));
-            assertTrue(string.contains("requireStable=" + requireStable));
-        }
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void testOffsetFetchRequestBuilderToStringV8AndAbove(boolean 
requireStable) {
-        String allTopicPartitionsString = new OffsetFetchRequest.Builder(
-            Collections.singletonMap("someGroup", null),
-            requireStable,
-            false
-        ).toString();
-        
assertTrue(allTopicPartitionsString.contains("groups=[OffsetFetchRequestGroup"
-            + "(groupId='someGroup', memberId=null, memberEpoch=-1, 
topics=null)], requireStable=" + requireStable));
-
-        String subsetTopicPartitionsString = new OffsetFetchRequest.Builder(
-            Collections.singletonMap(
-                "group1",
-                singletonList(new TopicPartition("test11", 1))),
-            requireStable,
-            false
-        ).toString();
-        assertTrue(subsetTopicPartitionsString.contains("test11"));
-        assertTrue(subsetTopicPartitionsString.contains("group1"));
-        assertTrue(subsetTopicPartitionsString.contains("requireStable=" + 
requireStable));
-    }
-
     @Test
     public void testApiVersionsRequestBeforeV3Validation() {
         for (short version = 0; version < 3; version++) {
@@ -2446,66 +2396,83 @@ public class RequestResponseTest {
     }
 
     private OffsetFetchRequest createOffsetFetchRequest(short version, boolean 
requireStable) {
-        if (version < 8) {
-            return new OffsetFetchRequest.Builder(
-                "group1",
-                requireStable,
-                singletonList(new TopicPartition("test11", 1)),
-                false)
-                .build(version);
-        }
         return new OffsetFetchRequest.Builder(
-                Collections.singletonMap(
-                "group1",
-                singletonList(new TopicPartition("test11", 1))),
-            requireStable,
-            false)
-            .build(version);
+            new OffsetFetchRequestData()
+                .setRequireStable(requireStable)
+                .setGroups(List.of(
+                    new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                        .setGroupId("group1")
+                        .setMemberId(version >= 9 ? "memberid" : null)
+                        .setMemberEpoch(version >= 9 ? 10 : -1)
+                        .setTopics(List.of(
+                            new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                                .setName("test11")
+                                .setPartitionIndexes(List.of(1))
+                        ))
+                )),
+            false
+        ).build(version);
     }
 
     private OffsetFetchRequest 
createOffsetFetchRequestWithMultipleGroups(short version, boolean 
requireStable) {
-        Map<String, List<TopicPartition>> groupToPartitionMap = new 
HashMap<>();
-        List<TopicPartition> topic1 = singletonList(
-            new TopicPartition("topic1", 0));
-        List<TopicPartition> topic2 = asList(
-            new TopicPartition("topic1", 0),
-            new TopicPartition("topic2", 0),
-            new TopicPartition("topic2", 1));
-        List<TopicPartition> topic3 = asList(
-            new TopicPartition("topic1", 0),
-            new TopicPartition("topic2", 0),
-            new TopicPartition("topic2", 1),
-            new TopicPartition("topic3", 0),
-            new TopicPartition("topic3", 1),
-            new TopicPartition("topic3", 2));
-        groupToPartitionMap.put("group1", topic1);
-        groupToPartitionMap.put("group2", topic2);
-        groupToPartitionMap.put("group3", topic3);
-        groupToPartitionMap.put("group4", null);
-        groupToPartitionMap.put("group5", null);
-
         return new OffsetFetchRequest.Builder(
-            groupToPartitionMap,
-            requireStable,
+            new OffsetFetchRequestData()
+                .setRequireStable(requireStable)
+                .setGroups(List.of(
+                    new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                        .setGroupId("group1")
+                        .setTopics(List.of(
+                            new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                                .setName("topic1")
+                                .setPartitionIndexes(List.of(0))
+                        )),
+                    new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                        .setGroupId("group2")
+                        .setTopics(List.of(
+                            new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                                .setName("topic1")
+                                .setPartitionIndexes(List.of(0)),
+                            new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                                .setName("topic2")
+                                .setPartitionIndexes(List.of(0, 1))
+                        )),
+                    new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                        .setGroupId("group3")
+                        .setTopics(List.of(
+                            new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                                .setName("topic1")
+                                .setPartitionIndexes(List.of(0)),
+                            new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                                .setName("topic2")
+                                .setPartitionIndexes(List.of(0, 1)),
+                            new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+                                .setName("topic3")
+                                .setPartitionIndexes(List.of(0, 1, 2))
+                        )),
+                    new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                        .setGroupId("group4")
+                        .setTopics(null),
+                    new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                        .setGroupId("group5")
+                        .setTopics(null)
+                )),
             false
         ).build(version);
     }
 
     private OffsetFetchRequest createOffsetFetchRequestForAllPartition(short 
version, boolean requireStable) {
-        if (version < 8) {
-            return new OffsetFetchRequest.Builder(
-                "group1",
-                requireStable,
-                null,
-                false)
-                .build(version);
-        }
         return new OffsetFetchRequest.Builder(
-            Collections.singletonMap(
-                "group1", null),
-            requireStable,
-            false)
-            .build(version);
+            new OffsetFetchRequestData()
+                .setRequireStable(requireStable)
+                .setGroups(List.of(
+                    new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                        .setGroupId("group1")
+                        .setMemberId(version >= 9 ? "memberid" : null)
+                        .setMemberEpoch(version >= 9 ? 10 : -1)
+                        .setTopics(null)
+                )),
+            false
+        ).build(version);
     }
 
     private OffsetFetchResponse createOffsetFetchResponse(short version) {
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e9e476fcd12..5286e719784 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -37,7 +37,7 @@ import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import 
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, 
ListOffsetsTopic}
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition,
 OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, 
AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, 
ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, 
ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, 
CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, 
DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, 
DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsR [...]
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, 
AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, 
ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, 
ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, 
CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, 
DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, 
DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsR [...]
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
SimpleRecord}
@@ -318,15 +318,53 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   }
 
   private def createOffsetFetchRequest: OffsetFetchRequest = {
-    new requests.OffsetFetchRequest.Builder(group, false, List(tp).asJava, 
false).build()
+    new OffsetFetchRequest.Builder(
+      new OffsetFetchRequestData()
+        .setRequireStable(false)
+        .setGroups(List(
+          new OffsetFetchRequestData.OffsetFetchRequestGroup()
+            .setGroupId(group)
+            .setTopics(List(
+              new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName(tp.topic)
+                .setPartitionIndexes(List[Integer](tp.partition).asJava)
+            ).asJava)
+        ).asJava),
+      false
+    ).build()
   }
 
   private def createOffsetFetchRequestAllPartitions: OffsetFetchRequest = {
-    new requests.OffsetFetchRequest.Builder(group, false, null, false).build()
+    new OffsetFetchRequest.Builder(
+      new OffsetFetchRequestData()
+        .setRequireStable(false)
+        .setGroups(List(
+          new OffsetFetchRequestData.OffsetFetchRequestGroup()
+            .setGroupId(group)
+            .setTopics(null)
+        ).asJava),
+      false
+    ).build()
   }
 
   private def createOffsetFetchRequest(groupToPartitionMap: util.Map[String, 
util.List[TopicPartition]]): OffsetFetchRequest = {
-    new requests.OffsetFetchRequest.Builder(groupToPartitionMap, false, 
false).build()
+    new OffsetFetchRequest.Builder(
+      new OffsetFetchRequestData()
+        .setGroups(groupToPartitionMap.asScala.map { case (groupId, partitions) =>
+          new OffsetFetchRequestData.OffsetFetchRequestGroup()
+            .setGroupId(groupId)
+            .setTopics(
+              if (partitions == null)
+                null
+              else
+                partitions.asScala.groupBy(_.topic).map { case (topic, partitions) =>
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName(topic)
+                    .setPartitionIndexes(partitions.map(_.partition).map(Int.box).asJava)
+                }.toList.asJava)
+        }.toList.asJava),
+      false
+    ).build()
   }
 
   private def createFindCoordinatorRequest = {
diff --git 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index be96826858a..df939c29ffb 100644
--- 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -24,7 +24,7 @@ import 
org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupR
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
 import 
org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, 
AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, 
ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, 
ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, 
DeleteGroupsResponseData, DescribeGroupsRequestData, 
DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, 
HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, 
JoinGroupResponseData, LeaveGroupRes [...]
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, 
AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, 
ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, 
ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, 
DeleteGroupsResponseData, DescribeGroupsRequestData, 
DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, 
HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, 
JoinGroupResponseData, LeaveGroupRes [...]
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, 
ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, 
ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, 
DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, 
HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, 
InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, L [...]
 import org.apache.kafka.common.serialization.StringSerializer
@@ -333,11 +333,23 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
     version: Short
   ): OffsetFetchResponseData.OffsetFetchResponseGroup = {
     val request = new OffsetFetchRequest.Builder(
-      groupId,
-      memberId,
-      memberEpoch,
-      requireStable,
-      if (partitions == null) null else partitions.asJava,
+      new OffsetFetchRequestData()
+        .setRequireStable(requireStable)
+        .setGroups(List(
+          new OffsetFetchRequestData.OffsetFetchRequestGroup()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(memberEpoch)
+            .setTopics(
+              if (partitions == null)
+                null
+              else
+                partitions.groupBy(_.topic).map { case (topic, partitions) =>
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName(topic)
+                    .setPartitionIndexes(partitions.map(_.partition).map(Int.box).asJava)
+                }.toList.asJava)
+        ).asJava),
       false
     ).build(version)
 
@@ -383,8 +395,22 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
     }
 
     val request = new OffsetFetchRequest.Builder(
-      groups.map { case (k, v) => (k, v.asJava) }.asJava,
-      requireStable,
+      new OffsetFetchRequestData()
+        .setRequireStable(requireStable)
+        .setGroups(groups.map { case (groupId, partitions) =>
+          new OffsetFetchRequestData.OffsetFetchRequestGroup()
+            .setGroupId(groupId)
+            .setTopics(
+              if (partitions == null)
+                null
+              else
+                partitions.groupBy(_.topic).map { case (topic, partitions) =>
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName(topic)
+                    .setPartitionIndexes(partitions.map(_.partition).map(Int.box).asJava)
+                }.toList.asJava
+            )
+        }.toList.asJava),
       false
     ).build(version)
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 0b51fd886d8..e3ffce710a6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -8242,16 +8242,30 @@ class KafkaApisTest extends Logging {
   @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
   def testHandleOffsetFetchWithMultipleGroups(version: Short): Unit = {
     def makeRequest(version: Short): RequestChannel.Request = {
-      val groups = Map(
-        "group-1" -> List(
-          new TopicPartition("foo", 0),
-          new TopicPartition("foo", 1)
-        ).asJava,
-        "group-2" -> null,
-        "group-3" -> null,
-        "group-4" -> null,
-      ).asJava
-      buildRequest(new OffsetFetchRequest.Builder(groups, false, 
false).build(version))
+      buildRequest(
+        new OffsetFetchRequest.Builder(
+          new OffsetFetchRequestData()
+            .setGroups(List(
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-1")
+                .setTopics(List(
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName("foo")
+                    .setPartitionIndexes(List[Integer](0, 1).asJava)
+                ).asJava),
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-2")
+                .setTopics(null),
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-3")
+                .setTopics(null),
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-4")
+                .setTopics(null),
+            ).asJava),
+          false
+        ).build(version)
+      )
     }
 
     if (version < 8) {
@@ -8364,12 +8378,17 @@ class KafkaApisTest extends Logging {
   def testHandleOffsetFetchWithSingleGroup(version: Short): Unit = {
     def makeRequest(version: Short): RequestChannel.Request = {
       buildRequest(new OffsetFetchRequest.Builder(
-        "group-1",
-        false,
-        List(
-          new TopicPartition("foo", 0),
-          new TopicPartition("foo", 1)
-        ).asJava,
+        new OffsetFetchRequestData()
+          .setRequireStable(false)
+          .setGroups(List(
+            new OffsetFetchRequestData.OffsetFetchRequestGroup()
+              .setGroupId("group-1")
+              .setTopics(List(
+                new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                  .setName("foo")
+                  .setPartitionIndexes(List[Integer](0, 1).asJava)
+              ).asJava)
+          ).asJava),
         false
       ).build(version))
     }
@@ -8434,17 +8453,18 @@ class KafkaApisTest extends Logging {
   }
 
   @ParameterizedTest
-  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+  // Version 1 does not support fetching offsets for all topics.
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH, fromVersion = 2)
   def testHandleOffsetFetchAllOffsetsWithSingleGroup(version: Short): Unit = {
-    // Version 0 gets offsets from Zookeeper. Version 1 does not support 
fetching all
-    // offsets request. We are not interested in testing these here.
-    if (version < 2) return
-
     def makeRequest(version: Short): RequestChannel.Request = {
       buildRequest(new OffsetFetchRequest.Builder(
-        "group-1",
-        false,
-        null, // all offsets.
+        new OffsetFetchRequestData()
+          .setRequireStable(false)
+          .setGroups(List(
+            new OffsetFetchRequestData.OffsetFetchRequestGroup()
+              .setGroupId("group-1")
+              .setTopics(null) // all offsets.
+          ).asJava),
         false
       ).build(version))
     }
@@ -8509,19 +8529,40 @@ class KafkaApisTest extends Logging {
   @Test
   def testHandleOffsetFetchAuthorization(): Unit = {
     def makeRequest(version: Short): RequestChannel.Request = {
-      val groups = Map(
-        "group-1" -> List(
-          new TopicPartition("foo", 0),
-          new TopicPartition("bar", 0)
-        ).asJava,
-        "group-2" -> List(
-          new TopicPartition("foo", 0),
-          new TopicPartition("bar", 0)
-        ).asJava,
-        "group-3" -> null,
-        "group-4" -> null,
-      ).asJava
-      buildRequest(new OffsetFetchRequest.Builder(groups, false, 
false).build(version))
+      buildRequest(
+        new OffsetFetchRequest.Builder(
+          new OffsetFetchRequestData()
+            .setGroups(List(
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-1")
+                .setTopics(List(
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName("foo")
+                    .setPartitionIndexes(List[Integer](0).asJava),
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName("bar")
+                    .setPartitionIndexes(List[Integer](0).asJava)
+                ).asJava),
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-2")
+                .setTopics(List(
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName("foo")
+                    .setPartitionIndexes(List[Integer](0).asJava),
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName("bar")
+                    .setPartitionIndexes(List[Integer](0).asJava)
+                ).asJava),
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-3")
+                .setTopics(null),
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-4")
+                .setTopics(null),
+            ).asJava),
+          false
+        ).build(version)
+      )
     }
 
     val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion)
@@ -8662,17 +8703,34 @@ class KafkaApisTest extends Logging {
   @Test
   def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(): Unit = {
     def makeRequest(version: Short): RequestChannel.Request = {
-      val groups = Map(
-        "group-1" -> List(
-          new TopicPartition("foo", 0),
-          new TopicPartition("bar", 0)
-        ).asJava,
-        "group-2" -> List(
-          new TopicPartition("foo", 0),
-          new TopicPartition("bar", 0)
-        ).asJava
-      ).asJava
-      buildRequest(new OffsetFetchRequest.Builder(groups, false, 
false).build(version))
+      buildRequest(
+        new OffsetFetchRequest.Builder(
+          new OffsetFetchRequestData()
+            .setGroups(List(
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-1")
+                .setTopics(List(
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName("foo")
+                    .setPartitionIndexes(List[Integer](0).asJava),
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName("bar")
+                    .setPartitionIndexes(List[Integer](0).asJava)
+                ).asJava),
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-2")
+                .setTopics(List(
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName("foo")
+                    .setPartitionIndexes(List[Integer](0).asJava),
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName("bar")
+                    .setPartitionIndexes(List[Integer](0).asJava)
+                ).asJava)
+            ).asJava),
+          false
+        ).build(version)
+      )
     }
 
     val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 7610e466207..2f001d9baf8 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -292,7 +292,19 @@ class RequestQuotaTest extends BaseRequestTest {
               )
           )
         case ApiKeys.OFFSET_FETCH =>
-          new OffsetFetchRequest.Builder(Map("test-group"-> 
List(tp).asJava).asJava, false, false)
+          new OffsetFetchRequest.Builder(
+            new OffsetFetchRequestData()
+              .setGroups(List(
+                new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                  .setGroupId("test-group")
+                  .setTopics(List(
+                    new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                      .setName(tp.topic)
+                      .setPartitionIndexes(List[Integer](tp.partition).asJava)
+                  ).asJava)
+              ).asJava),
+            false
+          )
 
         case ApiKeys.FIND_COORDINATOR =>
           new FindCoordinatorRequest.Builder(

Reply via email to