This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 24a86423e99 KAFKA-14367; Add `OffsetFetch` to the new
`GroupCoordinator` interface (#12870)
24a86423e99 is described below
commit 24a86423e9907b751d98fddc7196332feea2b48d
Author: David Jacot <[email protected]>
AuthorDate: Tue Jan 10 20:38:31 2023 +0100
KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
(#12870)
This patch adds OffsetFetch to the new GroupCoordinator interface and
updates KafkaApis to use it.
Reviewers: Philip Nee <[email protected]>, Jeff Kim <[email protected]>,
Justine Olshan <[email protected]>
---
.../kafka/common/requests/OffsetFetchRequest.java | 18 ++
.../kafka/common/requests/OffsetFetchResponse.java | 60 +++-
.../common/message/OffsetFetchResponse.json | 2 +-
.../group/GroupCoordinatorAdapter.scala | 82 ++++-
core/src/main/scala/kafka/server/KafkaApis.scala | 179 ++++++-----
.../group/GroupCoordinatorAdapterTest.scala | 176 ++++++++++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 335 +++++++++++++++++++++
.../unit/kafka/server/OffsetFetchRequestTest.scala | 39 +--
.../kafka/coordinator/group/GroupCoordinator.java | 32 ++
9 files changed, 799 insertions(+), 124 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index c5c094a1460..edcad541dca 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -227,6 +227,24 @@ public class OffsetFetchRequest extends AbstractRequest {
return data.requireStable();
}
+ public List<OffsetFetchRequestData.OffsetFetchRequestGroup> groups() {
+ if (version() >= 8) {
+ return data.groups();
+ } else {
+ OffsetFetchRequestData.OffsetFetchRequestGroup group =
+ new
OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(data.groupId());
+
+ data.topics().forEach(topic -> {
+ group.topics().add(new OffsetFetchRequestTopics()
+ .setName(topic.name())
+ .setPartitionIndexes(topic.partitionIndexes())
+ );
+ });
+
+ return Collections.singletonList(group);
+ }
+ }
+
public Map<String, List<TopicPartition>> groupIdsToPartitions() {
Map<String, List<TopicPartition>> groupIdsToPartitions = new
HashMap<>();
for (OffsetFetchRequestGroup group : data.groups()) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 2d585a582ae..6941e280916 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.requests;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
import
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
@@ -119,12 +120,6 @@ public class OffsetFetchResponse extends AbstractResponse {
}
}
- public OffsetFetchResponse(OffsetFetchResponseData data) {
- super(ApiKeys.OFFSET_FETCH);
- this.data = data;
- this.error = null;
- }
-
/**
* Constructor without throttle time.
* @param error Potential coordinator or group level error code (for api
version 2 and later)
@@ -208,6 +203,59 @@ public class OffsetFetchResponse extends AbstractResponse {
this.error = null;
}
+ public OffsetFetchResponse(List<OffsetFetchResponseGroup> groups, short
version) {
+ super(ApiKeys.OFFSET_FETCH);
+ data = new OffsetFetchResponseData();
+
+ if (version >= 8) {
+ data.setGroups(groups);
+ error = null;
+
+ for (OffsetFetchResponseGroup group : data.groups()) {
+ this.groupLevelErrors.put(group.groupId(),
Errors.forCode(group.errorCode()));
+ }
+ } else {
+ if (groups.size() != 1) {
+ throw new UnsupportedVersionException(
+ "Version " + version + " of OffsetFetchResponse only
supports one group."
+ );
+ }
+
+ OffsetFetchResponseGroup group = groups.get(0);
+ data.setErrorCode(group.errorCode());
+ error = Errors.forCode(group.errorCode());
+
+ group.topics().forEach(topic -> {
+ OffsetFetchResponseTopic newTopic = new
OffsetFetchResponseTopic().setName(topic.name());
+ data.topics().add(newTopic);
+
+ topic.partitions().forEach(partition -> {
+ OffsetFetchResponsePartition newPartition;
+
+ if (version < 2 && group.errorCode() !=
Errors.NONE.code()) {
+ // Versions prior to version 2 do not support a top
level error. Therefore,
+ // we put it at the partition level.
+ newPartition = new OffsetFetchResponsePartition()
+ .setPartitionIndex(partition.partitionIndex())
+ .setErrorCode(group.errorCode())
+ .setCommittedOffset(INVALID_OFFSET)
+ .setMetadata(NO_METADATA)
+
.setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH);
+ } else {
+ newPartition = new OffsetFetchResponsePartition()
+ .setPartitionIndex(partition.partitionIndex())
+ .setErrorCode(partition.errorCode())
+ .setCommittedOffset(partition.committedOffset())
+ .setMetadata(partition.metadata())
+
.setCommittedLeaderEpoch(partition.committedLeaderEpoch());
+ }
+
+ newTopic.partitions().add(newPartition);
+ });
+ });
+ }
+ }
+
public OffsetFetchResponse(OffsetFetchResponseData data, short version) {
super(ApiKeys.OFFSET_FETCH);
this.data = data;
diff --git a/clients/src/main/resources/common/message/OffsetFetchResponse.json
b/clients/src/main/resources/common/message/OffsetFetchResponse.json
index dfad60e27d6..71acf0b4d2e 100644
--- a/clients/src/main/resources/common/message/OffsetFetchResponse.json
+++ b/clients/src/main/resources/common/message/OffsetFetchResponse.json
@@ -57,7 +57,7 @@
]},
{ "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0",
"ignorable": true,
"about": "The top-level error code, or 0 if there was no error." },
- {"name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
+ { "name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
"about": "The responses per group id.", "fields": [
{ "name": "groupId", "type": "string", "versions": "8+", "entityType":
"groupId",
"about": "The group ID." },
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 4e96e4373a7..29242a8774a 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -18,13 +18,15 @@ package kafka.coordinator.group
import kafka.server.RequestLocal
import kafka.utils.Implicits.MapExtensionMethods
-import org.apache.kafka.common.message.{DeleteGroupsResponseData,
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData,
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData,
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData,
SyncGroupRequestData, SyncGroupResponseData}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{DeleteGroupsResponseData,
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData,
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData,
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData,
OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData,
SyncGroupResponseData}
+import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.RequestContext
import org.apache.kafka.common.utils.BufferSupplier
import java.util
import java.util.concurrent.CompletableFuture
-import scala.collection.immutable
+import scala.collection.{immutable, mutable}
import scala.jdk.CollectionConverters._
/**
@@ -234,4 +236,80 @@ class GroupCoordinatorAdapter(
}
CompletableFuture.completedFuture(results)
}
+
+ override def fetchAllOffsets(
+ context: RequestContext,
+ groupId: String,
+ requireStable: Boolean
+ ):
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]
= {
+ handleFetchOffset(
+ groupId,
+ requireStable,
+ None
+ )
+ }
+
+ override def fetchOffsets(
+ context: RequestContext,
+ groupId: String,
+ topics: util.List[OffsetFetchRequestData.OffsetFetchRequestTopics],
+ requireStable: Boolean
+ ):
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]
= {
+ val topicPartitions = new mutable.ArrayBuffer[TopicPartition]()
+ topics.forEach { topic =>
+ topic.partitionIndexes.forEach { partition =>
+ topicPartitions += new TopicPartition(topic.name, partition)
+ }
+ }
+
+ handleFetchOffset(
+ groupId,
+ requireStable,
+ Some(topicPartitions.toSeq)
+ )
+ }
+
+ private def handleFetchOffset(
+ groupId: String,
+ requireStable: Boolean,
+ partitions: Option[Seq[TopicPartition]]
+ ):
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]
= {
+ val (error, results) = coordinator.handleFetchOffsets(
+ groupId,
+ requireStable,
+ partitions
+ )
+
+ val future = new
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+ if (error != Errors.NONE) {
+ future.completeExceptionally(error.exception)
+ } else {
+ val topicsList = new
util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics]()
+ val topicsMap = new mutable.HashMap[String,
OffsetFetchResponseData.OffsetFetchResponseTopics]()
+
+ results.forKeyValue { (tp, offset) =>
+ val topic = topicsMap.get(tp.topic) match {
+ case Some(topic) =>
+ topic
+
+ case None =>
+ val topicOffsets = new
OffsetFetchResponseData.OffsetFetchResponseTopics().setName(tp.topic)
+ topicsMap += tp.topic -> topicOffsets
+ topicsList.add(topicOffsets)
+ topicOffsets
+ }
+
+ topic.partitions.add(new
OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(tp.partition)
+ .setMetadata(offset.metadata)
+ .setCommittedOffset(offset.offset)
+ .setCommittedLeaderEpoch(offset.leaderEpoch.orElse(-1))
+ .setErrorCode(offset.error.code))
+ }
+
+ future.complete(topicsList)
+ }
+
+ future
+ }
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4ec7a7d9a6c..20cfe766eb2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -57,7 +57,6 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
-import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
@@ -189,7 +188,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request,
requestLocal)
case ApiKeys.CONTROLLED_SHUTDOWN =>
handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request,
requestLocal)
- case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
+ case ApiKeys.OFFSET_FETCH =>
handleOffsetFetchRequest(request).exceptionally(handleError)
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request,
requestLocal).exceptionally(handleError)
case ApiKeys.HEARTBEAT =>
handleHeartbeatRequest(request).exceptionally(handleError)
@@ -1338,27 +1337,22 @@ class KafkaApis(val requestChannel: RequestChannel,
/**
* Handle an offset fetch request
*/
- def handleOffsetFetchRequest(request: RequestChannel.Request): Unit = {
+ def handleOffsetFetchRequest(request: RequestChannel.Request):
CompletableFuture[Unit] = {
val version = request.header.apiVersion
if (version == 0) {
- // reading offsets from ZK
- handleOffsetFetchRequestV0(request)
- } else if (version >= 1 && version <= 7) {
- // reading offsets from Kafka
- handleOffsetFetchRequestBetweenV1AndV7(request)
+ handleOffsetFetchRequestFromZookeeper(request)
} else {
- // batching offset reads for multiple groups starts with version 8 and
greater
- handleOffsetFetchRequestV8AndAbove(request)
+ handleOffsetFetchRequestFromCoordinator(request)
}
}
- private def handleOffsetFetchRequestV0(request: RequestChannel.Request):
Unit = {
+ private def handleOffsetFetchRequestFromZookeeper(request:
RequestChannel.Request): CompletableFuture[Unit] = {
val header = request.header
val offsetFetchRequest = request.body[OffsetFetchRequest]
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val offsetFetchResponse =
- // reject the request if not authorized to the group
+ // reject the request if not authorized to the group
if (!authHelper.authorize(request.context, DESCRIBE, GROUP,
offsetFetchRequest.groupId))
offsetFetchRequest.getErrorResponse(requestThrottleMs,
Errors.GROUP_AUTHORIZATION_FAILED)
else {
@@ -1395,79 +1389,114 @@ class KafkaApis(val requestChannel: RequestChannel,
offsetFetchResponse
}
requestHelper.sendResponseMaybeThrottle(request, createResponse)
+ CompletableFuture.completedFuture[Unit](())
}
- private def handleOffsetFetchRequestBetweenV1AndV7(request:
RequestChannel.Request): Unit = {
- val header = request.header
+ private def handleOffsetFetchRequestFromCoordinator(request:
RequestChannel.Request): CompletableFuture[Unit] = {
val offsetFetchRequest = request.body[OffsetFetchRequest]
- val groupId = offsetFetchRequest.groupId()
- val (error, partitionData) = fetchOffsets(groupId,
offsetFetchRequest.isAllPartitions,
- offsetFetchRequest.requireStable, offsetFetchRequest.partitions,
request.context)
- def createResponse(requestThrottleMs: Int): AbstractResponse = {
- val offsetFetchResponse =
- if (error != Errors.NONE) {
- offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
- } else {
- new OffsetFetchResponse(requestThrottleMs, Errors.NONE,
partitionData.asJava)
- }
- trace(s"Sending offset fetch response $offsetFetchResponse for
correlation id ${header.correlationId} to client ${header.clientId}.")
- offsetFetchResponse
+ val groups = offsetFetchRequest.groups()
+ val requireStable = offsetFetchRequest.requireStable()
+
+ val futures = new
mutable.ArrayBuffer[CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]](groups.size)
+ groups.forEach { groupOffsetFetch =>
+ val isAllPartitions = groupOffsetFetch.topics == null
+ if (!authHelper.authorize(request.context, DESCRIBE, GROUP,
groupOffsetFetch.groupId)) {
+ futures += CompletableFuture.completedFuture(new
OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId(groupOffsetFetch.groupId)
+ .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))
+ } else if (isAllPartitions) {
+ futures += fetchAllOffsetsForGroup(
+ request.context,
+ groupOffsetFetch,
+ requireStable
+ )
+ } else {
+ futures += fetchOffsetsForGroup(
+ request.context,
+ groupOffsetFetch,
+ requireStable
+ )
+ }
}
- requestHelper.sendResponseMaybeThrottle(request, createResponse)
- }
- private def handleOffsetFetchRequestV8AndAbove(request:
RequestChannel.Request): Unit = {
- val header = request.header
- val offsetFetchRequest = request.body[OffsetFetchRequest]
- val groupIds = offsetFetchRequest.groupIds().asScala
- val groupToErrorMap = mutable.Map.empty[String, Errors]
- val groupToPartitionData = mutable.Map.empty[String,
util.Map[TopicPartition, PartitionData]]
- val groupToTopicPartitions = offsetFetchRequest.groupIdsToPartitions()
- groupIds.foreach(g => {
- val (error, partitionData) = fetchOffsets(g,
- offsetFetchRequest.isAllPartitionsForGroup(g),
- offsetFetchRequest.requireStable(),
- groupToTopicPartitions.get(g), request.context)
- groupToErrorMap += (g -> error)
- groupToPartitionData += (g -> partitionData.asJava)
- })
-
- def createResponse(requestThrottleMs: Int): AbstractResponse = {
- val offsetFetchResponse = new OffsetFetchResponse(requestThrottleMs,
- groupToErrorMap.asJava, groupToPartitionData.asJava)
- trace(s"Sending offset fetch response $offsetFetchResponse for
correlation id ${header.correlationId} to client ${header.clientId}.")
- offsetFetchResponse
+ CompletableFuture.allOf(futures.toArray: _*).handle[Unit] { (_, _) =>
+ val groupResponses = new
ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseGroup](futures.size)
+ futures.foreach(future => groupResponses += future.get())
+ requestHelper.sendMaybeThrottle(request, new
OffsetFetchResponse(groupResponses.asJava, request.context.apiVersion))
}
-
- requestHelper.sendResponseMaybeThrottle(request, createResponse)
}
- private def fetchOffsets(groupId: String, isAllPartitions: Boolean,
requireStable: Boolean,
- partitions: util.List[TopicPartition], context:
RequestContext): (Errors, Map[TopicPartition,
OffsetFetchResponse.PartitionData]) = {
- if (!authHelper.authorize(context, DESCRIBE, GROUP, groupId)) {
- (Errors.GROUP_AUTHORIZATION_FAILED, Map.empty)
- } else {
- if (isAllPartitions) {
- val (error, allPartitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable)
- if (error != Errors.NONE) {
- (error, allPartitionData)
- } else {
- // clients are not allowed to see offsets for topics that are not
authorized for Describe
- val (authorizedPartitionData, _) =
authHelper.partitionMapByAuthorized(context,
- DESCRIBE, TOPIC, allPartitionData)(_.topic)
- (Errors.NONE, authorizedPartitionData)
- }
+ private def fetchAllOffsetsForGroup(
+ requestContext: RequestContext,
+ groupOffsetFetch: OffsetFetchRequestData.OffsetFetchRequestGroup,
+ requireStable: Boolean
+ ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
+ newGroupCoordinator.fetchAllOffsets(
+ requestContext,
+ groupOffsetFetch.groupId,
+ requireStable
+ ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (offsets,
exception) =>
+ if (exception != null) {
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId(groupOffsetFetch.groupId)
+ .setErrorCode(Errors.forException(exception).code)
} else {
- val (authorizedPartitions, unauthorizedPartitions) =
partitionByAuthorized(
- partitions.asScala, context)
- val (error, authorizedPartitionData) =
groupCoordinator.handleFetchOffsets(groupId,
- requireStable, Some(authorizedPartitions))
- if (error != Errors.NONE) {
- (error, authorizedPartitionData)
- } else {
- val unauthorizedPartitionData = unauthorizedPartitions.map(_ ->
OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
- (Errors.NONE, authorizedPartitionData ++ unauthorizedPartitionData)
+ // Clients are not allowed to see offsets for topics that are not
authorized for Describe.
+ val (authorizedOffsets, _) = authHelper.partitionSeqByAuthorized(
+ requestContext,
+ DESCRIBE,
+ TOPIC,
+ offsets.asScala
+ )(_.name)
+
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId(groupOffsetFetch.groupId)
+ .setTopics(authorizedOffsets.asJava)
+ }
+ }
+ }
+
+ private def fetchOffsetsForGroup(
+ requestContext: RequestContext,
+ groupOffsetFetch: OffsetFetchRequestData.OffsetFetchRequestGroup,
+ requireStable: Boolean
+ ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
+ // Clients are not allowed to see offsets for topics that are not
authorized for Describe.
+ val (authorizedTopics, unauthorizedTopics) =
authHelper.partitionSeqByAuthorized(
+ requestContext,
+ DESCRIBE,
+ TOPIC,
+ groupOffsetFetch.topics.asScala
+ )(_.name)
+
+ newGroupCoordinator.fetchOffsets(
+ requestContext,
+ groupOffsetFetch.groupId,
+ authorizedTopics.asJava,
+ requireStable
+ ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] {
(topicOffsets, exception) =>
+ if (exception != null) {
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId(groupOffsetFetch.groupId)
+ .setErrorCode(Errors.forException(exception).code)
+ } else {
+ val response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId(groupOffsetFetch.groupId)
+
+ response.topics.addAll(topicOffsets)
+
+ unauthorizedTopics.foreach { topic =>
+ val topicResponse = new
OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name)
+ topic.partitionIndexes.forEach { partitionIndex =>
+ topicResponse.partitions.add(new
OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(partitionIndex)
+ .setCommittedOffset(-1)
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code))
+ }
+ response.topics.add(topicResponse)
}
+
+ response
}
}
}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 323547474c9..1fbdf333a96 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -18,12 +18,13 @@ package kafka.coordinator.group
import
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback,
SyncGroupCallback}
import kafka.server.RequestLocal
-import org.apache.kafka.common.message.{DeleteGroupsResponseData,
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData,
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData,
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData,
SyncGroupRequestData, SyncGroupResponseData}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{DeleteGroupsResponseData,
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData,
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData,
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData,
OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData,
SyncGroupResponseData}
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext,
RequestHeader}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.BufferSupplier
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
@@ -34,6 +35,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{mock, verify, when}
import java.net.InetAddress
+import java.util.Optional
import scala.jdk.CollectionConverters._
class GroupCoordinatorAdapterTest {
@@ -436,4 +438,174 @@ class GroupCoordinatorAdapterTest {
assertEquals(expectedResults, future.get())
}
+
+ @Test
+ def testFetchAllOffsets(): Unit = {
+ val foo0 = new TopicPartition("foo", 0)
+ val foo1 = new TopicPartition("foo", 1)
+ val bar1 = new TopicPartition("bar", 1)
+
+ val groupCoordinator = mock(classOf[GroupCoordinator])
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+ when(groupCoordinator.handleFetchOffsets(
+ "group",
+ true,
+ None
+ )).thenReturn((
+ Errors.NONE,
+ Map(
+ foo0 -> new OffsetFetchResponse.PartitionData(
+ 100,
+ Optional.of(1),
+ "foo",
+ Errors.NONE
+ ),
+ bar1 -> new OffsetFetchResponse.PartitionData(
+ -1,
+ Optional.empty[Integer],
+ "",
+ Errors.UNKNOWN_TOPIC_OR_PARTITION
+ ),
+ foo1 -> new OffsetFetchResponse.PartitionData(
+ 200,
+ Optional.empty[Integer],
+ "",
+ Errors.NONE
+ ),
+ )
+ ))
+
+ val ctx = makeContext(ApiKeys.OFFSET_FETCH,
ApiKeys.OFFSET_FETCH.latestVersion)
+ val future = adapter.fetchAllOffsets(
+ ctx,
+ "group",
+ true
+ )
+
+ assertTrue(future.isDone)
+
+ val expectedResponse = List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName(foo0.topic)
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(foo0.partition)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(1)
+ .setMetadata("foo")
+ .setErrorCode(Errors.NONE.code),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(foo1.partition)
+ .setCommittedOffset(200)
+ .setCommittedLeaderEpoch(-1)
+ .setMetadata("")
+ .setErrorCode(Errors.NONE.code),
+ ).asJava),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName(bar1.topic)
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(bar1.partition)
+ .setCommittedOffset(-1)
+ .setCommittedLeaderEpoch(-1)
+ .setMetadata("")
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+ ).asJava)
+ )
+
+ assertEquals(
+ expectedResponse.sortWith(_.name > _.name),
+ future.get().asScala.toList.sortWith(_.name > _.name)
+ )
+ }
+
+ @Test
+ def testFetchOffsets(): Unit = {
+ val foo0 = new TopicPartition("foo", 0)
+ val foo1 = new TopicPartition("foo", 1)
+ val bar1 = new TopicPartition("bar", 1)
+
+ val groupCoordinator = mock(classOf[GroupCoordinator])
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+ when(groupCoordinator.handleFetchOffsets(
+ "group",
+ true,
+ Some(Seq(foo0, foo1, bar1))
+ )).thenReturn((
+ Errors.NONE,
+ Map(
+ foo0 -> new OffsetFetchResponse.PartitionData(
+ 100,
+ Optional.of(1),
+ "foo",
+ Errors.NONE
+ ),
+ bar1 -> new OffsetFetchResponse.PartitionData(
+ -1,
+ Optional.empty[Integer],
+ "",
+ Errors.UNKNOWN_TOPIC_OR_PARTITION
+ ),
+ foo1 -> new OffsetFetchResponse.PartitionData(
+ 200,
+ Optional.empty[Integer],
+ "",
+ Errors.NONE
+ ),
+ )
+ ))
+
+ val ctx = makeContext(ApiKeys.OFFSET_FETCH,
ApiKeys.OFFSET_FETCH.latestVersion)
+ val future = adapter.fetchOffsets(
+ ctx,
+ "group",
+ List(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName(foo0.topic)
+ .setPartitionIndexes(List[Integer](foo0.partition,
foo1.partition).asJava),
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName(bar1.topic)
+ .setPartitionIndexes(List[Integer](bar1.partition).asJava),
+ ).asJava,
+ true
+ )
+
+ assertTrue(future.isDone)
+
+ val expectedResponse = List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName(foo0.topic)
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(foo0.partition)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(1)
+ .setMetadata("foo")
+ .setErrorCode(Errors.NONE.code),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(foo1.partition)
+ .setCommittedOffset(200)
+ .setCommittedLeaderEpoch(-1)
+ .setMetadata("")
+ .setErrorCode(Errors.NONE.code),
+ ).asJava),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName(bar1.topic)
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(bar1.partition)
+ .setCommittedOffset(-1)
+ .setCommittedLeaderEpoch(-1)
+ .setMetadata("")
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+ ).asJava)
+ )
+
+ assertEquals(
+ expectedResponse.sortWith(_.name > _.name),
+ future.get().asScala.toList.sortWith(_.name > _.name)
+ )
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f80a4abf35f..27b2338a8e7 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3406,6 +3406,341 @@ class KafkaApisTest {
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+ def testHandleOffsetFetchWithMultipleGroups(version: Short): Unit = {
+ // Version 0 gets offsets from Zookeeper. We are not interested
+ // in testing this here.
+ if (version == 0) return
+
+ def makeRequest(version: Short): RequestChannel.Request = {
+ val groups = Map(
+ "group-1" -> List(
+ new TopicPartition("foo", 0),
+ new TopicPartition("foo", 1)
+ ).asJava,
+ "group-2" -> null,
+ "group-3" -> null,
+ ).asJava
+ buildRequest(new OffsetFetchRequest.Builder(groups, false,
false).build(version))
+ }
+
+ if (version < 8) {
+ // Request versions earlier than version 8 do not support batching groups.
+ assertThrows(classOf[UnsupportedVersionException], () =>
makeRequest(version))
+ } else {
+ val requestChannelRequest = makeRequest(version)
+
+ val group1Future = new
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+ when(newGroupCoordinator.fetchOffsets(
+ requestChannelRequest.context,
+ "group-1",
+ List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List[Integer](0, 1).asJava)
+ ).asJava,
+ false
+ )).thenReturn(group1Future)
+
+ val group2Future = new
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+ when(newGroupCoordinator.fetchAllOffsets(
+ requestChannelRequest.context,
+ "group-2",
+ false
+ )).thenReturn(group2Future)
+
+ val group3Future = new
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+ when(newGroupCoordinator.fetchAllOffsets(
+ requestChannelRequest.context,
+ "group-3",
+ false
+ )).thenReturn(group3Future)
+
+ createKafkaApis().handleOffsetFetchRequest(requestChannelRequest)
+
+ val group1Response = new
OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group-1")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(1),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(1)
+ .setCommittedOffset(200)
+ .setCommittedLeaderEpoch(2)
+ ).asJava)
+ ).asJava)
+
+ val group2Response = new
OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group-2")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(1),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(1)
+ .setCommittedOffset(200)
+ .setCommittedLeaderEpoch(2),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(2)
+ .setCommittedOffset(300)
+ .setCommittedLeaderEpoch(3)
+ ).asJava)
+ ).asJava)
+
+ val group3Response = new
OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group-3")
+ .setErrorCode(Errors.INVALID_GROUP_ID.code)
+
+ val expectedOffsetFetchResponse = new OffsetFetchResponseData()
+ .setGroups(List(group1Response, group2Response, group3Response).asJava)
+
+ group1Future.complete(group1Response.topics)
+ group2Future.complete(group2Response.topics)
+ group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception)
+
+ val response =
verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
+ assertEquals(expectedOffsetFetchResponse, response.data)
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+ def testHandleOffsetFetchWithSingleGroup(version: Short): Unit = {
+ // Version 0 gets offsets from Zookeeper. We are not interested
+ // in testing this here.
+ if (version == 0) return
+
+ def makeRequest(version: Short): RequestChannel.Request = {
+ buildRequest(new OffsetFetchRequest.Builder(
+ "group-1",
+ false,
+ List(
+ new TopicPartition("foo", 0),
+ new TopicPartition("foo", 1)
+ ).asJava,
+ false
+ ).build(version))
+ }
+
+ val requestChannelRequest = makeRequest(version)
+
+ val future = new
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+ when(newGroupCoordinator.fetchOffsets(
+ requestChannelRequest.context,
+ "group-1",
+ List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List[Integer](0, 1).asJava)
+ ).asJava,
+ false
+ )).thenReturn(future)
+
+ createKafkaApis().handleOffsetFetchRequest(requestChannelRequest)
+
+ val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group-1")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(1),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(1)
+ .setCommittedOffset(200)
+ .setCommittedLeaderEpoch(2)
+ ).asJava)
+ ).asJava)
+
+ val expectedOffsetFetchResponse = if (version >= 8) {
+ new OffsetFetchResponseData()
+ .setGroups(List(group1Response).asJava)
+ } else {
+ new OffsetFetchResponseData()
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopic()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(if (version >= 5) 1 else -1),
+ new OffsetFetchResponseData.OffsetFetchResponsePartition()
+ .setPartitionIndex(1)
+ .setCommittedOffset(200)
+ .setCommittedLeaderEpoch(if (version >= 5) 2 else -1)
+ ).asJava)
+ ).asJava)
+ }
+
+ future.complete(group1Response.topics)
+
+ val response =
verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
+ assertEquals(expectedOffsetFetchResponse, response.data)
+ }
+
+ @Test
+ def testHandleOffsetFetchAuthorization(): Unit = {
+ def makeRequest(version: Short): RequestChannel.Request = {
+ val groups = Map(
+ "group-1" -> List(
+ new TopicPartition("foo", 0),
+ new TopicPartition("bar", 0)
+ ).asJava,
+ "group-2" -> List(
+ new TopicPartition("foo", 0),
+ new TopicPartition("bar", 0)
+ ).asJava,
+ "group-3" -> null,
+ "group-4" -> null,
+ ).asJava
+ buildRequest(new OffsetFetchRequest.Builder(groups, false,
false).build(version))
+ }
+
+ val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion)
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+
+ val acls = Map(
+ "group-1" -> AuthorizationResult.ALLOWED,
+ "group-2" -> AuthorizationResult.DENIED,
+ "group-3" -> AuthorizationResult.ALLOWED,
+ "group-4" -> AuthorizationResult.DENIED,
+ "foo" -> AuthorizationResult.DENIED,
+ "bar" -> AuthorizationResult.ALLOWED
+ )
+
+ when(authorizer.authorize(
+ any[RequestContext],
+ any[util.List[Action]]
+ )).thenAnswer { invocation =>
+ val actions = invocation.getArgument(1, classOf[util.List[Action]])
+ actions.asScala.map { action =>
+ acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+ }.asJava
+ }
+
+ // group-1 is allowed and bar is allowed.
+ val group1Future = new
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+ when(newGroupCoordinator.fetchOffsets(
+ requestChannelRequest.context,
+ "group-1",
+ List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("bar")
+ .setPartitionIndexes(List[Integer](0).asJava)
+ ).asJava,
+ false
+ )).thenReturn(group1Future)
+
+ // group-3 is allowed and bar is allowed.
+ val group3Future = new
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+ when(newGroupCoordinator.fetchAllOffsets(
+ requestChannelRequest.context,
+ "group-3",
+ false
+ )).thenReturn(group3Future)
+
+ createKafkaApis(authorizer =
Some(authorizer)).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+ val group1ResponseFromCoordinator = new
OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group-1")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(1)
+ ).asJava)
+ ).asJava)
+
+ val group3ResponseFromCoordinator = new
OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group-3")
+ .setTopics(List(
+ // foo should be filtered out.
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(1)
+ ).asJava),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(1)
+ ).asJava)
+ ).asJava)
+
+ val expectedOffsetFetchResponse = new OffsetFetchResponseData()
+ .setGroups(List(
+ // group-1 is authorized but foo is not.
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group-1")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(1)
+ ).asJava),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ .setCommittedOffset(-1)
+ ).asJava)
+ ).asJava),
+ // group-2 is not authorized.
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group-2")
+ .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code),
+ // group-3 is authorized but foo is not.
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group-3")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(1)
+ ).asJava)
+ ).asJava),
+ // group-4 is not authorized.
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group-4")
+ .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code),
+ ).asJava)
+
+ group1Future.complete(group1ResponseFromCoordinator.topics)
+ group3Future.complete(group3ResponseFromCoordinator.topics)
+
+ val response =
verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
+ assertEquals(expectedOffsetFetchResponse, response.data)
+ }
+
@Test
def testReassignmentAndReplicationBytesOutRateWhenReassigning(): Unit = {
assertReassignmentAndReplicationBytesOutPerSec(true)
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 477f3ebfdd5..c0a4355c63b 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -20,15 +20,14 @@ package kafka.server
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
-import
org.apache.kafka.common.message.OffsetFetchRequestData.{OffsetFetchRequestGroup,
OffsetFetchRequestTopics}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest,
OffsetFetchResponse}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+
import java.util
import java.util.Collections.singletonList
-
import scala.jdk.CollectionConverters._
import java.util.{Optional, Properties}
@@ -150,42 +149,6 @@ class OffsetFetchRequestTest extends BaseRequestTest {
}
}
- @Test
- def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = {
- createTopic(topics(0))
- createTopic(topics(1), numPartitions = 2)
- createTopic(topics(2), numPartitions = 3)
-
- // create 5 consumers to commit offsets so we can fetch them later
- val partitionMap = groupToPartitionMap.asScala.map(e => (e._1,
Option(e._2).getOrElse(allTopicsList)))
- groups.foreach { groupId =>
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
- commitOffsets(partitionMap(groupId))
- }
-
- for (version <- 8 to ApiKeys.OFFSET_FETCH.latestVersion()) {
- val request = new OffsetFetchRequest.Builder(groupToPartitionMap, false,
false)
- .build(version.asInstanceOf[Short])
- val requestGroups = request.data().groups()
- requestGroups.add(
- // add the same group as before with different topic partitions
- new OffsetFetchRequestGroup()
- .setGroupId(groups(2))
- .setTopics(singletonList(
- new OffsetFetchRequestTopics()
- .setName(topics(0))
- .setPartitionIndexes(singletonList(0)))))
- request.data().setGroups(requestGroups)
- val response = connectAndReceive[OffsetFetchResponse](request)
- response.data.groups.asScala.map(_.groupId).foreach( groupId =>
- if (groupId == "group3") // verify that the response gives back the
latest changed topic partition list
- verifyResponse(response.groupLevelError(groupId),
response.partitionDataMap(groupId), topic1List)
- else
- verifyResponse(response.groupLevelError(groupId),
response.partitionDataMap(groupId), partitionMap(groupId))
- )
- }
- }
-
private def verifySingleGroupResponse(version: Short,
responseError: Short,
partitionError: Short,
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 1faa8a07fac..97bd1902460 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -26,6 +26,8 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.requests.RequestContext;
@@ -132,5 +134,35 @@ public interface GroupCoordinator {
List<String> groupIds,
BufferSupplier bufferSupplier
);
+
+ /**
+ * Fetch offsets for a given group.
+ *
+ * @param context The request context.
+ * @param groupId The group id.
+ * @param topics The topics to fetch the offsets for.
+ *
+ * @return A future yielding the results or an exception.
+ */
+ CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>>
fetchOffsets(
+ RequestContext context,
+ String groupId,
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
+ boolean requireStable
+ );
+
+ /**
+ * Fetch all offsets for a given group.
+ *
+ * @param context The request context.
+ * @param groupId The group id.
+ *
+ * @return A future yielding the results or an exception.
+ */
+ CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>>
fetchAllOffsets(
+ RequestContext context,
+ String groupId,
+ boolean requireStable
+ );
}