This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4e075e90c3a MINOR: Deduplicate common test helpers in
group-coordinator tests (#21047)
4e075e90c3a is described below
commit 4e075e90c3a19f78e4a26a83c5229518971476e1
Author: Aneesh Garg <[email protected]>
AuthorDate: Tue Dec 2 20:48:57 2025 +0530
MINOR: Deduplicate common test helpers in group-coordinator tests (#21047)
This PR adds a helper method to create topics using the raw admin client
in `GroupCoordinatorBaseRequestTest`. It also updates related tests to
use the existing `createOffsetsTopic` helper instead of manually
duplicating topic-creation logic in multiple places. No functional
changes to test behavior.
Reviewers: David Jacot <[email protected]>
---
.../server/ConsumerGroupDescribeRequestTest.scala | 256 ++++---
.../server/ConsumerGroupHeartbeatRequestTest.scala | 740 +++++++++------------
2 files changed, 451 insertions(+), 545 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
index 0f55feccb46..17057345c5b 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
@@ -79,151 +79,145 @@ class ConsumerGroupDescribeRequestTest(cluster:
ClusterInstance) extends GroupCo
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
- val admin = cluster.admin()
- try {
- val topicId = TestUtils.createTopicWithAdminRaw(
- admin = admin,
- topic = "foo",
- numPartitions = 3
- )
+ val topicId = createTopic(
+ topic = "foo",
+ numPartitions = 3
+ )
- val timeoutMs = 5 * 60 * 1000
- val clientId = "client-id"
- val clientHost = "/127.0.0.1"
- val authorizedOperationsInt = Utils.to32BitField(
- AclEntry.supportedOperations(ResourceType.GROUP).asScala
- .map(_.code.asInstanceOf[JByte]).asJava)
+ val timeoutMs = 5 * 60 * 1000
+ val clientId = "client-id"
+ val clientHost = "/127.0.0.1"
+ val authorizedOperationsInt = Utils.to32BitField(
+ AclEntry.supportedOperations(ResourceType.GROUP).asScala
+ .map(_.code.asInstanceOf[JByte]).asJava)
- // Add first group with one member.
- var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null
- TestUtils.waitUntilTrue(() => {
- grp1Member1Response = consumerGroupHeartbeat(
- groupId = "grp-1",
- memberId = Uuid.randomUuid().toString,
- rebalanceTimeoutMs = timeoutMs,
- subscribedTopicNames = List("bar"),
- topicPartitions = List.empty
- )
- grp1Member1Response.errorCode == Errors.NONE.code
- }, msg = s"Could not join the group successfully. Last response
$grp1Member1Response.")
-
- // Add second group with two members. For the first member, we
- // wait until it receives an assignment. We use 'range` in this
- // case to validate the assignor selection logic.
- var grp2Member1Response: ConsumerGroupHeartbeatResponseData = null
- TestUtils.waitUntilTrue(() => {
- grp2Member1Response = consumerGroupHeartbeat(
- memberId = "member-1",
- groupId = "grp-2",
- serverAssignor = "range",
- rebalanceTimeoutMs = timeoutMs,
- subscribedTopicNames = List("foo"),
- topicPartitions = List.empty
- )
- grp2Member1Response.assignment != null &&
!grp2Member1Response.assignment.topicPartitions.isEmpty
- }, msg = s"Could not join the group successfully. Last response
$grp2Member1Response.")
+ // Add first group with one member.
+ var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null
+ TestUtils.waitUntilTrue(() => {
+ grp1Member1Response = consumerGroupHeartbeat(
+ groupId = "grp-1",
+ memberId = Uuid.randomUuid().toString,
+ rebalanceTimeoutMs = timeoutMs,
+ subscribedTopicNames = List("bar"),
+ topicPartitions = List.empty
+ )
+ grp1Member1Response.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response
$grp1Member1Response.")
- val grp2Member2Response = consumerGroupHeartbeat(
- memberId = "member-2",
+ // Add second group with two members. For the first member, we
+ // wait until it receives an assignment. We use 'range` in this
+ // case to validate the assignor selection logic.
+ var grp2Member1Response: ConsumerGroupHeartbeatResponseData = null
+ TestUtils.waitUntilTrue(() => {
+ grp2Member1Response = consumerGroupHeartbeat(
+ memberId = "member-1",
groupId = "grp-2",
serverAssignor = "range",
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty
)
+ grp2Member1Response.assignment != null &&
!grp2Member1Response.assignment.topicPartitions.isEmpty
+ }, msg = s"Could not join the group successfully. Last response
$grp2Member1Response.")
- for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to
ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
- val expected = List(
- new DescribedGroup()
- .setGroupId("grp-1")
- .setGroupState(ConsumerGroupState.STABLE.toString)
- .setGroupEpoch(1)
- .setAssignmentEpoch(1)
- .setAssignorName("uniform")
- .setAuthorizedOperations(authorizedOperationsInt)
- .setMembers(List(
- new ConsumerGroupDescribeResponseData.Member()
- .setMemberId(grp1Member1Response.memberId)
- .setMemberEpoch(grp1Member1Response.memberEpoch)
- .setClientId(clientId)
- .setClientHost(clientHost)
- .setSubscribedTopicRegex("")
- .setSubscribedTopicNames(List("bar").asJava)
- .setMemberType(if (version == 0) -1.toByte else 1.toByte)
- ).asJava),
- new DescribedGroup()
- .setGroupId("grp-2")
- .setGroupState(ConsumerGroupState.RECONCILING.toString)
- .setGroupEpoch(grp2Member2Response.memberEpoch)
- .setAssignmentEpoch(grp2Member2Response.memberEpoch)
- .setAssignorName("range")
- .setAuthorizedOperations(authorizedOperationsInt)
- .setMembers(List(
- new ConsumerGroupDescribeResponseData.Member()
- .setMemberId(grp2Member2Response.memberId)
- .setMemberEpoch(grp2Member2Response.memberEpoch)
- .setClientId(clientId)
- .setClientHost(clientHost)
- .setSubscribedTopicRegex("")
- .setSubscribedTopicNames(List("foo").asJava)
- .setAssignment(new Assignment())
- .setTargetAssignment(new Assignment()
- .setTopicPartitions(List(
- new TopicPartitions()
- .setTopicId(topicId)
- .setTopicName("foo")
- .setPartitions(List[Integer](2).asJava)
- ).asJava))
- .setMemberType(if (version == 0) -1.toByte else 1.toByte),
- new ConsumerGroupDescribeResponseData.Member()
- .setMemberId(grp2Member1Response.memberId)
- .setMemberEpoch(grp2Member1Response.memberEpoch)
- .setClientId(clientId)
- .setClientHost(clientHost)
- .setSubscribedTopicRegex("")
- .setSubscribedTopicNames(List("foo").asJava)
- .setAssignment(new Assignment()
- .setTopicPartitions(List(
- new TopicPartitions()
- .setTopicId(topicId)
- .setTopicName("foo")
- .setPartitions(List[Integer](0, 1, 2).asJava)
- ).asJava))
- .setTargetAssignment(new Assignment()
- .setTopicPartitions(List(
- new TopicPartitions()
- .setTopicId(topicId)
- .setTopicName("foo")
- .setPartitions(List[Integer](0, 1).asJava)
- ).asJava))
- .setMemberType(if (version == 0) -1.toByte else 1.toByte),
- ).asJava),
- )
+ val grp2Member2Response = consumerGroupHeartbeat(
+ memberId = "member-2",
+ groupId = "grp-2",
+ serverAssignor = "range",
+ rebalanceTimeoutMs = timeoutMs,
+ subscribedTopicNames = List("foo"),
+ topicPartitions = List.empty
+ )
- val actual = consumerGroupDescribe(
- groupIds = List("grp-1", "grp-2"),
- includeAuthorizedOperations = true,
- version = version.toShort,
- )
+ for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to
ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
+ val expected = List(
+ new DescribedGroup()
+ .setGroupId("grp-1")
+ .setGroupState(ConsumerGroupState.STABLE.toString)
+ .setGroupEpoch(1)
+ .setAssignmentEpoch(1)
+ .setAssignorName("uniform")
+ .setAuthorizedOperations(authorizedOperationsInt)
+ .setMembers(List(
+ new ConsumerGroupDescribeResponseData.Member()
+ .setMemberId(grp1Member1Response.memberId)
+ .setMemberEpoch(grp1Member1Response.memberEpoch)
+ .setClientId(clientId)
+ .setClientHost(clientHost)
+ .setSubscribedTopicRegex("")
+ .setSubscribedTopicNames(List("bar").asJava)
+ .setMemberType(if (version == 0) -1.toByte else 1.toByte)
+ ).asJava),
+ new DescribedGroup()
+ .setGroupId("grp-2")
+ .setGroupState(ConsumerGroupState.RECONCILING.toString)
+ .setGroupEpoch(grp2Member2Response.memberEpoch)
+ .setAssignmentEpoch(grp2Member2Response.memberEpoch)
+ .setAssignorName("range")
+ .setAuthorizedOperations(authorizedOperationsInt)
+ .setMembers(List(
+ new ConsumerGroupDescribeResponseData.Member()
+ .setMemberId(grp2Member2Response.memberId)
+ .setMemberEpoch(grp2Member2Response.memberEpoch)
+ .setClientId(clientId)
+ .setClientHost(clientHost)
+ .setSubscribedTopicRegex("")
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setAssignment(new Assignment())
+ .setTargetAssignment(new Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions()
+ .setTopicId(topicId)
+ .setTopicName("foo")
+ .setPartitions(List[Integer](2).asJava)
+ ).asJava))
+ .setMemberType(if (version == 0) -1.toByte else 1.toByte),
+ new ConsumerGroupDescribeResponseData.Member()
+ .setMemberId(grp2Member1Response.memberId)
+ .setMemberEpoch(grp2Member1Response.memberEpoch)
+ .setClientId(clientId)
+ .setClientHost(clientHost)
+ .setSubscribedTopicRegex("")
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setAssignment(new Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions()
+ .setTopicId(topicId)
+ .setTopicName("foo")
+ .setPartitions(List[Integer](0, 1, 2).asJava)
+ ).asJava))
+ .setTargetAssignment(new Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions()
+ .setTopicId(topicId)
+ .setTopicName("foo")
+ .setPartitions(List[Integer](0, 1).asJava)
+ ).asJava))
+ .setMemberType(if (version == 0) -1.toByte else 1.toByte),
+ ).asJava),
+ )
- assertEquals(expected, actual)
+ val actual = consumerGroupDescribe(
+ groupIds = List("grp-1", "grp-2"),
+ includeAuthorizedOperations = true,
+ version = version.toShort,
+ )
- val unknownGroupResponse = consumerGroupDescribe(
- groupIds = List("grp-unknown"),
- includeAuthorizedOperations = true,
- version = version.toShort,
- )
- assertEquals(Errors.GROUP_ID_NOT_FOUND.code,
unknownGroupResponse.head.errorCode())
+ assertEquals(expected, actual)
- val emptyGroupResponse = consumerGroupDescribe(
- groupIds = List(""),
- includeAuthorizedOperations = true,
- version = version.toShort,
- )
- assertEquals(Errors.INVALID_GROUP_ID.code,
emptyGroupResponse.head.errorCode())
- }
- } finally {
- admin.close()
+ val unknownGroupResponse = consumerGroupDescribe(
+ groupIds = List("grp-unknown"),
+ includeAuthorizedOperations = true,
+ version = version.toShort,
+ )
+ assertEquals(Errors.GROUP_ID_NOT_FOUND.code,
unknownGroupResponse.head.errorCode())
+
+ val emptyGroupResponse = consumerGroupDescribe(
+ groupIds = List(""),
+ includeAuthorizedOperations = true,
+ version = version.toShort,
+ )
+ assertEquals(Errors.INVALID_GROUP_ID.code,
emptyGroupResponse.head.errorCode())
}
}
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 506d0007924..e5b4d7493c7 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -74,90 +74,79 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
@ClusterTest
def
testConsumerGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabled(): Unit
= {
- val admin = cluster.admin()
-
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
- try {
- TestUtils.createOffsetsTopicWithAdmin(
- admin = admin,
- brokers = cluster.brokers.values().asScala.toSeq,
- controllers = cluster.controllers().values().asScala.toSeq
- )
+ createOffsetsTopic()
- // Heartbeat request to join the group. Note that the member subscribes
- // to an nonexistent topic.
- var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("grp")
- .setMemberId(Uuid.randomUuid.toString)
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(5 * 60 * 1000)
- .setSubscribedTopicNames(List("foo").asJava)
- .setTopicPartitions(List.empty.asJava)
- ).build()
+ // Heartbeat request to join the group. Note that the member subscribes
+ // to an nonexistent topic.
+ var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(Uuid.randomUuid.toString)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
- // Send the request until receiving a successful response. There is a
delay
- // here because the group coordinator is loaded in the background.
- var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
- TestUtils.waitUntilTrue(() => {
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
- }, msg = s"Could not join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
- // Verify the response.
- assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
- assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
- assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
+ // Verify the response.
+ assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+ assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
- // Create the topic.
- val topicId = TestUtils.createTopicWithAdminRaw(
- admin = admin,
- topic = "foo",
- numPartitions = 3
- )
+ // Create the topic.
+ val topicId = createTopic(
+ topic = "foo",
+ numPartitions = 3
+ )
- // Prepare the next heartbeat.
- consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("grp")
- .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
- .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
- ).build()
+ // Prepare the next heartbeat.
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+ .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+ ).build()
- // This is the expected assignment.
- val expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
- .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
- .setTopicId(topicId)
- .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+ // This is the expected assignment.
+ val expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
- // Heartbeats until the partitions are assigned.
- consumerGroupHeartbeatResponse = null
- TestUtils.waitUntilTrue(() => {
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
- consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
- }, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
+ // Heartbeats until the partitions are assigned.
+ consumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
- // Verify the response.
- assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
- assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
+ // Verify the response.
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
- // Leave the group.
- consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("grp")
- .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
- .setMemberEpoch(-1)
- ).build()
+ // Leave the group.
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+ .setMemberEpoch(-1)
+ ).build()
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- // Verify the response.
- assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
- } finally {
- admin.close()
- }
+ // Verify the response.
+ assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
}
@ClusterTest
@@ -166,14 +155,9 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
- try {
- TestUtils.createOffsetsTopicWithAdmin(
- admin = admin,
- brokers = cluster.brokers.values().asScala.toSeq,
- controllers = cluster.controllers().values().asScala.toSeq
- )
+ createOffsetsTopic()
- // Heartbeat request to join the group. Note that the member subscribes
+ try {// Heartbeat request to join the group. Note that the member
subscribes
// to a nonexistent topic.
var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
@@ -199,8 +183,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
// Create the topic.
- val topicId = TestUtils.createTopicWithAdminRaw(
- admin = admin,
+ val topicId = createTopic(
topic = "foo",
numPartitions = 3
)
@@ -263,254 +246,214 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
@ClusterTest
def testConsumerGroupHeartbeatWithInvalidRegularExpression(): Unit = {
- val admin = cluster.admin()
-
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
- try {
- TestUtils.createOffsetsTopicWithAdmin(
- admin = admin,
- brokers = cluster.brokers.values().asScala.toSeq,
- controllers = cluster.controllers().values().asScala.toSeq
- )
+ createOffsetsTopic()
- // Heartbeat request to join the group. Note that the member subscribes
- // to an nonexistent topic.
- val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("grp")
- .setMemberId(Uuid.randomUuid().toString)
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(5 * 60 * 1000)
- .setSubscribedTopicRegex("[")
- .setTopicPartitions(List.empty.asJava)
- ).build()
+ // Heartbeat request to join the group. Note that the member subscribes
+ // to an nonexistent topic.
+ val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(Uuid.randomUuid().toString)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicRegex("[")
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
- // Send the request until receiving a successful response. There is a
delay
- // here because the group coordinator is loaded in the background.
- var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
- TestUtils.waitUntilTrue(() => {
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- consumerGroupHeartbeatResponse.data.errorCode ==
Errors.INVALID_REGULAR_EXPRESSION.code
- }, msg = s"Did not receive the expected error. Last response
$consumerGroupHeartbeatResponse.")
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode ==
Errors.INVALID_REGULAR_EXPRESSION.code
+ }, msg = s"Did not receive the expected error. Last response
$consumerGroupHeartbeatResponse.")
- // Verify the response.
- assertEquals(Errors.INVALID_REGULAR_EXPRESSION.code,
consumerGroupHeartbeatResponse.data.errorCode)
- } finally {
- admin.close()
- }
+ // Verify the response.
+ assertEquals(Errors.INVALID_REGULAR_EXPRESSION.code,
consumerGroupHeartbeatResponse.data.errorCode)
}
@ClusterTest
def testEmptyConsumerGroupId(): Unit = {
- val admin = cluster.admin()
-
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
- try {
- TestUtils.createOffsetsTopicWithAdmin(
- admin = admin,
- brokers = cluster.brokers.values().asScala.toSeq,
- controllers = cluster.controllers().values().asScala.toSeq
- )
+ createOffsetsTopic()
- // Heartbeat request to join the group. Note that the member subscribes
- // to an nonexistent topic.
- val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("")
- .setMemberId(Uuid.randomUuid().toString)
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(5 * 60 * 1000)
- .setSubscribedTopicNames(List("foo").asJava)
- .setTopicPartitions(List.empty.asJava),
- true
- ).build()
+ // Heartbeat request to join the group. Note that the member subscribes
+ // to an nonexistent topic.
+ val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("")
+ .setMemberId(Uuid.randomUuid().toString)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava),
+ true
+ ).build()
- // Send the request until receiving a successful response. There is a
delay
- // here because the group coordinator is loaded in the background.
- var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
- TestUtils.waitUntilTrue(() => {
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- consumerGroupHeartbeatResponse.data.errorCode ==
Errors.INVALID_REQUEST.code
- }, msg = s"Did not receive the expected error. Last response
$consumerGroupHeartbeatResponse.")
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode ==
Errors.INVALID_REQUEST.code
+ }, msg = s"Did not receive the expected error. Last response
$consumerGroupHeartbeatResponse.")
- // Verify the response.
- assertEquals(Errors.INVALID_REQUEST.code,
consumerGroupHeartbeatResponse.data.errorCode)
- assertEquals("GroupId can't be empty.",
consumerGroupHeartbeatResponse.data.errorMessage)
- } finally {
- admin.close()
- }
+ // Verify the response.
+ assertEquals(Errors.INVALID_REQUEST.code,
consumerGroupHeartbeatResponse.data.errorCode)
+ assertEquals("GroupId can't be empty.",
consumerGroupHeartbeatResponse.data.errorMessage)
}
@ClusterTest
def testConsumerGroupHeartbeatWithEmptySubscription(): Unit = {
- val admin = cluster.admin()
-
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
- try {
- TestUtils.createOffsetsTopicWithAdmin(
- admin = admin,
- brokers = cluster.brokers.values().asScala.toSeq,
- controllers = cluster.controllers().values().asScala.toSeq
- )
+ createOffsetsTopic()
- // Heartbeat request to join the group.
- var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("grp")
- .setMemberId(Uuid.randomUuid().toString)
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(5 * 60 * 1000)
- .setSubscribedTopicRegex("")
- .setTopicPartitions(List.empty.asJava)
- ).build()
+ // Heartbeat request to join the group.
+ var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(Uuid.randomUuid().toString)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicRegex("")
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
- // Send the request until receiving a successful response. There is a
delay
- // here because the group coordinator is loaded in the background.
- var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
- TestUtils.waitUntilTrue(() => {
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
- }, msg = s"Did not receive the expected successful response. Last
response $consumerGroupHeartbeatResponse.")
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Did not receive the expected successful response. Last response
$consumerGroupHeartbeatResponse.")
- // Heartbeat request to join the group.
- consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("grp")
- .setMemberId(Uuid.randomUuid().toString)
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(5 * 60 * 1000)
- .setSubscribedTopicNames(List.empty.asJava)
- .setTopicPartitions(List.empty.asJava)
- ).build()
+ // Heartbeat request to join the group.
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(Uuid.randomUuid().toString)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List.empty.asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
- // Send the request until receiving a successful response. There is a
delay
- // here because the group coordinator is loaded in the background.
- consumerGroupHeartbeatResponse = null
- TestUtils.waitUntilTrue(() => {
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
- }, msg = s"Did not receive the expected successful response. Last
response $consumerGroupHeartbeatResponse.")
- } finally {
- admin.close()
- }
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ consumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Did not receive the expected successful response. Last response
$consumerGroupHeartbeatResponse.")
}
@ClusterTest
def
testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled():
Unit = {
- val admin = cluster.admin()
- try {
- val instanceId = "instanceId"
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ createOffsetsTopic()
- // Creates the __consumer_offsets topics because it won't be created
automatically
- // in this test because it does not use FindCoordinator API.
- TestUtils.createOffsetsTopicWithAdmin(
- admin = admin,
- brokers = cluster.brokers.values().asScala.toSeq,
- controllers = cluster.controllers().values().asScala.toSeq
- )
+ val instanceId = "instanceId"
- // Heartbeat request so that a static member joins the group
- var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("grp")
- .setMemberId(Uuid.randomUuid.toString)
- .setInstanceId(instanceId)
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(5 * 60 * 1000)
- .setSubscribedTopicNames(List("foo").asJava)
- .setTopicPartitions(List.empty.asJava)
- ).build()
+ // Heartbeat request so that a static member joins the group
+ var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(Uuid.randomUuid.toString)
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
- // Send the request until receiving a successful response. There is a
delay
- // here because the group coordinator is loaded in the background.
- var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
- TestUtils.waitUntilTrue(() => {
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
- }, msg = s"Static member could not join the group successfully. Last
response $consumerGroupHeartbeatResponse.")
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Static member could not join the group successfully. Last
response $consumerGroupHeartbeatResponse.")
- // Verify the response.
- assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
- assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
- assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
+ // Verify the response.
+ assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+ assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
- // Create the topic.
- val topicId = TestUtils.createTopicWithAdminRaw(
- admin = admin,
- topic = "foo",
- numPartitions = 3
- )
+ // Create the topic.
+ val topicId = createTopic(
+ topic = "foo",
+ numPartitions = 3
+ )
- // Prepare the next heartbeat.
- consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("grp")
- .setInstanceId(instanceId)
- .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
- .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
- ).build()
+ // Prepare the next heartbeat.
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+ .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+ ).build()
- // This is the expected assignment.
- val expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
- .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
- .setTopicId(topicId)
- .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+ // This is the expected assignment.
+ val expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
- // Heartbeats until the partitions are assigned.
- consumerGroupHeartbeatResponse = null
- TestUtils.waitUntilTrue(() => {
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
- consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
- }, msg = s"Static member could not get partitions assigned. Last
response $consumerGroupHeartbeatResponse.")
+ // Heartbeats until the partitions are assigned.
+ consumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Static member could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
- // Verify the response.
- assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
- assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
- assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
+ // Verify the response.
+ assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
- val oldMemberId = consumerGroupHeartbeatResponse.data.memberId
+ val oldMemberId = consumerGroupHeartbeatResponse.data.memberId
- // Leave the group temporarily
- consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("grp")
- .setInstanceId(instanceId)
- .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
- .setMemberEpoch(-2)
- ).build()
+ // Leave the group temporarily
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+ .setMemberEpoch(-2)
+ ).build()
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- // Verify the response.
- assertEquals(-2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ // Verify the response.
+ assertEquals(-2, consumerGroupHeartbeatResponse.data.memberEpoch)
- // Another static member replaces the above member. It gets the same
assignments back
- consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("grp")
- .setMemberId(Uuid.randomUuid.toString)
- .setInstanceId(instanceId)
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(5 * 60 * 1000)
- .setSubscribedTopicNames(List("foo").asJava)
- .setTopicPartitions(List.empty.asJava)
- ).build()
+ // Another static member replaces the above member. It gets the same
assignments back
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(Uuid.randomUuid.toString)
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- // Verify the response.
- assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
- assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
- assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
- // The 2 member IDs should be different
- assertNotEquals(oldMemberId,
consumerGroupHeartbeatResponse.data.memberId)
- } finally {
- admin.close()
- }
+ // Verify the response.
+ assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
+ // The 2 member IDs should be different
+ assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
}
@ClusterTest(
@@ -520,109 +463,99 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
)
)
def
testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled():
Unit = {
- val admin = cluster.admin()
- try {
- val instanceId = "instanceId"
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ createOffsetsTopic()
- // Creates the __consumer_offsets topics because it won't be created
automatically
- // in this test because it does not use FindCoordinator API.
- TestUtils.createOffsetsTopicWithAdmin(
- admin = admin,
- brokers = cluster.brokers.values().asScala.toSeq,
- controllers = cluster.controllers().values().asScala.toSeq
- )
+ val instanceId = "instanceId"
- // Heartbeat request to join the group. Note that the member subscribes
- // to an nonexistent topic.
- var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("grp")
- .setMemberId(Uuid.randomUuid.toString)
- .setInstanceId(instanceId)
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(5 * 60 * 1000)
- .setSubscribedTopicNames(List("foo").asJava)
- .setTopicPartitions(List.empty.asJava)
- ).build()
+ // Heartbeat request to join the group. Note that the member subscribes
+ // to an nonexistent topic.
+ var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(Uuid.randomUuid.toString)
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
- // Send the request until receiving a successful response. There is a
delay
- // here because the group coordinator is loaded in the background.
- var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
- TestUtils.waitUntilTrue(() => {
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
- }, msg = s"Could not join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
- // Verify the response.
- assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
- assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
- assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
+ // Verify the response.
+ assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+ assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
- // Create the topic.
- val topicId = TestUtils.createTopicWithAdminRaw(
- admin = admin,
- topic = "foo",
- numPartitions = 3
- )
+ // Create the topic.
+ val topicId = createTopic(
+ topic = "foo",
+ numPartitions = 3
+ )
- // Prepare the next heartbeat.
- consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("grp")
- .setInstanceId(instanceId)
- .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
- .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
- ).build()
+ // Prepare the next heartbeat.
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+ .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+ ).build()
- // This is the expected assignment.
- val expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
- .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
- .setTopicId(topicId)
- .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+ // This is the expected assignment.
+ val expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
- // Heartbeats until the partitions are assigned.
- consumerGroupHeartbeatResponse = null
- TestUtils.waitUntilTrue(() => {
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
- consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
- }, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
+ // Heartbeats until the partitions are assigned.
+ consumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
- // Verify the response.
- assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
- assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
+ // Verify the response.
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
- // A new static member tries to join the group with an inuse instanceid.
- consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("grp")
- .setMemberId(Uuid.randomUuid.toString)
- .setInstanceId(instanceId)
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(5 * 60 * 1000)
- .setSubscribedTopicNames(List("foo").asJava)
- .setTopicPartitions(List.empty.asJava)
- ).build()
+ // A new static member tries to join the group with an inuse instanceid.
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(Uuid.randomUuid.toString)
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
- // Validating that trying to join with an in-use instanceId would throw
an UnreleasedInstanceIdException.
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- assertEquals(Errors.UNRELEASED_INSTANCE_ID.code,
consumerGroupHeartbeatResponse.data.errorCode)
+ // Validating that trying to join with an in-use instanceId would throw an
UnreleasedInstanceIdException.
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ assertEquals(Errors.UNRELEASED_INSTANCE_ID.code,
consumerGroupHeartbeatResponse.data.errorCode)
- // The new static member join group will keep failing with an
UnreleasedInstanceIdException
- // until eventually it gets through because the existing member will be
kicked out
- // because of not sending a heartbeat till session timeout expiry.
- TestUtils.waitUntilTrue(() => {
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
- consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
- }, msg = s"Could not re-join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
+ // The new static member join group will keep failing with an
UnreleasedInstanceIdException
+ // until eventually it gets through because the existing member will be
kicked out
+ // because of not sending a heartbeat till session timeout expiry.
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not re-join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
- // Verify the response. The group epoch bumps upto 4 which eventually
reflects in the new member epoch.
- assertEquals(4, consumerGroupHeartbeatResponse.data.memberEpoch)
- assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
- } finally {
- admin.close()
- }
+ // Verify the response. The group epoch bumps upto 4 which eventually
reflects in the new member epoch.
+ assertEquals(4, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
}
@ClusterTest(
@@ -632,19 +565,15 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
)
def testUpdateConsumerGroupHeartbeatConfigSuccessful(): Unit = {
val admin = cluster.admin()
+
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ createOffsetsTopic()
try {
val newHeartbeatIntervalMs = 10000
val instanceId = "instanceId"
val consumerGroupId = "grp"
- // Creates the __consumer_offsets topics because it won't be created
automatically
- // in this test because it does not use FindCoordinator API.
- TestUtils.createOffsetsTopicWithAdmin(
- admin = admin,
- brokers = cluster.brokers.values().asScala.toSeq,
- controllers = cluster.controllers().values().asScala.toSeq
- )
-
// Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic.
var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
@@ -701,47 +630,31 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
@ClusterTest
def testConsumerGroupHeartbeatFailureIfMemberIdMissingForVersionsAbove0():
Unit = {
- val admin = cluster.admin()
-
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
- try {
- TestUtils.createOffsetsTopicWithAdmin(
- admin = admin,
- brokers = cluster.brokers.values().asScala.toSeq,
- controllers = cluster.controllers().values().asScala.toSeq
- )
+ createOffsetsTopic()
- val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("grp")
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(5 * 60 * 1000)
- .setSubscribedTopicNames(List("foo").asJava)
- .setTopicPartitions(List.empty.asJava)
- ).build()
+ val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
- var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
- TestUtils.waitUntilTrue(() => {
- consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
- consumerGroupHeartbeatResponse.data.errorCode ==
Errors.INVALID_REQUEST.code
- }, msg = "Should fail due to invalid member id.")
- } finally {
- admin.close()
- }
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode ==
Errors.INVALID_REQUEST.code
+ }, msg = "Should fail due to invalid member id.")
}
@ClusterTest
def testMemberIdGeneratedOnServerWhenApiVersionIs0(): Unit = {
- val admin = cluster.admin()
-
// Creates the __consumer_offsets topics because it won't be created
automatically
// in this test because it does not use FindCoordinator API.
- TestUtils.createOffsetsTopicWithAdmin(
- admin = admin,
- brokers = cluster.brokers.values().asScala.toSeq,
- controllers = cluster.controllers().values().asScala.toSeq
- )
+ createOffsetsTopic()
val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
@@ -761,6 +674,5 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
val memberId = consumerGroupHeartbeatResponse.data().memberId()
assertNotNull(memberId)
assertFalse(memberId.isEmpty)
- admin.close()
}
}