This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 24a86423e99 KAFKA-14367; Add `OffsetFetch` to the new 
`GroupCoordinator` interface (#12870)
24a86423e99 is described below

commit 24a86423e9907b751d98fddc7196332feea2b48d
Author: David Jacot <[email protected]>
AuthorDate: Tue Jan 10 20:38:31 2023 +0100

    KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface 
(#12870)
    
    This patch adds OffsetFetch to the new GroupCoordinator interface and 
updates KafkaApis to use it.
    
    Reviewers: Philip Nee <[email protected]>, Jeff Kim <[email protected]>, 
Justine Olshan <[email protected]>
---
 .../kafka/common/requests/OffsetFetchRequest.java  |  18 ++
 .../kafka/common/requests/OffsetFetchResponse.java |  60 +++-
 .../common/message/OffsetFetchResponse.json        |   2 +-
 .../group/GroupCoordinatorAdapter.scala            |  82 ++++-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 179 ++++++-----
 .../group/GroupCoordinatorAdapterTest.scala        | 176 ++++++++++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 335 +++++++++++++++++++++
 .../unit/kafka/server/OffsetFetchRequestTest.scala |  39 +--
 .../kafka/coordinator/group/GroupCoordinator.java  |  32 ++
 9 files changed, 799 insertions(+), 124 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index c5c094a1460..edcad541dca 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -227,6 +227,24 @@ public class OffsetFetchRequest extends AbstractRequest {
         return data.requireStable();
     }
 
+    public List<OffsetFetchRequestData.OffsetFetchRequestGroup> groups() {
+        if (version() >= 8) {
+            return data.groups();
+        } else {
+            OffsetFetchRequestData.OffsetFetchRequestGroup group =
+                new 
OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(data.groupId());
+
+            data.topics().forEach(topic -> {
+                group.topics().add(new OffsetFetchRequestTopics()
+                    .setName(topic.name())
+                    .setPartitionIndexes(topic.partitionIndexes())
+                );
+            });
+
+            return Collections.singletonList(group);
+        }
+    }
+
     public Map<String, List<TopicPartition>> groupIdsToPartitions() {
         Map<String, List<TopicPartition>> groupIdsToPartitions = new 
HashMap<>();
         for (OffsetFetchRequestGroup group : data.groups()) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 2d585a582ae..6941e280916 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.requests;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
 import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
 import 
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
@@ -119,12 +120,6 @@ public class OffsetFetchResponse extends AbstractResponse {
         }
     }
 
-    public OffsetFetchResponse(OffsetFetchResponseData data) {
-        super(ApiKeys.OFFSET_FETCH);
-        this.data = data;
-        this.error = null;
-    }
-
     /**
      * Constructor without throttle time.
      * @param error Potential coordinator or group level error code (for api 
version 2 and later)
@@ -208,6 +203,59 @@ public class OffsetFetchResponse extends AbstractResponse {
         this.error = null;
     }
 
+    public OffsetFetchResponse(List<OffsetFetchResponseGroup> groups, short 
version) {
+        super(ApiKeys.OFFSET_FETCH);
+        data = new OffsetFetchResponseData();
+
+        if (version >= 8) {
+            data.setGroups(groups);
+            error = null;
+
+            for (OffsetFetchResponseGroup group : data.groups()) {
+                this.groupLevelErrors.put(group.groupId(), 
Errors.forCode(group.errorCode()));
+            }
+        } else {
+            if (groups.size() != 1) {
+                throw new UnsupportedVersionException(
+                    "Version " + version + " of OffsetFetchResponse only 
supports one group."
+                );
+            }
+
+            OffsetFetchResponseGroup group = groups.get(0);
+            data.setErrorCode(group.errorCode());
+            error = Errors.forCode(group.errorCode());
+
+            group.topics().forEach(topic -> {
+                OffsetFetchResponseTopic newTopic = new 
OffsetFetchResponseTopic().setName(topic.name());
+                data.topics().add(newTopic);
+
+                topic.partitions().forEach(partition -> {
+                    OffsetFetchResponsePartition newPartition;
+
+                    if (version < 2 && group.errorCode() != 
Errors.NONE.code()) {
+                        // Versions prior to version 2 do not support a top 
level error. Therefore,
+                        // we put it at the partition level.
+                        newPartition = new OffsetFetchResponsePartition()
+                            .setPartitionIndex(partition.partitionIndex())
+                            .setErrorCode(group.errorCode())
+                            .setCommittedOffset(INVALID_OFFSET)
+                            .setMetadata(NO_METADATA)
+                            
.setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH);
+                    } else {
+                        newPartition = new OffsetFetchResponsePartition()
+                            .setPartitionIndex(partition.partitionIndex())
+                            .setErrorCode(partition.errorCode())
+                            .setCommittedOffset(partition.committedOffset())
+                            .setMetadata(partition.metadata())
+                            
.setCommittedLeaderEpoch(partition.committedLeaderEpoch());
+                    }
+
+                    newTopic.partitions().add(newPartition);
+                });
+            });
+        }
+    }
+
     public OffsetFetchResponse(OffsetFetchResponseData data, short version) {
         super(ApiKeys.OFFSET_FETCH);
         this.data = data;
diff --git a/clients/src/main/resources/common/message/OffsetFetchResponse.json 
b/clients/src/main/resources/common/message/OffsetFetchResponse.json
index dfad60e27d6..71acf0b4d2e 100644
--- a/clients/src/main/resources/common/message/OffsetFetchResponse.json
+++ b/clients/src/main/resources/common/message/OffsetFetchResponse.json
@@ -57,7 +57,7 @@
     ]},
     { "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", 
"ignorable": true,
       "about": "The top-level error code, or 0 if there was no error." },
-    {"name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
+    { "name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
       "about": "The responses per group id.", "fields": [
       { "name": "groupId", "type": "string", "versions": "8+", "entityType": 
"groupId",
         "about": "The group ID." },
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 4e96e4373a7..29242a8774a 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -18,13 +18,15 @@ package kafka.coordinator.group
 
 import kafka.server.RequestLocal
 import kafka.utils.Implicits.MapExtensionMethods
-import org.apache.kafka.common.message.{DeleteGroupsResponseData, 
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, 
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, 
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, 
SyncGroupRequestData, SyncGroupResponseData}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{DeleteGroupsResponseData, 
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, 
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, 
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, 
OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, 
SyncGroupResponseData}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.RequestContext
 import org.apache.kafka.common.utils.BufferSupplier
 
 import java.util
 import java.util.concurrent.CompletableFuture
-import scala.collection.immutable
+import scala.collection.{immutable, mutable}
 import scala.jdk.CollectionConverters._
 
 /**
@@ -234,4 +236,80 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def fetchAllOffsets(
+    context: RequestContext,
+    groupId: String,
+    requireStable: Boolean
+  ): 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] 
= {
+    handleFetchOffset(
+      groupId,
+      requireStable,
+      None
+    )
+  }
+
+  override def fetchOffsets(
+    context: RequestContext,
+    groupId: String,
+    topics: util.List[OffsetFetchRequestData.OffsetFetchRequestTopics],
+    requireStable: Boolean
+  ): 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] 
= {
+    val topicPartitions = new mutable.ArrayBuffer[TopicPartition]()
+    topics.forEach { topic =>
+      topic.partitionIndexes.forEach { partition =>
+        topicPartitions += new TopicPartition(topic.name, partition)
+      }
+    }
+
+    handleFetchOffset(
+      groupId,
+      requireStable,
+      Some(topicPartitions.toSeq)
+    )
+  }
+
+  private def handleFetchOffset(
+    groupId: String,
+    requireStable: Boolean,
+    partitions: Option[Seq[TopicPartition]]
+  ): 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] 
= {
+    val (error, results) = coordinator.handleFetchOffsets(
+      groupId,
+      requireStable,
+      partitions
+    )
+
+    val future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+    if (error != Errors.NONE) {
+      future.completeExceptionally(error.exception)
+    } else {
+      val topicsList = new 
util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics]()
+      val topicsMap = new mutable.HashMap[String, 
OffsetFetchResponseData.OffsetFetchResponseTopics]()
+
+      results.forKeyValue { (tp, offset) =>
+        val topic = topicsMap.get(tp.topic) match {
+          case Some(topic) =>
+            topic
+
+          case None =>
+            val topicOffsets = new 
OffsetFetchResponseData.OffsetFetchResponseTopics().setName(tp.topic)
+            topicsMap += tp.topic -> topicOffsets
+            topicsList.add(topicOffsets)
+            topicOffsets
+        }
+
+        topic.partitions.add(new 
OffsetFetchResponseData.OffsetFetchResponsePartitions()
+          .setPartitionIndex(tp.partition)
+          .setMetadata(offset.metadata)
+          .setCommittedOffset(offset.offset)
+          .setCommittedLeaderEpoch(offset.leaderEpoch.orElse(-1))
+          .setErrorCode(offset.error.code))
+      }
+
+      future.complete(topicsList)
+    }
+
+    future
+  }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4ec7a7d9a6c..20cfe766eb2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -57,7 +57,6 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
 import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
-import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
@@ -189,7 +188,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, 
requestLocal)
         case ApiKeys.CONTROLLED_SHUTDOWN => 
handleControlledShutdownRequest(request)
         case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, 
requestLocal)
-        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
+        case ApiKeys.OFFSET_FETCH => 
handleOffsetFetchRequest(request).exceptionally(handleError)
         case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
         case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, 
requestLocal).exceptionally(handleError)
         case ApiKeys.HEARTBEAT => 
handleHeartbeatRequest(request).exceptionally(handleError)
@@ -1338,27 +1337,22 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset fetch request
    */
-  def handleOffsetFetchRequest(request: RequestChannel.Request): Unit = {
+  def handleOffsetFetchRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
     val version = request.header.apiVersion
     if (version == 0) {
-      // reading offsets from ZK
-      handleOffsetFetchRequestV0(request)
-    } else if (version >= 1 && version <= 7) {
-      // reading offsets from Kafka
-      handleOffsetFetchRequestBetweenV1AndV7(request)
+      handleOffsetFetchRequestFromZookeeper(request)
     } else {
-      // batching offset reads for multiple groups starts with version 8 and 
greater
-      handleOffsetFetchRequestV8AndAbove(request)
+      handleOffsetFetchRequestFromCoordinator(request)
     }
   }
 
-  private def handleOffsetFetchRequestV0(request: RequestChannel.Request): 
Unit = {
+  private def handleOffsetFetchRequestFromZookeeper(request: 
RequestChannel.Request): CompletableFuture[Unit] = {
     val header = request.header
     val offsetFetchRequest = request.body[OffsetFetchRequest]
 
     def createResponse(requestThrottleMs: Int): AbstractResponse = {
       val offsetFetchResponse =
-      // reject the request if not authorized to the group
+        // reject the request if not authorized to the group
         if (!authHelper.authorize(request.context, DESCRIBE, GROUP, 
offsetFetchRequest.groupId))
           offsetFetchRequest.getErrorResponse(requestThrottleMs, 
Errors.GROUP_AUTHORIZATION_FAILED)
         else {
@@ -1395,79 +1389,114 @@ class KafkaApis(val requestChannel: RequestChannel,
       offsetFetchResponse
     }
     requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    CompletableFuture.completedFuture[Unit](())
   }
 
-  private def handleOffsetFetchRequestBetweenV1AndV7(request: 
RequestChannel.Request): Unit = {
-    val header = request.header
+  private def handleOffsetFetchRequestFromCoordinator(request: 
RequestChannel.Request): CompletableFuture[Unit] = {
     val offsetFetchRequest = request.body[OffsetFetchRequest]
-    val groupId = offsetFetchRequest.groupId()
-    val (error, partitionData) = fetchOffsets(groupId, 
offsetFetchRequest.isAllPartitions,
-      offsetFetchRequest.requireStable, offsetFetchRequest.partitions, 
request.context)
-    def createResponse(requestThrottleMs: Int): AbstractResponse = {
-      val offsetFetchResponse =
-        if (error != Errors.NONE) {
-          offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
-        } else {
-          new OffsetFetchResponse(requestThrottleMs, Errors.NONE, 
partitionData.asJava)
-        }
-      trace(s"Sending offset fetch response $offsetFetchResponse for 
correlation id ${header.correlationId} to client ${header.clientId}.")
-      offsetFetchResponse
+    val groups = offsetFetchRequest.groups()
+    val requireStable = offsetFetchRequest.requireStable()
+
+    val futures = new 
mutable.ArrayBuffer[CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]](groups.size)
+    groups.forEach { groupOffsetFetch =>
+      val isAllPartitions = groupOffsetFetch.topics == null
+      if (!authHelper.authorize(request.context, DESCRIBE, GROUP, 
groupOffsetFetch.groupId)) {
+        futures += CompletableFuture.completedFuture(new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+          .setGroupId(groupOffsetFetch.groupId)
+          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))
+      } else if (isAllPartitions) {
+        futures += fetchAllOffsetsForGroup(
+          request.context,
+          groupOffsetFetch,
+          requireStable
+        )
+      } else {
+        futures += fetchOffsetsForGroup(
+          request.context,
+          groupOffsetFetch,
+          requireStable
+        )
+      }
     }
-    requestHelper.sendResponseMaybeThrottle(request, createResponse)
-  }
 
-  private def handleOffsetFetchRequestV8AndAbove(request: 
RequestChannel.Request): Unit = {
-    val header = request.header
-    val offsetFetchRequest = request.body[OffsetFetchRequest]
-    val groupIds = offsetFetchRequest.groupIds().asScala
-    val groupToErrorMap =  mutable.Map.empty[String, Errors]
-    val groupToPartitionData =  mutable.Map.empty[String, 
util.Map[TopicPartition, PartitionData]]
-    val groupToTopicPartitions = offsetFetchRequest.groupIdsToPartitions()
-    groupIds.foreach(g => {
-      val (error, partitionData) = fetchOffsets(g,
-        offsetFetchRequest.isAllPartitionsForGroup(g),
-        offsetFetchRequest.requireStable(),
-        groupToTopicPartitions.get(g), request.context)
-      groupToErrorMap += (g -> error)
-      groupToPartitionData += (g -> partitionData.asJava)
-    })
-
-    def createResponse(requestThrottleMs: Int): AbstractResponse = {
-      val offsetFetchResponse = new OffsetFetchResponse(requestThrottleMs,
-        groupToErrorMap.asJava, groupToPartitionData.asJava)
-      trace(s"Sending offset fetch response $offsetFetchResponse for 
correlation id ${header.correlationId} to client ${header.clientId}.")
-      offsetFetchResponse
+    CompletableFuture.allOf(futures.toArray: _*).handle[Unit] { (_, _) =>
+      val groupResponses = new 
ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseGroup](futures.size)
+      futures.foreach(future => groupResponses += future.get())
+      requestHelper.sendMaybeThrottle(request, new 
OffsetFetchResponse(groupResponses.asJava, request.context.apiVersion))
     }
-
-    requestHelper.sendResponseMaybeThrottle(request, createResponse)
   }
 
-  private def fetchOffsets(groupId: String, isAllPartitions: Boolean, 
requireStable: Boolean,
-                           partitions: util.List[TopicPartition], context: 
RequestContext): (Errors, Map[TopicPartition, 
OffsetFetchResponse.PartitionData]) = {
-    if (!authHelper.authorize(context, DESCRIBE, GROUP, groupId)) {
-      (Errors.GROUP_AUTHORIZATION_FAILED, Map.empty)
-    } else {
-      if (isAllPartitions) {
-        val (error, allPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable)
-        if (error != Errors.NONE) {
-          (error, allPartitionData)
-        } else {
-          // clients are not allowed to see offsets for topics that are not 
authorized for Describe
-          val (authorizedPartitionData, _) = 
authHelper.partitionMapByAuthorized(context,
-            DESCRIBE, TOPIC, allPartitionData)(_.topic)
-          (Errors.NONE, authorizedPartitionData)
-        }
+  private def fetchAllOffsetsForGroup(
+    requestContext: RequestContext,
+    groupOffsetFetch: OffsetFetchRequestData.OffsetFetchRequestGroup,
+    requireStable: Boolean
+  ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
+    newGroupCoordinator.fetchAllOffsets(
+      requestContext,
+      groupOffsetFetch.groupId,
+      requireStable
+    ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (offsets, 
exception) =>
+      if (exception != null) {
+        new OffsetFetchResponseData.OffsetFetchResponseGroup()
+          .setGroupId(groupOffsetFetch.groupId)
+          .setErrorCode(Errors.forException(exception).code)
       } else {
-        val (authorizedPartitions, unauthorizedPartitions) = 
partitionByAuthorized(
-          partitions.asScala, context)
-        val (error, authorizedPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId,
-          requireStable, Some(authorizedPartitions))
-        if (error != Errors.NONE) {
-          (error, authorizedPartitionData)
-        } else {
-          val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> 
OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
-          (Errors.NONE, authorizedPartitionData ++ unauthorizedPartitionData)
+        // Clients are not allowed to see offsets for topics that are not 
authorized for Describe.
+        val (authorizedOffsets, _) = authHelper.partitionSeqByAuthorized(
+          requestContext,
+          DESCRIBE,
+          TOPIC,
+          offsets.asScala
+        )(_.name)
+
+        new OffsetFetchResponseData.OffsetFetchResponseGroup()
+          .setGroupId(groupOffsetFetch.groupId)
+          .setTopics(authorizedOffsets.asJava)
+      }
+    }
+  }
+
+  private def fetchOffsetsForGroup(
+    requestContext: RequestContext,
+    groupOffsetFetch: OffsetFetchRequestData.OffsetFetchRequestGroup,
+    requireStable: Boolean
+  ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
+    // Clients are not allowed to see offsets for topics that are not 
authorized for Describe.
+    val (authorizedTopics, unauthorizedTopics) = 
authHelper.partitionSeqByAuthorized(
+      requestContext,
+      DESCRIBE,
+      TOPIC,
+      groupOffsetFetch.topics.asScala
+    )(_.name)
+
+    newGroupCoordinator.fetchOffsets(
+      requestContext,
+      groupOffsetFetch.groupId,
+      authorizedTopics.asJava,
+      requireStable
+    ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { 
(topicOffsets, exception) =>
+      if (exception != null) {
+        new OffsetFetchResponseData.OffsetFetchResponseGroup()
+          .setGroupId(groupOffsetFetch.groupId)
+          .setErrorCode(Errors.forException(exception).code)
+      } else {
+        val response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+          .setGroupId(groupOffsetFetch.groupId)
+
+        response.topics.addAll(topicOffsets)
+
+        unauthorizedTopics.foreach { topic =>
+          val topicResponse = new 
OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name)
+          topic.partitionIndexes.forEach { partitionIndex =>
+            topicResponse.partitions.add(new 
OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(partitionIndex)
+              .setCommittedOffset(-1)
+              .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code))
+          }
+          response.topics.add(topicResponse)
         }
+
+        response
       }
     }
   }
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 323547474c9..1fbdf333a96 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -18,12 +18,13 @@ package kafka.coordinator.group
 
 import 
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, 
SyncGroupCallback}
 import kafka.server.RequestLocal
-import org.apache.kafka.common.message.{DeleteGroupsResponseData, 
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, 
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, 
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, 
SyncGroupRequestData, SyncGroupResponseData}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{DeleteGroupsResponseData, 
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, 
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, 
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, 
OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, 
SyncGroupResponseData}
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
 import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, 
RequestHeader}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.BufferSupplier
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
@@ -34,6 +35,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.Mockito.{mock, verify, when}
 
 import java.net.InetAddress
+import java.util.Optional
 import scala.jdk.CollectionConverters._
 
 class GroupCoordinatorAdapterTest {
@@ -436,4 +438,174 @@ class GroupCoordinatorAdapterTest {
 
     assertEquals(expectedResults, future.get())
   }
+
+  @Test
+  def testFetchAllOffsets(): Unit = {
+    val foo0 = new TopicPartition("foo", 0)
+    val foo1 = new TopicPartition("foo", 1)
+    val bar1 = new TopicPartition("bar", 1)
+
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+    when(groupCoordinator.handleFetchOffsets(
+      "group",
+      true,
+      None
+    )).thenReturn((
+      Errors.NONE,
+      Map(
+        foo0 -> new OffsetFetchResponse.PartitionData(
+          100,
+          Optional.of(1),
+          "foo",
+          Errors.NONE
+        ),
+        bar1 -> new OffsetFetchResponse.PartitionData(
+          -1,
+          Optional.empty[Integer],
+          "",
+          Errors.UNKNOWN_TOPIC_OR_PARTITION
+        ),
+        foo1 -> new OffsetFetchResponse.PartitionData(
+          200,
+          Optional.empty[Integer],
+          "",
+          Errors.NONE
+        ),
+      )
+    ))
+
+    val ctx = makeContext(ApiKeys.OFFSET_FETCH, 
ApiKeys.OFFSET_FETCH.latestVersion)
+    val future = adapter.fetchAllOffsets(
+      ctx,
+      "group",
+      true
+    )
+
+    assertTrue(future.isDone)
+
+    val expectedResponse = List(
+      new OffsetFetchResponseData.OffsetFetchResponseTopics()
+        .setName(foo0.topic)
+        .setPartitions(List(
+          new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+            .setPartitionIndex(foo0.partition)
+            .setCommittedOffset(100)
+            .setCommittedLeaderEpoch(1)
+            .setMetadata("foo")
+            .setErrorCode(Errors.NONE.code),
+          new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+            .setPartitionIndex(foo1.partition)
+            .setCommittedOffset(200)
+            .setCommittedLeaderEpoch(-1)
+            .setMetadata("")
+            .setErrorCode(Errors.NONE.code),
+        ).asJava),
+      new OffsetFetchResponseData.OffsetFetchResponseTopics()
+        .setName(bar1.topic)
+        .setPartitions(List(
+          new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+            .setPartitionIndex(bar1.partition)
+            .setCommittedOffset(-1)
+            .setCommittedLeaderEpoch(-1)
+            .setMetadata("")
+            .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+        ).asJava)
+    )
+
+    assertEquals(
+      expectedResponse.sortWith(_.name > _.name),
+      future.get().asScala.toList.sortWith(_.name > _.name)
+    )
+  }
+
+  @Test
+  def testFetchOffsets(): Unit = {
+    val foo0 = new TopicPartition("foo", 0)
+    val foo1 = new TopicPartition("foo", 1)
+    val bar1 = new TopicPartition("bar", 1)
+
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+    when(groupCoordinator.handleFetchOffsets(
+      "group",
+      true,
+      Some(Seq(foo0, foo1, bar1))
+    )).thenReturn((
+      Errors.NONE,
+      Map(
+        foo0 -> new OffsetFetchResponse.PartitionData(
+          100,
+          Optional.of(1),
+          "foo",
+          Errors.NONE
+        ),
+        bar1 -> new OffsetFetchResponse.PartitionData(
+          -1,
+          Optional.empty[Integer],
+          "",
+          Errors.UNKNOWN_TOPIC_OR_PARTITION
+        ),
+        foo1 -> new OffsetFetchResponse.PartitionData(
+          200,
+          Optional.empty[Integer],
+          "",
+          Errors.NONE
+        ),
+      )
+    ))
+
+    val ctx = makeContext(ApiKeys.OFFSET_FETCH, 
ApiKeys.OFFSET_FETCH.latestVersion)
+    val future = adapter.fetchOffsets(
+      ctx,
+      "group",
+      List(
+        new OffsetFetchRequestData.OffsetFetchRequestTopics()
+          .setName(foo0.topic)
+          .setPartitionIndexes(List[Integer](foo0.partition, 
foo1.partition).asJava),
+        new OffsetFetchRequestData.OffsetFetchRequestTopics()
+          .setName(bar1.topic)
+          .setPartitionIndexes(List[Integer](bar1.partition).asJava),
+      ).asJava,
+      true
+    )
+
+    assertTrue(future.isDone)
+
+    val expectedResponse = List(
+      new OffsetFetchResponseData.OffsetFetchResponseTopics()
+        .setName(foo0.topic)
+        .setPartitions(List(
+          new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+            .setPartitionIndex(foo0.partition)
+            .setCommittedOffset(100)
+            .setCommittedLeaderEpoch(1)
+            .setMetadata("foo")
+            .setErrorCode(Errors.NONE.code),
+          new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+            .setPartitionIndex(foo1.partition)
+            .setCommittedOffset(200)
+            .setCommittedLeaderEpoch(-1)
+            .setMetadata("")
+            .setErrorCode(Errors.NONE.code),
+        ).asJava),
+      new OffsetFetchResponseData.OffsetFetchResponseTopics()
+        .setName(bar1.topic)
+        .setPartitions(List(
+          new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+            .setPartitionIndex(bar1.partition)
+            .setCommittedOffset(-1)
+            .setCommittedLeaderEpoch(-1)
+            .setMetadata("")
+            .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+        ).asJava)
+    )
+
+    assertEquals(
+      expectedResponse.sortWith(_.name > _.name),
+      future.get().asScala.toList.sortWith(_.name > _.name)
+    )
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f80a4abf35f..27b2338a8e7 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3406,6 +3406,341 @@ class KafkaApisTest {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+  def testHandleOffsetFetchWithMultipleGroups(version: Short): Unit = {
+    // Version 0 gets offsets from Zookeeper. We are not interested
+    // in testing this here.
+    if (version == 0) return
+
+    def makeRequest(version: Short): RequestChannel.Request = {
+      val groups = Map(
+        "group-1" -> List(
+          new TopicPartition("foo", 0),
+          new TopicPartition("foo", 1)
+        ).asJava,
+        "group-2" -> null,
+        "group-3" -> null,
+      ).asJava
+      buildRequest(new OffsetFetchRequest.Builder(groups, false, 
false).build(version))
+    }
+
+    if (version < 8) {
+      // Request version earlier than version 8 do not support batching groups.
+      assertThrows(classOf[UnsupportedVersionException], () => 
makeRequest(version))
+    } else {
+      val requestChannelRequest = makeRequest(version)
+
+      val group1Future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+      when(newGroupCoordinator.fetchOffsets(
+        requestChannelRequest.context,
+        "group-1",
+        List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+          .setName("foo")
+          .setPartitionIndexes(List[Integer](0, 1).asJava)
+        ).asJava,
+        false
+      )).thenReturn(group1Future)
+
+      val group2Future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+      when(newGroupCoordinator.fetchAllOffsets(
+        requestChannelRequest.context,
+        "group-2",
+        false
+      )).thenReturn(group2Future)
+
+      val group3Future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+      when(newGroupCoordinator.fetchAllOffsets(
+        requestChannelRequest.context,
+        "group-3",
+        false
+      )).thenReturn(group3Future)
+
+      createKafkaApis().handleOffsetFetchRequest(requestChannelRequest)
+
+      val group1Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+        .setGroupId("group-1")
+        .setTopics(List(
+          new OffsetFetchResponseData.OffsetFetchResponseTopics()
+            .setName("foo")
+            .setPartitions(List(
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100)
+                .setCommittedLeaderEpoch(1),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(1)
+                .setCommittedOffset(200)
+                .setCommittedLeaderEpoch(2)
+            ).asJava)
+        ).asJava)
+
+      val group2Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+        .setGroupId("group-2")
+        .setTopics(List(
+          new OffsetFetchResponseData.OffsetFetchResponseTopics()
+            .setName("bar")
+            .setPartitions(List(
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100)
+                .setCommittedLeaderEpoch(1),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(1)
+                .setCommittedOffset(200)
+                .setCommittedLeaderEpoch(2),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(2)
+                .setCommittedOffset(300)
+                .setCommittedLeaderEpoch(3)
+            ).asJava)
+        ).asJava)
+
+      val group3Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+        .setGroupId("group-3")
+        .setErrorCode(Errors.INVALID_GROUP_ID.code)
+
+      val expectedOffsetFetchResponse = new OffsetFetchResponseData()
+        .setGroups(List(group1Response, group2Response, group3Response).asJava)
+
+      group1Future.complete(group1Response.topics)
+      group2Future.complete(group2Response.topics)
+      group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception)
+
+      val response = 
verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
+      assertEquals(expectedOffsetFetchResponse, response.data)
+    }
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+  def testHandleOffsetFetchWithSingleGroup(version: Short): Unit = {
+    // Version 0 gets offsets from Zookeeper. We are not interested
+    // in testing this here.
+    if (version == 0) return
+
+    def makeRequest(version: Short): RequestChannel.Request = {
+      buildRequest(new OffsetFetchRequest.Builder(
+        "group-1",
+        false,
+        List(
+          new TopicPartition("foo", 0),
+          new TopicPartition("foo", 1)
+        ).asJava,
+        false
+      ).build(version))
+    }
+
+    val requestChannelRequest = makeRequest(version)
+
+    val future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+    when(newGroupCoordinator.fetchOffsets(
+      requestChannelRequest.context,
+      "group-1",
+      List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+        .setName("foo")
+        .setPartitionIndexes(List[Integer](0, 1).asJava)
+      ).asJava,
+      false
+    )).thenReturn(future)
+
+    createKafkaApis().handleOffsetFetchRequest(requestChannelRequest)
+
+    val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+      .setGroupId("group-1")
+      .setTopics(List(
+        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100)
+              .setCommittedLeaderEpoch(1),
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(1)
+              .setCommittedOffset(200)
+              .setCommittedLeaderEpoch(2)
+          ).asJava)
+      ).asJava)
+
+    val expectedOffsetFetchResponse = if (version >= 8) {
+      new OffsetFetchResponseData()
+        .setGroups(List(group1Response).asJava)
+    } else {
+      new OffsetFetchResponseData()
+        .setTopics(List(
+          new OffsetFetchResponseData.OffsetFetchResponseTopic()
+            .setName("foo")
+            .setPartitions(List(
+              new OffsetFetchResponseData.OffsetFetchResponsePartition()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100)
+                .setCommittedLeaderEpoch(if (version >= 5) 1 else -1),
+              new OffsetFetchResponseData.OffsetFetchResponsePartition()
+                .setPartitionIndex(1)
+                .setCommittedOffset(200)
+                .setCommittedLeaderEpoch(if (version >= 5) 2 else -1)
+            ).asJava)
+        ).asJava)
+    }
+
+    future.complete(group1Response.topics)
+
+    val response = 
verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
+    assertEquals(expectedOffsetFetchResponse, response.data)
+  }
+
+  @Test
+  def testHandleOffsetFetchAuthorization(): Unit = {
+    def makeRequest(version: Short): RequestChannel.Request = {
+      val groups = Map(
+        "group-1" -> List(
+          new TopicPartition("foo", 0),
+          new TopicPartition("bar", 0)
+        ).asJava,
+        "group-2" -> List(
+          new TopicPartition("foo", 0),
+          new TopicPartition("bar", 0)
+        ).asJava,
+        "group-3" -> null,
+        "group-4" -> null,
+      ).asJava
+      buildRequest(new OffsetFetchRequest.Builder(groups, false, 
false).build(version))
+    }
+
+    val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion)
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+
+    val acls = Map(
+      "group-1" -> AuthorizationResult.ALLOWED,
+      "group-2" -> AuthorizationResult.DENIED,
+      "group-3" -> AuthorizationResult.ALLOWED,
+      "group-4" -> AuthorizationResult.DENIED,
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED
+    )
+
+    when(authorizer.authorize(
+      any[RequestContext],
+      any[util.List[Action]]
+    )).thenAnswer { invocation =>
+      val actions = invocation.getArgument(1, classOf[util.List[Action]])
+      actions.asScala.map { action =>
+        acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+      }.asJava
+    }
+
+    // group-1 is allowed and bar is allowed.
+    val group1Future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+    when(newGroupCoordinator.fetchOffsets(
+      requestChannelRequest.context,
+      "group-1",
+      List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+        .setName("bar")
+        .setPartitionIndexes(List[Integer](0).asJava)
+      ).asJava,
+      false
+    )).thenReturn(group1Future)
+
+    // group-3 is allowed and bar is allowed.
+    val group3Future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+    when(newGroupCoordinator.fetchAllOffsets(
+      requestChannelRequest.context,
+      "group-3",
+      false
+    )).thenReturn(group3Future)
+
+    createKafkaApis(authorizer = 
Some(authorizer)).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val group1ResponseFromCoordinator = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+      .setGroupId("group-1")
+      .setTopics(List(
+        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+          .setName("bar")
+          .setPartitions(List(
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100)
+              .setCommittedLeaderEpoch(1)
+          ).asJava)
+      ).asJava)
+
+    val group3ResponseFromCoordinator = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+      .setGroupId("group-3")
+      .setTopics(List(
+        // foo should be filtered out.
+        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100)
+              .setCommittedLeaderEpoch(1)
+          ).asJava),
+        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+          .setName("bar")
+          .setPartitions(List(
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100)
+              .setCommittedLeaderEpoch(1)
+          ).asJava)
+      ).asJava)
+
+    val expectedOffsetFetchResponse = new OffsetFetchResponseData()
+      .setGroups(List(
+        // group-1 is authorized but foo is not.
+        new OffsetFetchResponseData.OffsetFetchResponseGroup()
+          .setGroupId("group-1")
+          .setTopics(List(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+              .setName("bar")
+              .setPartitions(List(
+                new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                  .setPartitionIndex(0)
+                  .setCommittedOffset(100)
+                  .setCommittedLeaderEpoch(1)
+              ).asJava),
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+              .setName("foo")
+              .setPartitions(List(
+                new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                  .setPartitionIndex(0)
+                  .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+                  .setCommittedOffset(-1)
+              ).asJava)
+          ).asJava),
+        // group-2 is not authorized.
+        new OffsetFetchResponseData.OffsetFetchResponseGroup()
+          .setGroupId("group-2")
+          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code),
+        // group-3 is authorized but foo is not.
+        new OffsetFetchResponseData.OffsetFetchResponseGroup()
+          .setGroupId("group-3")
+          .setTopics(List(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+              .setName("bar")
+              .setPartitions(List(
+                new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                  .setPartitionIndex(0)
+                  .setCommittedOffset(100)
+                  .setCommittedLeaderEpoch(1)
+              ).asJava)
+          ).asJava),
+        // group-4 is not authorized.
+        new OffsetFetchResponseData.OffsetFetchResponseGroup()
+          .setGroupId("group-4")
+          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code),
+      ).asJava)
+
+    group1Future.complete(group1ResponseFromCoordinator.topics)
+    group3Future.complete(group3ResponseFromCoordinator.topics)
+
+    val response = 
verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
+    assertEquals(expectedOffsetFetchResponse, response.data)
+  }
+
   @Test
   def testReassignmentAndReplicationBytesOutRateWhenReassigning(): Unit = {
     assertReassignmentAndReplicationBytesOutPerSec(true)
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 477f3ebfdd5..c0a4355c63b 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -20,15 +20,14 @@ package kafka.server
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
 import org.apache.kafka.common.TopicPartition
-import 
org.apache.kafka.common.message.OffsetFetchRequestData.{OffsetFetchRequestGroup,
 OffsetFetchRequestTopics}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
 import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, 
OffsetFetchResponse}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+
 import java.util
 import java.util.Collections.singletonList
-
 import scala.jdk.CollectionConverters._
 import java.util.{Optional, Properties}
 
@@ -150,42 +149,6 @@ class OffsetFetchRequestTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = {
-    createTopic(topics(0))
-    createTopic(topics(1), numPartitions = 2)
-    createTopic(topics(2), numPartitions = 3)
-
-    // create 5 consumers to commit offsets so we can fetch them later
-    val partitionMap = groupToPartitionMap.asScala.map(e => (e._1, 
Option(e._2).getOrElse(allTopicsList)))
-    groups.foreach { groupId =>
-      consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-      commitOffsets(partitionMap(groupId))
-    }
-
-    for (version <- 8 to ApiKeys.OFFSET_FETCH.latestVersion()) {
-      val request = new OffsetFetchRequest.Builder(groupToPartitionMap, false, 
false)
-        .build(version.asInstanceOf[Short])
-      val requestGroups = request.data().groups()
-      requestGroups.add(
-        // add the same group as before with different topic partitions
-        new OffsetFetchRequestGroup()
-          .setGroupId(groups(2))
-          .setTopics(singletonList(
-            new OffsetFetchRequestTopics()
-              .setName(topics(0))
-              .setPartitionIndexes(singletonList(0)))))
-      request.data().setGroups(requestGroups)
-      val response = connectAndReceive[OffsetFetchResponse](request)
-      response.data.groups.asScala.map(_.groupId).foreach( groupId =>
-        if (groupId == "group3") // verify that the response gives back the 
latest changed topic partition list
-          verifyResponse(response.groupLevelError(groupId), 
response.partitionDataMap(groupId), topic1List)
-        else
-          verifyResponse(response.groupLevelError(groupId), 
response.partitionDataMap(groupId), partitionMap(groupId))
-      )
-    }
-  }
-
   private def verifySingleGroupResponse(version: Short,
                                         responseError: Short,
                                         partitionError: Short,
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 1faa8a07fac..97bd1902460 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -26,6 +26,8 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.ListGroupsRequestData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.requests.RequestContext;
@@ -132,5 +134,35 @@ public interface GroupCoordinator {
         List<String> groupIds,
         BufferSupplier bufferSupplier
     );
+
+    /**
+     * Fetch offsets for a given Group.
+     *
+     * @param context           The request context.
+     * @param groupId           The group id.
+     * @param topics            The topics to fetch the offsets for.
+     *
+     * @return A future yielding the results or an exception.
+     */
+    CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> 
fetchOffsets(
+        RequestContext context,
+        String groupId,
+        List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
+        boolean requireStable
+    );
+
+    /**
+     * Fetch all offsets for a given Group.
+     *
+     * @param context           The request context.
+     * @param groupId           The group id.
+     *
+     * @return A future yielding the results or an exception.
+     */
+    CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> 
fetchAllOffsets(
+        RequestContext context,
+        String groupId,
+        boolean requireStable
+    );
 }
 

Reply via email to