This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6123071fc05 KAFKA-14499: [7/7] Add integration tests for OffsetCommit
API and OffsetFetch API (#14353)
6123071fc05 is described below
commit 6123071fc05f714f8781edac6686c9c4e98fdd35
Author: David Jacot <[email protected]>
AuthorDate: Mon Sep 11 10:48:02 2023 -0700
KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and
OffsetFetch API (#14353)
This patch adds integration tests for the OffsetCommit API and the
OffsetFetch API. The tests run against the old and the new group coordinator
and with the new and the old consumer rebalance protocol.
Reviewers: Ritika Reddy <[email protected]>, Justine Olshan
<[email protected]>
---
.../kafka/common/requests/OffsetCommitRequest.java | 8 +-
.../kafka/common/requests/OffsetFetchRequest.java | 78 ++-
.../common/message/OffsetFetchRequest.json | 2 +-
.../common/requests/OffsetFetchRequestTest.java | 1 -
.../kafka/common/requests/RequestResponseTest.java | 20 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +
.../test/junit/ZkClusterInvocationContext.java | 2 +-
.../server/GroupCoordinatorBaseRequestTest.scala | 283 +++++++++
.../kafka/server/OffsetCommitRequestTest.scala | 167 ++++++
.../unit/kafka/server/OffsetFetchRequestTest.scala | 661 ++++++++++++++-------
10 files changed, 968 insertions(+), 256 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index b4307909719..0eba5f29ab4 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -46,11 +46,15 @@ public class OffsetCommitRequest extends AbstractRequest {
private final OffsetCommitRequestData data;
- public Builder(OffsetCommitRequestData data) {
- super(ApiKeys.OFFSET_COMMIT);
+ public Builder(OffsetCommitRequestData data, boolean
enableUnstableLastVersion) {
+ super(ApiKeys.OFFSET_COMMIT, enableUnstableLastVersion);
this.data = data;
}
+ public Builder(OffsetCommitRequestData data) {
+ this(data, false);
+ }
+
@Override
public OffsetCommitRequest build(short version) {
if (data.groupInstanceId() != null && version < 7) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 1321612d30a..70473565f63 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -55,35 +55,51 @@ public class OffsetFetchRequest extends AbstractRequest {
boolean requireStable,
List<TopicPartition> partitions,
boolean throwOnFetchStableOffsetsUnsupported) {
+ this(
+ groupId,
+ null,
+ -1,
+ requireStable,
+ partitions,
+ throwOnFetchStableOffsetsUnsupported
+ );
+ }
+
+ public Builder(String groupId,
+ String memberId,
+ int memberEpoch,
+ boolean requireStable,
+ List<TopicPartition> partitions,
+ boolean throwOnFetchStableOffsetsUnsupported) {
super(ApiKeys.OFFSET_FETCH);
- final List<OffsetFetchRequestTopic> topics;
+ OffsetFetchRequestData.OffsetFetchRequestGroup group =
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(memberEpoch);
+
if (partitions != null) {
- Map<String, OffsetFetchRequestTopic>
offsetFetchRequestTopicMap = new HashMap<>();
+ Map<String, OffsetFetchRequestTopics>
offsetFetchRequestTopicMap = new HashMap<>();
for (TopicPartition topicPartition : partitions) {
String topicName = topicPartition.topic();
- OffsetFetchRequestTopic topic =
offsetFetchRequestTopicMap.getOrDefault(
- topicName, new
OffsetFetchRequestTopic().setName(topicName));
+ OffsetFetchRequestTopics topic =
offsetFetchRequestTopicMap.getOrDefault(
+ topicName, new
OffsetFetchRequestTopics().setName(topicName));
topic.partitionIndexes().add(topicPartition.partition());
offsetFetchRequestTopicMap.put(topicName, topic);
}
- topics = new ArrayList<>(offsetFetchRequestTopicMap.values());
+ group.setTopics(new
ArrayList<>(offsetFetchRequestTopicMap.values()));
} else {
// If passed in partition list is null, it is requesting
offsets for all topic partitions.
- topics = ALL_TOPIC_PARTITIONS;
+ group.setTopics(ALL_TOPIC_PARTITIONS_BATCH);
}
this.data = new OffsetFetchRequestData()
- .setGroupId(groupId)
- .setRequireStable(requireStable)
- .setTopics(topics);
+ .setRequireStable(requireStable)
+ .setGroups(Collections.singletonList(group));
this.throwOnFetchStableOffsetsUnsupported =
throwOnFetchStableOffsetsUnsupported;
}
- boolean isAllTopicPartitions() {
- return this.data.topics() == ALL_TOPIC_PARTITIONS;
- }
-
public Builder(Map<String, List<TopicPartition>>
groupIdToTopicPartitionMap,
boolean requireStable,
boolean throwOnFetchStableOffsetsUnsupported) {
@@ -120,10 +136,6 @@ public class OffsetFetchRequest extends AbstractRequest {
@Override
public OffsetFetchRequest build(short version) {
- if (isAllTopicPartitions() && version < 2) {
- throw new UnsupportedVersionException("The broker only
supports OffsetFetchRequest " +
- "v" + version + ", but we need v2 or newer to request all
topic partitions.");
- }
if (data.groups().size() > 1 && version < 8) {
throw new NoBatchedOffsetFetchRequestException("Broker does
not support"
+ " batching groups for fetch offset request on version "
+ version);
@@ -141,7 +153,7 @@ public class OffsetFetchRequest extends AbstractRequest {
}
// convert data to use the appropriate version since version 8
uses different format
if (version < 8) {
- OffsetFetchRequestData oldDataFormat = null;
+ OffsetFetchRequestData normalizedData;
if (!data.groups().isEmpty()) {
OffsetFetchRequestGroup group = data.groups().get(0);
String groupName = group.groupId();
@@ -156,34 +168,18 @@ public class OffsetFetchRequest extends AbstractRequest {
.setPartitionIndexes(t.partitionIndexes()))
.collect(Collectors.toList());
}
- oldDataFormat = new OffsetFetchRequestData()
+ normalizedData = new OffsetFetchRequestData()
.setGroupId(groupName)
.setTopics(oldFormatTopics)
.setRequireStable(data.requireStable());
+ } else {
+ normalizedData = data;
}
- return new OffsetFetchRequest(oldDataFormat == null ? data :
oldDataFormat, version);
- } else {
- if (data.groups().isEmpty()) {
- String groupName = data.groupId();
- List<OffsetFetchRequestTopic> oldFormatTopics =
data.topics();
- List<OffsetFetchRequestTopics> topics = null;
- if (oldFormatTopics != null) {
- topics = oldFormatTopics
- .stream()
- .map(t -> new OffsetFetchRequestTopics()
- .setName(t.name())
- .setPartitionIndexes(t.partitionIndexes()))
- .collect(Collectors.toList());
- }
- OffsetFetchRequestData convertedDataFormat =
- new OffsetFetchRequestData()
- .setGroups(Collections.singletonList(
- new OffsetFetchRequestGroup()
- .setGroupId(groupName)
- .setTopics(topics)))
- .setRequireStable(data.requireStable());
- return new OffsetFetchRequest(convertedDataFormat,
version);
+ if (normalizedData.topics() == null && version < 2) {
+ throw new UnsupportedVersionException("The broker only
supports OffsetFetchRequest " +
+ "v" + version + ", but we need v2 or newer to request
all topic partitions.");
}
+ return new OffsetFetchRequest(normalizedData, version);
}
return new OffsetFetchRequest(data, version);
}
diff --git a/clients/src/main/resources/common/message/OffsetFetchRequest.json
b/clients/src/main/resources/common/message/OffsetFetchRequest.json
index b0f564e7764..33dab5c957c 100644
--- a/clients/src/main/resources/common/message/OffsetFetchRequest.json
+++ b/clients/src/main/resources/common/message/OffsetFetchRequest.json
@@ -56,7 +56,7 @@
"about": "Each group we would like to fetch offsets for", "fields": [
{ "name": "GroupId", "type": "string", "versions": "8+", "entityType":
"groupId",
"about": "The group ID."},
- { "name": "MemberId", "type": "string", "versions": "9+",
"nullableVersions": "9+", "default": null, "ignorable": true,
+ { "name": "MemberId", "type": "string", "versions": "9+",
"nullableVersions": "9+", "default": "null", "ignorable": true,
"about": "The member ID assigned by the group coordinator if using the
new consumer protocol (KIP-848)." },
{ "name": "MemberEpoch", "type": "int32", "versions": "9+", "default":
"-1", "ignorable": true,
"about": "The member epoch if using the new consumer protocol
(KIP-848)." },
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
index 37076d00e7f..6ce7dd93fb9 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
@@ -83,7 +83,6 @@ public class OffsetFetchRequestTest {
false,
partitions,
false);
- assertFalse(builder.isAllTopicPartitions());
OffsetFetchRequest request = builder.build(version);
assertFalse(request.isAllPartitions());
assertEquals(group1, request.groupId());
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c70b592c628..36bf5c45d73 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -763,20 +763,22 @@ public class RequestResponseTest {
public void testOffsetFetchRequestBuilderToStringV0ToV7() {
List<Boolean> stableFlags = asList(true, false);
for (Boolean requireStable : stableFlags) {
- String allTopicPartitionsString = new
OffsetFetchRequest.Builder("someGroup",
+ String allTopicPartitionsString = new OffsetFetchRequest.Builder(
+ "someGroup",
requireStable,
null,
- false)
- .toString();
+ false
+ ).toString();
- assertTrue(allTopicPartitionsString.contains("groupId='someGroup',
topics=null,"
- + " groups=[], requireStable=" + requireStable));
- String string = new OffsetFetchRequest.Builder("group1",
+ assertTrue(allTopicPartitionsString.contains("groupId='',
topics=[],"
+ + " groups=[OffsetFetchRequestGroup(groupId='someGroup',
memberId=null, memberEpoch=-1, topics=null)], requireStable=" + requireStable));
+ String string = new OffsetFetchRequest.Builder(
+ "group1",
requireStable,
singletonList(
new TopicPartition("test11", 1)),
- false)
- .toString();
+ false
+ ).toString();
assertTrue(string.contains("test11"));
assertTrue(string.contains("group1"));
assertTrue(string.contains("requireStable=" + requireStable));
@@ -792,7 +794,7 @@ public class RequestResponseTest {
false
).toString();
assertTrue(allTopicPartitionsString.contains("groups=[OffsetFetchRequestGroup"
- + "(groupId='someGroup', memberId='', memberEpoch=-1,
topics=null)], requireStable=" + requireStable));
+ + "(groupId='someGroup', memberId=null, memberEpoch=-1,
topics=null)], requireStable=" + requireStable));
String subsetTopicPartitionsString = new OffsetFetchRequest.Builder(
Collections.singletonMap(
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index b588f83e869..5c8091dccfc 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1495,6 +1495,8 @@ class KafkaApis(val requestChannel: RequestChannel,
requestContext,
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId(offsetFetchRequest.groupId)
+ .setMemberId(offsetFetchRequest.memberId)
+ .setMemberEpoch(offsetFetchRequest.memberEpoch)
.setTopics(authorizedTopics.asJava),
requireStable
).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] {
(offsetFetchResponse, exception) =>
diff --git
a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index 26c51d70197..c196cccad40 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -338,7 +338,7 @@ public class ZkClusterInvocationContext implements
TestTemplateInvocationContext
.orElseThrow(() -> new IllegalArgumentException("Unknown
brokerId " + brokerId));
}
- private Stream<KafkaServer> servers() {
+ public Stream<KafkaServer> servers() {
return
JavaConverters.asJavaCollection(clusterReference.get().servers()).stream();
}
}
diff --git
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
new file mode 100644
index 00000000000..b996cf82f69
--- /dev/null
+++
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
+import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
+import kafka.utils.{NotNothing, TestUtils}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData,
JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData,
OffsetFetchResponseData, SyncGroupRequestData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse,
JoinGroupRequest, JoinGroupResponse, OffsetCommitRequest, OffsetCommitResponse,
OffsetFetchRequest, OffsetFetchResponse, SyncGroupRequest, SyncGroupResponse}
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+
+import java.util.Comparator
+import java.util.stream.Collectors
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+
+class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
+ private def brokers(): Seq[KafkaBroker] = {
+ if (cluster.isKRaftTest) {
+
cluster.asInstanceOf[RaftClusterInstance].brokers.collect(Collectors.toList[KafkaBroker]).asScala.toSeq
+ } else {
+
cluster.asInstanceOf[ZkClusterInstance].servers.collect(Collectors.toList[KafkaBroker]).asScala.toSeq
+ }
+ }
+
+ protected def createOffsetsTopic(): Unit = {
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = cluster.createAdminClient(),
+ brokers = brokers()
+ )
+ }
+
+ protected def createTopic(
+ topic: String,
+ numPartitions: Int
+ ): Unit = {
+ TestUtils.createTopicWithAdmin(
+ admin = cluster.createAdminClient(),
+ brokers = brokers(),
+ topic = topic,
+ numPartitions = numPartitions
+ )
+ }
+
+ protected def isUnstableApiEnabled: Boolean = {
+
cluster.config.serverProperties.getProperty("unstable.api.versions.enable") ==
"true"
+ }
+
+ protected def isNewGroupCoordinatorEnabled: Boolean = {
+
cluster.config.serverProperties.getProperty("group.coordinator.new.enable") ==
"true"
+ }
+
+ protected def commitOffset(
+ groupId: String,
+ memberId: String,
+ memberEpoch: Int,
+ topic: String,
+ partition: Int,
+ offset: Long,
+ expectedError: Errors,
+ version: Short
+ ): Unit = {
+ val request = new OffsetCommitRequest.Builder(
+ new OffsetCommitRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setGenerationIdOrMemberEpoch(memberEpoch)
+ .setTopics(List(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName(topic)
+ .setPartitions(List(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(partition)
+ .setCommittedOffset(offset)
+ ).asJava)
+ ).asJava),
+ isUnstableApiEnabled
+ ).build(version)
+
+ val expectedResponse = new OffsetCommitResponseData()
+ .setTopics(List(
+ new OffsetCommitResponseData.OffsetCommitResponseTopic()
+ .setName(topic)
+ .setPartitions(List(
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(expectedError.code)
+ ).asJava)
+ ).asJava)
+
+ val response = connectAndReceive[OffsetCommitResponse](request)
+ assertEquals(expectedResponse, response.data)
+ }
+
+ protected def fetchOffsets(
+ groupId: String,
+ memberId: String,
+ memberEpoch: Int,
+ partitions: List[TopicPartition],
+ requireStable: Boolean,
+ version: Short
+ ): OffsetFetchResponseData.OffsetFetchResponseGroup = {
+ val request = new OffsetFetchRequest.Builder(
+ groupId,
+ memberId,
+ memberEpoch,
+ requireStable,
+ if (partitions == null) null else partitions.asJava,
+ false
+ ).build(version)
+
+ val response = connectAndReceive[OffsetFetchResponse](request)
+
+ // Normalize the response based on the version to present the
+ // same format to the caller.
+ val groupResponse = if (version >= 8) {
+ assertEquals(1, response.data.groups.size)
+ assertEquals(groupId, response.data.groups.get(0).groupId)
+ response.data.groups.asScala.head
+ } else {
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId(groupId)
+ .setErrorCode(response.data.errorCode)
+ .setTopics(response.data.topics.asScala.map { topic =>
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName(topic.name)
+ .setPartitions(topic.partitions.asScala.map { partition =>
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(partition.partitionIndex)
+ .setErrorCode(partition.errorCode)
+ .setCommittedOffset(partition.committedOffset)
+ .setCommittedLeaderEpoch(partition.committedLeaderEpoch)
+ .setMetadata(partition.metadata)
+ }.asJava)
+ }.asJava)
+ }
+
+ // Sort topics and partitions within the response as their order is not
guaranteed.
+ sortTopicPartitions(groupResponse)
+
+ groupResponse
+ }
+
+ protected def fetchOffsets(
+ groups: Map[String, List[TopicPartition]],
+ requireStable: Boolean,
+ version: Short
+ ): List[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
+ if (version < 8) {
+ fail(s"OffsetFetch API version $version cannot fetch multiple groups.")
+ }
+
+ val request = new OffsetFetchRequest.Builder(
+ groups.map { case (k, v) => (k, v.asJava) }.asJava,
+ requireStable,
+ false
+ ).build(version)
+
+ val response = connectAndReceive[OffsetFetchResponse](request)
+
+ // Sort topics and partitions within the response as their order is not
guaranteed.
+ response.data.groups.asScala.foreach(sortTopicPartitions)
+
+ response.data.groups.asScala.toList
+ }
+
+ private def sortTopicPartitions(
+ group: OffsetFetchResponseData.OffsetFetchResponseGroup
+ ): Unit = {
+ group.topics.sort((t1, t2) => t1.name.compareTo(t2.name))
+ group.topics.asScala.foreach { topic =>
+
topic.partitions.sort(Comparator.comparingInt[OffsetFetchResponseData.OffsetFetchResponsePartitions](_.partitionIndex))
+ }
+ }
+
+ protected def joinConsumerGroupWithOldProtocol(groupId: String): (String,
Int) = {
+ val joinGroupRequestData = new JoinGroupRequestData()
+ .setGroupId(groupId)
+ .setRebalanceTimeoutMs(5 * 50 * 1000)
+ .setSessionTimeoutMs(600000)
+ .setProtocolType("consumer")
+ .setProtocols(new
JoinGroupRequestData.JoinGroupRequestProtocolCollection(
+ List(
+ new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("consumer-range")
+ .setMetadata(Array.empty)
+ ).asJava.iterator
+ ))
+
+ // Join the group as a dynamic member.
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var joinGroupRequest = new
JoinGroupRequest.Builder(joinGroupRequestData).build()
+ var joinGroupResponse: JoinGroupResponse = null
+ TestUtils.waitUntilTrue(() => {
+ joinGroupResponse =
connectAndReceive[JoinGroupResponse](joinGroupRequest)
+ joinGroupResponse.data.errorCode == Errors.MEMBER_ID_REQUIRED.code
+ }, msg = s"Could not join the group successfully. Last response
$joinGroupResponse.")
+
+ // Rejoin the group with the member id.
+ joinGroupRequestData.setMemberId(joinGroupResponse.data.memberId)
+ joinGroupRequest = new
JoinGroupRequest.Builder(joinGroupRequestData).build()
+ joinGroupResponse = connectAndReceive[JoinGroupResponse](joinGroupRequest)
+ assertEquals(Errors.NONE.code, joinGroupResponse.data.errorCode)
+
+ val syncGroupRequestData = new SyncGroupRequestData()
+ .setGroupId(groupId)
+ .setMemberId(joinGroupResponse.data.memberId)
+ .setGenerationId(joinGroupResponse.data.generationId)
+ .setProtocolType("consumer")
+ .setProtocolName("consumer-range")
+ .setAssignments(List.empty.asJava)
+
+ // Send the sync group request to complete the rebalance.
+ val syncGroupRequest = new
SyncGroupRequest.Builder(syncGroupRequestData).build()
+ val syncGroupResponse =
connectAndReceive[SyncGroupResponse](syncGroupRequest)
+ assertEquals(Errors.NONE.code, syncGroupResponse.data.errorCode)
+
+ (joinGroupResponse.data.memberId, joinGroupResponse.data.generationId)
+ }
+
+ protected def joinConsumerGroupWithNewProtocol(groupId: String): (String,
Int) = {
+ // Heartbeat request to join the group.
+ val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava),
+ true
+ ).build()
+
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
+
+ (consumerGroupHeartbeatResponse.data.memberId,
consumerGroupHeartbeatResponse.data.memberEpoch)
+ }
+
+ protected def joinConsumerGroup(groupId: String, useNewProtocol: Boolean):
(String, Int) = {
+ if (useNewProtocol) {
+ // Note that we heartbeat only once to join the group and assume
+ // that the test will complete within the session timeout.
+ joinConsumerGroupWithNewProtocol(groupId)
+ } else {
+ // Note that we don't heartbeat and assume that the test will
+ // complete within the session timeout.
+ joinConsumerGroupWithOldProtocol(groupId)
+ }
+ }
+
+ protected def connectAndReceive[T <: AbstractResponse](
+ request: AbstractRequest
+ )(implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
+ IntegrationTestUtils.connectAndReceive[T](
+ request,
+ cluster.anyBrokerSocketServer(),
+ cluster.clientListener()
+ )
+ }
+}
diff --git
a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
new file mode 100644
index 00000000000..e428ca6617d
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest,
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.junit.jupiter.api.Assertions.fail
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class OffsetCommitRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
+
+ @ClusterTest(serverProperties = Array(
+ new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms",
value = "600000"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "600000"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1")
+ ))
+ def testOffsetCommitWithNewConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ testOffsetCommit(true)
+ }
+
+ @ClusterTest(serverProperties = Array(
+ new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1")
+ ))
+ def testOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ testOffsetCommit(false)
+ }
+
+ @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"false"),
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"false"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1")
+ ))
+ def testOffsetCommitWithOldConsumerGroupProtocolAndOldGroupCoordinator():
Unit = {
+ testOffsetCommit(false)
+ }
+
+ private def testOffsetCommit(useNewProtocol: Boolean): Unit = {
+ if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
+ fail("Cannot use the new protocol with the old group coordinator.")
+ }
+
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ createOffsetsTopic()
+
+ // Create the topic.
+ createTopic(
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Join the consumer group. Note that we don't heartbeat here so we must
use
+ // a session long enough for the duration of the test.
+ val (memberId, memberEpoch) = joinConsumerGroup("grp", useNewProtocol)
+
+ // Start from version 1 because version 0 goes to ZK.
+ for (version <- 1 to
ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
+ // Commit offset.
+ commitOffset(
+ groupId = "grp",
+ memberId = memberId,
+ memberEpoch = memberEpoch,
+ topic = "foo",
+ partition = 0,
+ offset = 100L,
+ expectedError = Errors.NONE,
+ version = version.toShort
+ )
+
+ // Commit offset with unknown group should fail.
+ commitOffset(
+ groupId = "unknown",
+ memberId = memberId,
+ memberEpoch = memberEpoch,
+ topic = "foo",
+ partition = 0,
+ offset = 100L,
+ expectedError =
+ if (isNewGroupCoordinatorEnabled && version >= 9)
Errors.GROUP_ID_NOT_FOUND
+ else Errors.ILLEGAL_GENERATION,
+ version = version.toShort
+ )
+
+ // Commit offset with empty group id should fail.
+ commitOffset(
+ groupId = "",
+ memberId = memberId,
+ memberEpoch = memberEpoch,
+ topic = "foo",
+ partition = 0,
+ offset = 100L,
+ expectedError =
+ if (isNewGroupCoordinatorEnabled && version >= 9)
Errors.GROUP_ID_NOT_FOUND
+ else Errors.ILLEGAL_GENERATION,
+ version = version.toShort
+ )
+
+ // Commit offset with unknown member id should fail.
+ commitOffset(
+ groupId = "grp",
+ memberId = "",
+ memberEpoch = memberEpoch,
+ topic = "foo",
+ partition = 0,
+ offset = 100L,
+ expectedError = Errors.UNKNOWN_MEMBER_ID,
+ version = version.toShort
+ )
+
+ // Commit offset with stale member epoch should fail.
+ commitOffset(
+ groupId = "grp",
+ memberId = memberId,
+ memberEpoch = memberEpoch + 1,
+ topic = "foo",
+ partition = 0,
+ offset = 100L,
+ expectedError =
+ if (useNewProtocol && version >= 9) Errors.STALE_MEMBER_EPOCH
+ else if (useNewProtocol) Errors.UNSUPPORTED_VERSION
+ else Errors.ILLEGAL_GENERATION,
+ version = version.toShort
+ )
+
+ // Commit offset to a group without member id/epoch should succeed.
+ // This simulate a call from the admin client.
+ commitOffset(
+ groupId = "other-grp",
+ memberId = "",
+ memberEpoch = -1,
+ topic = "foo",
+ partition = 0,
+ offset = 100L,
+ expectedError = Errors.NONE,
+ version = version.toShort
+ )
+ }
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index cfe550e2671..200cdd6511a 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -14,233 +14,492 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package kafka.server
-import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest,
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.OffsetFetchResponseData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
-import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest,
OffsetFetchResponse}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
-import java.util
-import java.util.Collections.singletonList
import scala.jdk.CollectionConverters._
-import java.util.{Collections, Optional, Properties}
-
-class OffsetFetchRequestTest extends BaseRequestTest {
-
- override def brokerCount: Int = 1
-
- val brokerId: Integer = 0
- val offset = 15L
- val leaderEpoch: Optional[Integer] = Optional.of(3)
- val metadata = "metadata"
- val topic = "topic"
- val groupId = "groupId"
- val groups: Seq[String] = (1 to 5).map(i => s"group$i")
- val topics: Seq[String] = (1 to 3).map(i => s"topic$i")
- val topic1List = singletonList(new TopicPartition(topics(0), 0))
- val topic1And2List = util.Arrays.asList(
- new TopicPartition(topics(0), 0),
- new TopicPartition(topics(1), 0),
- new TopicPartition(topics(1), 1))
- val allTopicsList = util.Arrays.asList(
- new TopicPartition(topics(0), 0),
- new TopicPartition(topics(1), 0),
- new TopicPartition(topics(1), 1),
- new TopicPartition(topics(2), 0),
- new TopicPartition(topics(2), 1),
- new TopicPartition(topics(2), 2))
- val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] =
- new util.HashMap[String, util.List[TopicPartition]]()
- groupToPartitionMap.put(groups(0), topic1List)
- groupToPartitionMap.put(groups(1), topic1And2List)
- groupToPartitionMap.put(groups(2), allTopicsList)
- groupToPartitionMap.put(groups(3), null)
- groupToPartitionMap.put(groups(4), null)
-
- override def brokerPropertyOverrides(properties: Properties): Unit = {
- properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
- properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
- properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
- properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
- properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
- properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
- properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class OffsetFetchRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
+
+ @ClusterTest(serverProperties = Array(
+ new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms",
value = "600000"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "600000"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1")
+ ))
+ def
testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true)
}
- @BeforeEach
- override def setUp(testInfo: TestInfo): Unit = {
- doSetup(testInfo, createOffsetsTopic = false)
+ @ClusterTest(serverProperties = Array(
+ new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms",
value = "600000"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "600000"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1")
+ ))
+ def
testSingleGroupOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = false)
+ }
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"false"),
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"false"),
+ new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms",
value = "600000"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "600000"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1")
+ ))
+ def
testSingleGroupOffsetFetchWithOldConsumerGroupProtocolAndOldGroupCoordinator():
Unit = {
+ testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = true)
}
- @Test
- def testOffsetFetchRequestSingleGroup(): Unit = {
- createTopic(topic)
-
- val tpList = singletonList(new TopicPartition(topic, 0))
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
- commitOffsets(tpList)
-
- // testing from version 1 onward since version 0 read offsets from ZK
- for (version <- 1 to ApiKeys.OFFSET_FETCH.latestVersion()) {
- if (version < 8) {
- val request =
- if (version < 7) {
- new OffsetFetchRequest.Builder(
- groupId, false, tpList, false)
- .build(version.asInstanceOf[Short])
- } else {
- new OffsetFetchRequest.Builder(
- groupId, false, tpList, true)
- .build(version.asInstanceOf[Short])
- }
- val response = connectAndReceive[OffsetFetchResponse](request)
- val topicData = response.data().topics().get(0)
- val partitionData = topicData.partitions().get(0)
- if (version < 3) {
- assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME,
response.throttleTimeMs())
- }
- verifySingleGroupResponse(version.asInstanceOf[Short],
- response.error().code(), partitionData.errorCode(), topicData.name(),
- partitionData.partitionIndex(), partitionData.committedOffset(),
- partitionData.committedLeaderEpoch(), partitionData.metadata())
- } else {
- val request = new OffsetFetchRequest.Builder(
- Map(groupId -> tpList).asJava, false, false)
- .build(version.asInstanceOf[Short])
- val response = connectAndReceive[OffsetFetchResponse](request)
- val groupData = response.data().groups().get(0)
- val topicData = groupData.topics().get(0)
- val partitionData = topicData.partitions().get(0)
- verifySingleGroupResponse(version.asInstanceOf[Short],
- groupData.errorCode(), partitionData.errorCode(), topicData.name(),
- partitionData.partitionIndex(), partitionData.committedOffset(),
- partitionData.committedLeaderEpoch(), partitionData.metadata())
- }
- }
+ @ClusterTest(serverProperties = Array(
+ new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms",
value = "600000"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "600000"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1")
+ ))
+ def
testSingleGroupAllOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ testSingleGroupAllOffsetFetch(useNewProtocol = true, requireStable = true)
}
- @Test
- def testOffsetFetchRequestAllOffsetsSingleGroup(): Unit = {
- createTopic(topic)
-
- val tpList = singletonList(new TopicPartition(topic, 0))
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
- commitOffsets(tpList)
-
- // Testing from version 2 onward since version 0 and do not support
- // fetching all offsets.
- for (version <- 2 to ApiKeys.OFFSET_FETCH.latestVersion()) {
- if (version < 8) {
- val request = new OffsetFetchRequest.Builder(
- groupId,
- false,
- null,
- version >= 7
- ).build(version.toShort)
-
- val response = connectAndReceive[OffsetFetchResponse](request)
- assertEquals(Errors.NONE, response.error())
- val topicData = response.data.topics().get(0)
- val partitionData = topicData.partitions().get(0)
- if (version < 3) {
- assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME,
response.throttleTimeMs())
- }
- verifySingleGroupResponse(version.asInstanceOf[Short],
- response.error().code(), partitionData.errorCode(), topicData.name(),
- partitionData.partitionIndex(), partitionData.committedOffset(),
- partitionData.committedLeaderEpoch(), partitionData.metadata())
- } else {
- val request = new OffsetFetchRequest.Builder(
- Collections.singletonMap(groupId, null),
- false,
- false
- ).build(version.toShort)
-
- val response = connectAndReceive[OffsetFetchResponse](request)
- assertEquals(Errors.NONE, response.groupLevelError(groupId))
- val groupData = response.data().groups().get(0)
- val topicData = groupData.topics().get(0)
- val partitionData = topicData.partitions().get(0)
- verifySingleGroupResponse(version.asInstanceOf[Short],
- groupData.errorCode(), partitionData.errorCode(), topicData.name(),
- partitionData.partitionIndex(), partitionData.committedOffset(),
- partitionData.committedLeaderEpoch(), partitionData.metadata())
+ @ClusterTest(serverProperties = Array(
+ new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms",
value = "600000"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "600000"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1")
+ ))
+ def
testSingleGroupAllOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ testSingleGroupAllOffsetFetch(useNewProtocol = false, requireStable =
false)
+ }
+
+ @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"false"),
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"false"),
+ new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms",
value = "600000"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "600000"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1")
+ ))
+ def
testSingleGroupAllOffsetFetchWithOldConsumerGroupProtocolAndOldGroupCoordinator():
Unit = {
+ testSingleGroupAllOffsetFetch(useNewProtocol = false, requireStable = true)
+ }
+
+ @ClusterTest(serverProperties = Array(
+ new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms",
value = "600000"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "600000"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1")
+ ))
+ def
testMultiGroupsOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ testMultipleGroupsOffsetFetch(useNewProtocol = true, requireStable = true)
+ }
+
+ @ClusterTest(serverProperties = Array(
+ new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms",
value = "600000"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "600000"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1")
+ ))
+ def
testMultiGroupsOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable =
false)
+ }
+
+ @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"false"),
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"false"),
+ new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms",
value = "600000"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "600000"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1")
+ ))
+ def
testMultiGroupsOffsetFetchWithOldConsumerGroupProtocolAndOldGroupCoordinator():
Unit = {
+ testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = true)
+ }
+
+ private def testSingleGroupOffsetFetch(useNewProtocol: Boolean,
requireStable: Boolean): Unit = {
+ if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
+ fail("Cannot use the new protocol with the old group coordinator.")
+ }
+
+ // Creates the __consumer_offsets topic because it won't be created
automatically
+ // in this test, as it does not use the FindCoordinator API.
+ createOffsetsTopic()
+
+ // Create the topic.
+ createTopic(
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Join the consumer group. Note that we don't heartbeat here so we must
use
+ // a session long enough for the duration of the test.
+ val (memberId, memberEpoch) = joinConsumerGroup("grp", useNewProtocol)
+
+ // Commit offsets.
+ for (partitionId <- 0 to 2) {
+ commitOffset(
+ groupId = "grp",
+ memberId = memberId,
+ memberEpoch = memberEpoch,
+ topic = "foo",
+ partition = partitionId,
+ offset = 100L + partitionId,
+ expectedError = Errors.NONE,
+ version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+ )
+ }
+
+ // Start from version 1 because version 0 goes to ZK.
+ for (version <- 1 to
ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) {
+ // Fetch with partitions.
+ assertEquals(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("grp")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(1)
+ .setCommittedOffset(101L),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(5)
+ .setCommittedOffset(-1L)
+ ).asJava)
+ ).asJava),
+ fetchOffsets(
+ groupId = "grp",
+ memberId = memberId,
+ memberEpoch = memberEpoch,
+ partitions = List(
+ new TopicPartition("foo", 0),
+ new TopicPartition("foo", 1),
+ new TopicPartition("foo", 5) // This one does not exist.
+ ),
+ requireStable = requireStable,
+ version = version.toShort
+ )
+ )
+
+ // Fetch with unknown group id.
+ assertEquals(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("unknown")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(-1L),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(1)
+ .setCommittedOffset(-1L),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(5)
+ .setCommittedOffset(-1L)
+ ).asJava)
+ ).asJava),
+ fetchOffsets(
+ groupId = "unknown",
+ memberId = memberId,
+ memberEpoch = memberEpoch,
+ partitions = List(
+ new TopicPartition("foo", 0),
+ new TopicPartition("foo", 1),
+ new TopicPartition("foo", 5) // This one does not exist.
+ ),
+ requireStable = requireStable,
+ version = version.toShort
+ )
+ )
+
+ if (useNewProtocol && version >= 9) {
+ // Fetch with unknown member id.
+ assertEquals(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("grp")
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code),
+ fetchOffsets(
+ groupId = "grp",
+ memberId = "",
+ memberEpoch = memberEpoch,
+ partitions = List.empty,
+ requireStable = requireStable,
+ version = version.toShort
+ )
+ )
+
+ // Fetch with stale member epoch.
+ assertEquals(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("grp")
+ .setErrorCode(Errors.STALE_MEMBER_EPOCH.code),
+ fetchOffsets(
+ groupId = "grp",
+ memberId = memberId,
+ memberEpoch = memberEpoch + 1,
+ partitions = List.empty,
+ requireStable = requireStable,
+ version = version.toShort
+ )
+ )
}
}
}
- @Test
- def testOffsetFetchRequestWithMultipleGroups(): Unit = {
- createTopic(topics(0))
- createTopic(topics(1), numPartitions = 2)
- createTopic(topics(2), numPartitions = 3)
-
- // create 5 consumers to commit offsets so we can fetch them later
- val partitionMap = groupToPartitionMap.asScala.map(e => (e._1,
Option(e._2).getOrElse(allTopicsList)))
- groups.foreach { groupId =>
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
- commitOffsets(partitionMap(groupId))
+ private def testSingleGroupAllOffsetFetch(useNewProtocol: Boolean,
requireStable: Boolean): Unit = {
+ if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
+ fail("Cannot use the new protocol with the old group coordinator.")
}
- for (version <- 8 to ApiKeys.OFFSET_FETCH.latestVersion()) {
- val request = new OffsetFetchRequest.Builder(groupToPartitionMap,
false, false)
- .build(version.asInstanceOf[Short])
- val response = connectAndReceive[OffsetFetchResponse](request)
- response.data.groups.asScala.map(_.groupId).foreach( groupId =>
- verifyResponse(response.groupLevelError(groupId),
response.partitionDataMap(groupId), partitionMap(groupId))
+ // Creates the __consumer_offsets topic because it won't be created
automatically
+ // in this test, as it does not use the FindCoordinator API.
+ createOffsetsTopic()
+
+ // Create the topic.
+ createTopic(
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Join the consumer group. Note that we don't heartbeat here so we must
use
+ // a session long enough for the duration of the test.
+ val (memberId, memberEpoch) = joinConsumerGroup("grp", useNewProtocol)
+
+ // Commit offsets.
+ for (partitionId <- 0 to 2) {
+ commitOffset(
+ groupId = "grp",
+ memberId = memberId,
+ memberEpoch = memberEpoch,
+ topic = "foo",
+ partition = partitionId,
+ offset = 100L + partitionId,
+ expectedError = Errors.NONE,
+ version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
)
}
- }
- private def verifySingleGroupResponse(version: Short,
- responseError: Short,
- partitionError: Short,
- topicName: String,
- partitionIndex: Integer,
- committedOffset: Long,
- committedLeaderEpoch: Integer,
- partitionMetadata: String): Unit = {
- assertEquals(Errors.NONE.code(), responseError)
- assertEquals(topic, topicName)
- assertEquals(0, partitionIndex)
- assertEquals(offset, committedOffset)
- if (version >= 5) {
- assertEquals(leaderEpoch.get(), committedLeaderEpoch)
+ // Start from version 2 because fetching all partitions is not
+ // supported before.
+ for (version <- 2 to
ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) {
+ // Fetch all partitions.
+ assertEquals(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("grp")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(1)
+ .setCommittedOffset(101L),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(2)
+ .setCommittedOffset(102L)
+ ).asJava)
+ ).asJava),
+ fetchOffsets(
+ groupId = "grp",
+ memberId = memberId,
+ memberEpoch = memberEpoch,
+ partitions = null,
+ requireStable = requireStable,
+ version = version.toShort
+ )
+ )
+
+ // Fetch with an unknown group id.
+ assertEquals(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("unknown"),
+ fetchOffsets(
+ groupId = "unknown",
+ memberId = memberId,
+ memberEpoch = memberEpoch,
+ partitions = null,
+ requireStable = requireStable,
+ version = version.toShort
+ )
+ )
+
+ if (useNewProtocol && version >= 9) {
+ // Fetch with an unknown member id.
+ assertEquals(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("grp")
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code),
+ fetchOffsets(
+ groupId = "grp",
+ memberId = "",
+ memberEpoch = memberEpoch,
+ partitions = null,
+ requireStable = requireStable,
+ version = version.toShort
+ )
+ )
+
+ // Fetch with a stale member epoch.
+ assertEquals(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("grp")
+ .setErrorCode(Errors.STALE_MEMBER_EPOCH.code),
+ fetchOffsets(
+ groupId = "grp",
+ memberId = memberId,
+ memberEpoch = memberEpoch + 1,
+ partitions = null,
+ requireStable = requireStable,
+ version = version.toShort
+ )
+ )
+ }
}
- assertEquals(metadata, partitionMetadata)
- assertEquals(Errors.NONE.code(), partitionError)
}
- private def verifyPartitionData(partitionData:
OffsetFetchResponse.PartitionData): Unit = {
- assertTrue(!partitionData.hasError)
- assertEquals(offset, partitionData.offset)
- assertEquals(metadata, partitionData.metadata)
- assertEquals(leaderEpoch.get(), partitionData.leaderEpoch.get())
- }
+ private def testMultipleGroupsOffsetFetch(useNewProtocol: Boolean,
requireStable: Boolean): Unit = {
+ if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
+ fail("Cannot use the new protocol with the old group coordinator.")
+ }
- private def verifyResponse(groupLevelResponse: Errors,
- partitionData: util.Map[TopicPartition,
PartitionData],
- topicList: util.List[TopicPartition]): Unit = {
- assertEquals(Errors.NONE, groupLevelResponse)
- assertTrue(partitionData.size() == topicList.size())
- topicList.forEach(t => verifyPartitionData(partitionData.get(t)))
- }
+ // Creates the __consumer_offsets topic because it won't be created
automatically
+ // in this test, as it does not use the FindCoordinator API.
+ createOffsetsTopic()
+
+ // Create the topic.
+ createTopic(
+ topic = "foo",
+ numPartitions = 3
+ )
- private def commitOffsets(tpList: util.List[TopicPartition]): Unit = {
- val consumer = createConsumer()
- consumer.assign(tpList)
- val offsets = tpList.asScala.map{
- tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
- }.toMap.asJava
- consumer.commitSync(offsets)
- consumer.close()
+ // Create groups and commit offsets.
+ List("grp-0", "grp-1", "grp-2").foreach { groupId =>
+ // Join the consumer group. Note that we don't heartbeat here so we must
use
+ // a session long enough for the duration of the test.
+ val (memberId, memberEpoch) = joinConsumerGroup(groupId, useNewProtocol)
+
+ for (partitionId <- 0 to 2) {
+ commitOffset(
+ groupId = groupId,
+ memberId = memberId,
+ memberEpoch = memberEpoch,
+ topic = "foo",
+ partition = partitionId,
+ offset = 100L + partitionId,
+ expectedError = Errors.NONE,
+ version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+ )
+ }
+ }
+
+ // Start from version 8 because older versions do not support
+ // fetch offsets for multiple groups.
+ for (version <- 8 to
ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) {
+ assertEquals(
+ List(
+ // Fetch foo-0, foo-1 and foo-5.
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("grp-0")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(1)
+ .setCommittedOffset(101L),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(5)
+ .setCommittedOffset(-1L)
+ ).asJava)
+ ).asJava),
+ // Fetch all partitions.
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("grp-1")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(1)
+ .setCommittedOffset(101L),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(2)
+ .setCommittedOffset(102L)
+ ).asJava)
+ ).asJava),
+ // Fetch no partitions.
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("grp-2")
+ .setTopics(List.empty.asJava),
+ // Fetch unknown group.
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("grp-3")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(-1L)
+ ).asJava)
+ ).asJava),
+ ),
+ fetchOffsets(
+ groups = Map(
+ "grp-0" -> List(
+ new TopicPartition("foo", 0),
+ new TopicPartition("foo", 1),
+ new TopicPartition("foo", 5) // This one does not exist.
+ ),
+ "grp-1" -> null,
+ "grp-2" -> List.empty,
+ "grp-3" -> List(
+ new TopicPartition("foo", 0)
+ )
+ ),
+ requireStable = requireStable,
+ version = version.toShort
+ )
+ )
+ }
}
}