This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4e075e90c3a MINOR: Deduplicate common test helpers in 
group-coordinator tests (#21047)
4e075e90c3a is described below

commit 4e075e90c3a19f78e4a26a83c5229518971476e1
Author: Aneesh Garg <[email protected]>
AuthorDate: Tue Dec 2 20:48:57 2025 +0530

    MINOR: Deduplicate common test helpers in group-coordinator tests (#21047)
    
    This PR adds a helper method to create topics using the raw admin client
    in  `GroupCoordinatorBaseRequestTest`. It also updates related tests to
    use the  existing `createOffsetsTopic` helper instead of manually
    duplicating topic-creation  logic in multiple places. No functional
    changes to test behavior.
    
    Reviewers: David Jacot <[email protected]>
---
 .../server/ConsumerGroupDescribeRequestTest.scala  | 256 ++++---
 .../server/ConsumerGroupHeartbeatRequestTest.scala | 740 +++++++++------------
 2 files changed, 451 insertions(+), 545 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
index 0f55feccb46..17057345c5b 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
@@ -79,151 +79,145 @@ class ConsumerGroupDescribeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     // in this test because it does not use FindCoordinator API.
     createOffsetsTopic()
 
-    val admin = cluster.admin()
-    try {
-      val topicId = TestUtils.createTopicWithAdminRaw(
-        admin = admin,
-        topic = "foo",
-        numPartitions = 3
-      )
+    val topicId = createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
 
-      val timeoutMs = 5 * 60 * 1000
-      val clientId = "client-id"
-      val clientHost = "/127.0.0.1"
-      val authorizedOperationsInt = Utils.to32BitField(
-        AclEntry.supportedOperations(ResourceType.GROUP).asScala
-          .map(_.code.asInstanceOf[JByte]).asJava)
+    val timeoutMs = 5 * 60 * 1000
+    val clientId = "client-id"
+    val clientHost = "/127.0.0.1"
+    val authorizedOperationsInt = Utils.to32BitField(
+      AclEntry.supportedOperations(ResourceType.GROUP).asScala
+        .map(_.code.asInstanceOf[JByte]).asJava)
 
-      // Add first group with one member.
-      var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null
-      TestUtils.waitUntilTrue(() => {
-        grp1Member1Response = consumerGroupHeartbeat(
-          groupId = "grp-1",
-          memberId = Uuid.randomUuid().toString,
-          rebalanceTimeoutMs = timeoutMs,
-          subscribedTopicNames = List("bar"),
-          topicPartitions = List.empty
-        )
-        grp1Member1Response.errorCode == Errors.NONE.code
-      }, msg = s"Could not join the group successfully. Last response 
$grp1Member1Response.")
-
-      // Add second group with two members. For the first member, we
-      // wait until it receives an assignment. We use 'range` in this
-      // case to validate the assignor selection logic.
-      var grp2Member1Response: ConsumerGroupHeartbeatResponseData = null
-      TestUtils.waitUntilTrue(() => {
-        grp2Member1Response = consumerGroupHeartbeat(
-          memberId = "member-1",
-          groupId = "grp-2",
-          serverAssignor = "range",
-          rebalanceTimeoutMs = timeoutMs,
-          subscribedTopicNames = List("foo"),
-          topicPartitions = List.empty
-        )
-        grp2Member1Response.assignment != null && 
!grp2Member1Response.assignment.topicPartitions.isEmpty
-      }, msg = s"Could not join the group successfully. Last response 
$grp2Member1Response.")
+    // Add first group with one member.
+    var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null
+    TestUtils.waitUntilTrue(() => {
+      grp1Member1Response = consumerGroupHeartbeat(
+        groupId = "grp-1",
+        memberId = Uuid.randomUuid().toString,
+        rebalanceTimeoutMs = timeoutMs,
+        subscribedTopicNames = List("bar"),
+        topicPartitions = List.empty
+      )
+      grp1Member1Response.errorCode == Errors.NONE.code
+    }, msg = s"Could not join the group successfully. Last response 
$grp1Member1Response.")
 
-      val grp2Member2Response = consumerGroupHeartbeat(
-        memberId = "member-2",
+    // Add second group with two members. For the first member, we
+    // wait until it receives an assignment. We use 'range` in this
+    // case to validate the assignor selection logic.
+    var grp2Member1Response: ConsumerGroupHeartbeatResponseData = null
+    TestUtils.waitUntilTrue(() => {
+      grp2Member1Response = consumerGroupHeartbeat(
+        memberId = "member-1",
         groupId = "grp-2",
         serverAssignor = "range",
         rebalanceTimeoutMs = timeoutMs,
         subscribedTopicNames = List("foo"),
         topicPartitions = List.empty
       )
+      grp2Member1Response.assignment != null && 
!grp2Member1Response.assignment.topicPartitions.isEmpty
+    }, msg = s"Could not join the group successfully. Last response 
$grp2Member1Response.")
 
-      for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to 
ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
-        val expected = List(
-          new DescribedGroup()
-            .setGroupId("grp-1")
-            .setGroupState(ConsumerGroupState.STABLE.toString)
-            .setGroupEpoch(1)
-            .setAssignmentEpoch(1)
-            .setAssignorName("uniform")
-            .setAuthorizedOperations(authorizedOperationsInt)
-            .setMembers(List(
-              new ConsumerGroupDescribeResponseData.Member()
-                .setMemberId(grp1Member1Response.memberId)
-                .setMemberEpoch(grp1Member1Response.memberEpoch)
-                .setClientId(clientId)
-                .setClientHost(clientHost)
-                .setSubscribedTopicRegex("")
-                .setSubscribedTopicNames(List("bar").asJava)
-                .setMemberType(if (version == 0) -1.toByte else 1.toByte)
-            ).asJava),
-          new DescribedGroup()
-            .setGroupId("grp-2")
-            .setGroupState(ConsumerGroupState.RECONCILING.toString)
-            .setGroupEpoch(grp2Member2Response.memberEpoch)
-            .setAssignmentEpoch(grp2Member2Response.memberEpoch)
-            .setAssignorName("range")
-            .setAuthorizedOperations(authorizedOperationsInt)
-            .setMembers(List(
-              new ConsumerGroupDescribeResponseData.Member()
-                .setMemberId(grp2Member2Response.memberId)
-                .setMemberEpoch(grp2Member2Response.memberEpoch)
-                .setClientId(clientId)
-                .setClientHost(clientHost)
-                .setSubscribedTopicRegex("")
-                .setSubscribedTopicNames(List("foo").asJava)
-                .setAssignment(new Assignment())
-                .setTargetAssignment(new Assignment()
-                  .setTopicPartitions(List(
-                    new TopicPartitions()
-                      .setTopicId(topicId)
-                      .setTopicName("foo")
-                      .setPartitions(List[Integer](2).asJava)
-                  ).asJava))
-                .setMemberType(if (version == 0) -1.toByte else 1.toByte),
-              new ConsumerGroupDescribeResponseData.Member()
-                .setMemberId(grp2Member1Response.memberId)
-                .setMemberEpoch(grp2Member1Response.memberEpoch)
-                .setClientId(clientId)
-                .setClientHost(clientHost)
-                .setSubscribedTopicRegex("")
-                .setSubscribedTopicNames(List("foo").asJava)
-                .setAssignment(new Assignment()
-                  .setTopicPartitions(List(
-                    new TopicPartitions()
-                      .setTopicId(topicId)
-                      .setTopicName("foo")
-                      .setPartitions(List[Integer](0, 1, 2).asJava)
-                  ).asJava))
-                .setTargetAssignment(new Assignment()
-                  .setTopicPartitions(List(
-                    new TopicPartitions()
-                      .setTopicId(topicId)
-                      .setTopicName("foo")
-                      .setPartitions(List[Integer](0, 1).asJava)
-                  ).asJava))
-                .setMemberType(if (version == 0) -1.toByte else 1.toByte),
-            ).asJava),
-        )
+    val grp2Member2Response = consumerGroupHeartbeat(
+      memberId = "member-2",
+      groupId = "grp-2",
+      serverAssignor = "range",
+      rebalanceTimeoutMs = timeoutMs,
+      subscribedTopicNames = List("foo"),
+      topicPartitions = List.empty
+    )
 
-        val actual = consumerGroupDescribe(
-          groupIds = List("grp-1", "grp-2"),
-          includeAuthorizedOperations = true,
-          version = version.toShort,
-        )
+    for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to 
ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
+      val expected = List(
+        new DescribedGroup()
+          .setGroupId("grp-1")
+          .setGroupState(ConsumerGroupState.STABLE.toString)
+          .setGroupEpoch(1)
+          .setAssignmentEpoch(1)
+          .setAssignorName("uniform")
+          .setAuthorizedOperations(authorizedOperationsInt)
+          .setMembers(List(
+            new ConsumerGroupDescribeResponseData.Member()
+              .setMemberId(grp1Member1Response.memberId)
+              .setMemberEpoch(grp1Member1Response.memberEpoch)
+              .setClientId(clientId)
+              .setClientHost(clientHost)
+              .setSubscribedTopicRegex("")
+              .setSubscribedTopicNames(List("bar").asJava)
+              .setMemberType(if (version == 0) -1.toByte else 1.toByte)
+          ).asJava),
+        new DescribedGroup()
+          .setGroupId("grp-2")
+          .setGroupState(ConsumerGroupState.RECONCILING.toString)
+          .setGroupEpoch(grp2Member2Response.memberEpoch)
+          .setAssignmentEpoch(grp2Member2Response.memberEpoch)
+          .setAssignorName("range")
+          .setAuthorizedOperations(authorizedOperationsInt)
+          .setMembers(List(
+            new ConsumerGroupDescribeResponseData.Member()
+              .setMemberId(grp2Member2Response.memberId)
+              .setMemberEpoch(grp2Member2Response.memberEpoch)
+              .setClientId(clientId)
+              .setClientHost(clientHost)
+              .setSubscribedTopicRegex("")
+              .setSubscribedTopicNames(List("foo").asJava)
+              .setAssignment(new Assignment())
+              .setTargetAssignment(new Assignment()
+                .setTopicPartitions(List(
+                  new TopicPartitions()
+                    .setTopicId(topicId)
+                    .setTopicName("foo")
+                    .setPartitions(List[Integer](2).asJava)
+                ).asJava))
+              .setMemberType(if (version == 0) -1.toByte else 1.toByte),
+            new ConsumerGroupDescribeResponseData.Member()
+              .setMemberId(grp2Member1Response.memberId)
+              .setMemberEpoch(grp2Member1Response.memberEpoch)
+              .setClientId(clientId)
+              .setClientHost(clientHost)
+              .setSubscribedTopicRegex("")
+              .setSubscribedTopicNames(List("foo").asJava)
+              .setAssignment(new Assignment()
+                .setTopicPartitions(List(
+                  new TopicPartitions()
+                    .setTopicId(topicId)
+                    .setTopicName("foo")
+                    .setPartitions(List[Integer](0, 1, 2).asJava)
+                ).asJava))
+              .setTargetAssignment(new Assignment()
+                .setTopicPartitions(List(
+                  new TopicPartitions()
+                    .setTopicId(topicId)
+                    .setTopicName("foo")
+                    .setPartitions(List[Integer](0, 1).asJava)
+                ).asJava))
+              .setMemberType(if (version == 0) -1.toByte else 1.toByte),
+          ).asJava),
+      )
 
-        assertEquals(expected, actual)
+      val actual = consumerGroupDescribe(
+        groupIds = List("grp-1", "grp-2"),
+        includeAuthorizedOperations = true,
+        version = version.toShort,
+      )
 
-        val unknownGroupResponse = consumerGroupDescribe(
-          groupIds = List("grp-unknown"),
-          includeAuthorizedOperations = true,
-          version = version.toShort,
-        )
-        assertEquals(Errors.GROUP_ID_NOT_FOUND.code, 
unknownGroupResponse.head.errorCode())
+      assertEquals(expected, actual)
 
-        val emptyGroupResponse = consumerGroupDescribe(
-          groupIds = List(""),
-          includeAuthorizedOperations = true,
-          version = version.toShort,
-        )
-        assertEquals(Errors.INVALID_GROUP_ID.code, 
emptyGroupResponse.head.errorCode())
-      }
-    } finally {
-      admin.close()
+      val unknownGroupResponse = consumerGroupDescribe(
+        groupIds = List("grp-unknown"),
+        includeAuthorizedOperations = true,
+        version = version.toShort,
+      )
+      assertEquals(Errors.GROUP_ID_NOT_FOUND.code, 
unknownGroupResponse.head.errorCode())
+
+      val emptyGroupResponse = consumerGroupDescribe(
+        groupIds = List(""),
+        includeAuthorizedOperations = true,
+        version = version.toShort,
+      )
+      assertEquals(Errors.INVALID_GROUP_ID.code, 
emptyGroupResponse.head.errorCode())
     }
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 506d0007924..e5b4d7493c7 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -74,90 +74,79 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
 
   @ClusterTest
   def 
testConsumerGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabled(): Unit 
= {
-    val admin = cluster.admin()
-    
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
-    try {
-      TestUtils.createOffsetsTopicWithAdmin(
-        admin = admin,
-        brokers = cluster.brokers.values().asScala.toSeq,
-        controllers = cluster.controllers().values().asScala.toSeq
-      )
+    createOffsetsTopic()
 
-      // Heartbeat request to join the group. Note that the member subscribes
-      // to an nonexistent topic.
-      var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-        new ConsumerGroupHeartbeatRequestData()
-          .setGroupId("grp")
-          .setMemberId(Uuid.randomUuid.toString)
-          .setMemberEpoch(0)
-          .setRebalanceTimeoutMs(5 * 60 * 1000)
-          .setSubscribedTopicNames(List("foo").asJava)
-          .setTopicPartitions(List.empty.asJava)
-      ).build()
+    // Heartbeat request to join the group. Note that the member subscribes
+    // to an nonexistent topic.
+    var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberId(Uuid.randomUuid.toString)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
 
-      // Send the request until receiving a successful response. There is a 
delay
-      // here because the group coordinator is loaded in the background.
-      var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
-      TestUtils.waitUntilTrue(() => {
-        consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
-        consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
-      }, msg = s"Could not join the group successfully. Last response 
$consumerGroupHeartbeatResponse.")
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+    }, msg = s"Could not join the group successfully. Last response 
$consumerGroupHeartbeatResponse.")
 
-      // Verify the response.
-      assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
-      assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
-      assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), 
consumerGroupHeartbeatResponse.data.assignment)
+    // Verify the response.
+    assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+    assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), 
consumerGroupHeartbeatResponse.data.assignment)
 
-      // Create the topic.
-      val topicId = TestUtils.createTopicWithAdminRaw(
-        admin = admin,
-        topic = "foo",
-        numPartitions = 3
-      )
+    // Create the topic.
+    val topicId = createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
 
-      // Prepare the next heartbeat.
-      consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-        new ConsumerGroupHeartbeatRequestData()
-          .setGroupId("grp")
-          .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
-          .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
-      ).build()
+    // Prepare the next heartbeat.
+    consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+        .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+    ).build()
 
-      // This is the expected assignment.
-      val expectedAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
-        .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
-          .setTopicId(topicId)
-          .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+    // This is the expected assignment.
+    val expectedAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
 
-      // Heartbeats until the partitions are assigned.
-      consumerGroupHeartbeatResponse = null
-      TestUtils.waitUntilTrue(() => {
-        consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
-        consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
-          consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
-      }, msg = s"Could not get partitions assigned. Last response 
$consumerGroupHeartbeatResponse.")
+    // Heartbeats until the partitions are assigned.
+    consumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+        consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+    }, msg = s"Could not get partitions assigned. Last response 
$consumerGroupHeartbeatResponse.")
 
-      // Verify the response.
-      assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
-      assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
+    // Verify the response.
+    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
 
-      // Leave the group.
-      consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-        new ConsumerGroupHeartbeatRequestData()
-          .setGroupId("grp")
-          .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
-          .setMemberEpoch(-1)
-      ).build()
+    // Leave the group.
+    consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+        .setMemberEpoch(-1)
+    ).build()
 
-      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+    consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
 
-      // Verify the response.
-      assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
-    } finally {
-      admin.close()
-    }
+    // Verify the response.
+    assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
   }
 
   @ClusterTest
@@ -166,14 +155,9 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
 
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
-    try {
-      TestUtils.createOffsetsTopicWithAdmin(
-        admin = admin,
-        brokers = cluster.brokers.values().asScala.toSeq,
-        controllers = cluster.controllers().values().asScala.toSeq
-      )
+    createOffsetsTopic()
 
-      // Heartbeat request to join the group. Note that the member subscribes
+    try {// Heartbeat request to join the group. Note that the member 
subscribes
       // to a nonexistent topic.
       var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
         new ConsumerGroupHeartbeatRequestData()
@@ -199,8 +183,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
       assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), 
consumerGroupHeartbeatResponse.data.assignment)
 
       // Create the topic.
-      val topicId = TestUtils.createTopicWithAdminRaw(
-        admin = admin,
+      val topicId = createTopic(
         topic = "foo",
         numPartitions = 3
       )
@@ -263,254 +246,214 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
 
   @ClusterTest
   def testConsumerGroupHeartbeatWithInvalidRegularExpression(): Unit = {
-    val admin = cluster.admin()
-
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
-    try {
-      TestUtils.createOffsetsTopicWithAdmin(
-        admin = admin,
-        brokers = cluster.brokers.values().asScala.toSeq,
-        controllers = cluster.controllers().values().asScala.toSeq
-      )
+    createOffsetsTopic()
 
-      // Heartbeat request to join the group. Note that the member subscribes
-      // to an nonexistent topic.
-      val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-        new ConsumerGroupHeartbeatRequestData()
-          .setGroupId("grp")
-          .setMemberId(Uuid.randomUuid().toString)
-          .setMemberEpoch(0)
-          .setRebalanceTimeoutMs(5 * 60 * 1000)
-          .setSubscribedTopicRegex("[")
-          .setTopicPartitions(List.empty.asJava)
-      ).build()
+    // Heartbeat request to join the group. Note that the member subscribes
+    // to an nonexistent topic.
+    val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberId(Uuid.randomUuid().toString)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicRegex("[")
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
 
-      // Send the request until receiving a successful response. There is a 
delay
-      // here because the group coordinator is loaded in the background.
-      var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
-      TestUtils.waitUntilTrue(() => {
-        consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
-        consumerGroupHeartbeatResponse.data.errorCode == 
Errors.INVALID_REGULAR_EXPRESSION.code
-      }, msg = s"Did not receive the expected error. Last response 
$consumerGroupHeartbeatResponse.")
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == 
Errors.INVALID_REGULAR_EXPRESSION.code
+    }, msg = s"Did not receive the expected error. Last response 
$consumerGroupHeartbeatResponse.")
 
-      // Verify the response.
-      assertEquals(Errors.INVALID_REGULAR_EXPRESSION.code, 
consumerGroupHeartbeatResponse.data.errorCode)
-    } finally {
-      admin.close()
-    }
+    // Verify the response.
+    assertEquals(Errors.INVALID_REGULAR_EXPRESSION.code, 
consumerGroupHeartbeatResponse.data.errorCode)
   }
 
   @ClusterTest
   def testEmptyConsumerGroupId(): Unit = {
-    val admin = cluster.admin()
-
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
-    try {
-      TestUtils.createOffsetsTopicWithAdmin(
-        admin = admin,
-        brokers = cluster.brokers.values().asScala.toSeq,
-        controllers = cluster.controllers().values().asScala.toSeq
-      )
+    createOffsetsTopic()
 
-      // Heartbeat request to join the group. Note that the member subscribes
-      // to an nonexistent topic.
-      val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-        new ConsumerGroupHeartbeatRequestData()
-          .setGroupId("")
-          .setMemberId(Uuid.randomUuid().toString)
-          .setMemberEpoch(0)
-          .setRebalanceTimeoutMs(5 * 60 * 1000)
-          .setSubscribedTopicNames(List("foo").asJava)
-          .setTopicPartitions(List.empty.asJava),
-        true
-      ).build()
+    // Heartbeat request to join the group. Note that the member subscribes
+    // to an nonexistent topic.
+    val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("")
+        .setMemberId(Uuid.randomUuid().toString)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava),
+      true
+    ).build()
 
-      // Send the request until receiving a successful response. There is a 
delay
-      // here because the group coordinator is loaded in the background.
-      var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
-      TestUtils.waitUntilTrue(() => {
-        consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
-        consumerGroupHeartbeatResponse.data.errorCode == 
Errors.INVALID_REQUEST.code
-      }, msg = s"Did not receive the expected error. Last response 
$consumerGroupHeartbeatResponse.")
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == 
Errors.INVALID_REQUEST.code
+    }, msg = s"Did not receive the expected error. Last response 
$consumerGroupHeartbeatResponse.")
 
-      // Verify the response.
-      assertEquals(Errors.INVALID_REQUEST.code, 
consumerGroupHeartbeatResponse.data.errorCode)
-      assertEquals("GroupId can't be empty.", 
consumerGroupHeartbeatResponse.data.errorMessage)
-    } finally {
-      admin.close()
-    }
+    // Verify the response.
+    assertEquals(Errors.INVALID_REQUEST.code, 
consumerGroupHeartbeatResponse.data.errorCode)
+    assertEquals("GroupId can't be empty.", 
consumerGroupHeartbeatResponse.data.errorMessage)
   }
 
   @ClusterTest
   def testConsumerGroupHeartbeatWithEmptySubscription(): Unit = {
-    val admin = cluster.admin()
-
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
-    try {
-      TestUtils.createOffsetsTopicWithAdmin(
-        admin = admin,
-        brokers = cluster.brokers.values().asScala.toSeq,
-        controllers = cluster.controllers().values().asScala.toSeq
-      )
+    createOffsetsTopic()
 
-      // Heartbeat request to join the group.
-      var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-        new ConsumerGroupHeartbeatRequestData()
-          .setGroupId("grp")
-          .setMemberId(Uuid.randomUuid().toString)
-          .setMemberEpoch(0)
-          .setRebalanceTimeoutMs(5 * 60 * 1000)
-          .setSubscribedTopicRegex("")
-          .setTopicPartitions(List.empty.asJava)
-      ).build()
+    // Heartbeat request to join the group.
+    var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberId(Uuid.randomUuid().toString)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicRegex("")
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
 
-      // Send the request until receiving a successful response. There is a 
delay
-      // here because the group coordinator is loaded in the background.
-      var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
-      TestUtils.waitUntilTrue(() => {
-        consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
-        consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
-      }, msg = s"Did not receive the expected successful response. Last 
response $consumerGroupHeartbeatResponse.")
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+    }, msg = s"Did not receive the expected successful response. Last response 
$consumerGroupHeartbeatResponse.")
 
-      // Heartbeat request to join the group.
-      consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-        new ConsumerGroupHeartbeatRequestData()
-          .setGroupId("grp")
-          .setMemberId(Uuid.randomUuid().toString)
-          .setMemberEpoch(0)
-          .setRebalanceTimeoutMs(5 * 60 * 1000)
-          .setSubscribedTopicNames(List.empty.asJava)
-          .setTopicPartitions(List.empty.asJava)
-      ).build()
+    // Heartbeat request to join the group.
+    consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberId(Uuid.randomUuid().toString)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List.empty.asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
 
-      // Send the request until receiving a successful response. There is a 
delay
-      // here because the group coordinator is loaded in the background.
-      consumerGroupHeartbeatResponse = null
-      TestUtils.waitUntilTrue(() => {
-        consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
-        consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
-      }, msg = s"Did not receive the expected successful response. Last 
response $consumerGroupHeartbeatResponse.")
-    } finally {
-      admin.close()
-    }
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    consumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+    }, msg = s"Did not receive the expected successful response. Last response 
$consumerGroupHeartbeatResponse.")
   }
 
   @ClusterTest
   def 
testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): 
Unit = {
-    val admin = cluster.admin()
-    try {
-      val instanceId = "instanceId"
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
 
-      // Creates the __consumer_offsets topics because it won't be created 
automatically
-      // in this test because it does not use FindCoordinator API.
-      TestUtils.createOffsetsTopicWithAdmin(
-        admin = admin,
-        brokers = cluster.brokers.values().asScala.toSeq,
-        controllers = cluster.controllers().values().asScala.toSeq
-      )
+    val instanceId = "instanceId"
 
-      // Heartbeat request so that a static member joins the group
-      var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-        new ConsumerGroupHeartbeatRequestData()
-          .setGroupId("grp")
-          .setMemberId(Uuid.randomUuid.toString)
-          .setInstanceId(instanceId)
-          .setMemberEpoch(0)
-          .setRebalanceTimeoutMs(5 * 60 * 1000)
-          .setSubscribedTopicNames(List("foo").asJava)
-          .setTopicPartitions(List.empty.asJava)
-      ).build()
+    // Heartbeat request so that a static member joins the group
+    var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberId(Uuid.randomUuid.toString)
+        .setInstanceId(instanceId)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
 
-      // Send the request until receiving a successful response. There is a 
delay
-      // here because the group coordinator is loaded in the background.
-      var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
-      TestUtils.waitUntilTrue(() => {
-        consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
-        consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
-      }, msg = s"Static member could not join the group successfully. Last 
response $consumerGroupHeartbeatResponse.")
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+    }, msg = s"Static member could not join the group successfully. Last 
response $consumerGroupHeartbeatResponse.")
 
-      // Verify the response.
-      assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
-      assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
-      assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), 
consumerGroupHeartbeatResponse.data.assignment)
+    // Verify the response.
+    assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+    assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), 
consumerGroupHeartbeatResponse.data.assignment)
 
-      // Create the topic.
-      val topicId = TestUtils.createTopicWithAdminRaw(
-        admin = admin,
-        topic = "foo",
-        numPartitions = 3
-      )
+    // Create the topic.
+    val topicId = createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
 
-      // Prepare the next heartbeat.
-      consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-        new ConsumerGroupHeartbeatRequestData()
-          .setGroupId("grp")
-          .setInstanceId(instanceId)
-          .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
-          .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
-      ).build()
+    // Prepare the next heartbeat.
+    consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setInstanceId(instanceId)
+        .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+        .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+    ).build()
 
-      // This is the expected assignment.
-      val expectedAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
-        .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
-          .setTopicId(topicId)
-          .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+    // This is the expected assignment.
+    val expectedAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
 
-      // Heartbeats until the partitions are assigned.
-      consumerGroupHeartbeatResponse = null
-      TestUtils.waitUntilTrue(() => {
-        consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
-        consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
-          consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
-      }, msg = s"Static member could not get partitions assigned. Last 
response $consumerGroupHeartbeatResponse.")
+    // Heartbeats until the partitions are assigned.
+    consumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+        consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+    }, msg = s"Static member could not get partitions assigned. Last response 
$consumerGroupHeartbeatResponse.")
 
-      // Verify the response.
-      assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
-      assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
-      assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
+    // Verify the response.
+    assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
 
-      val oldMemberId = consumerGroupHeartbeatResponse.data.memberId
+    val oldMemberId = consumerGroupHeartbeatResponse.data.memberId
 
-      // Leave the group temporarily
-      consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-        new ConsumerGroupHeartbeatRequestData()
-          .setGroupId("grp")
-          .setInstanceId(instanceId)
-          .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
-          .setMemberEpoch(-2)
-      ).build()
+    // Leave the group temporarily
+    consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setInstanceId(instanceId)
+        .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+        .setMemberEpoch(-2)
+    ).build()
 
-      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+    consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
 
-      // Verify the response.
-      assertEquals(-2, consumerGroupHeartbeatResponse.data.memberEpoch)
+    // Verify the response.
+    assertEquals(-2, consumerGroupHeartbeatResponse.data.memberEpoch)
 
-      // Another static member replaces the above member. It gets the same 
assignments back
-      consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-        new ConsumerGroupHeartbeatRequestData()
-          .setGroupId("grp")
-          .setMemberId(Uuid.randomUuid.toString)
-          .setInstanceId(instanceId)
-          .setMemberEpoch(0)
-          .setRebalanceTimeoutMs(5 * 60 * 1000)
-          .setSubscribedTopicNames(List("foo").asJava)
-          .setTopicPartitions(List.empty.asJava)
-      ).build()
+    // Another static member replaces the above member. It gets the same 
assignments back
+    consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberId(Uuid.randomUuid.toString)
+        .setInstanceId(instanceId)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
 
-      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+    consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
 
-      // Verify the response.
-      assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
-      assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
-      assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
-      // The 2 member IDs should be different
-      assertNotEquals(oldMemberId, 
consumerGroupHeartbeatResponse.data.memberId)
-    } finally {
-      admin.close()
-    }
+    // Verify the response.
+    assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
+    // The 2 member IDs should be different
+    assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
   }
 
   @ClusterTest(
@@ -520,109 +463,99 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
     )
   )
   def 
testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled():
 Unit = {
-    val admin = cluster.admin()
-    try {
-      val instanceId = "instanceId"
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
 
-      // Creates the __consumer_offsets topics because it won't be created 
automatically
-      // in this test because it does not use FindCoordinator API.
-      TestUtils.createOffsetsTopicWithAdmin(
-        admin = admin,
-        brokers = cluster.brokers.values().asScala.toSeq,
-        controllers = cluster.controllers().values().asScala.toSeq
-      )
+    val instanceId = "instanceId"
 
-      // Heartbeat request to join the group. Note that the member subscribes
-      // to an nonexistent topic.
-      var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-        new ConsumerGroupHeartbeatRequestData()
-          .setGroupId("grp")
-          .setMemberId(Uuid.randomUuid.toString)
-          .setInstanceId(instanceId)
-          .setMemberEpoch(0)
-          .setRebalanceTimeoutMs(5 * 60 * 1000)
-          .setSubscribedTopicNames(List("foo").asJava)
-          .setTopicPartitions(List.empty.asJava)
-      ).build()
+    // Heartbeat request to join the group. Note that the member subscribes
+    // to an nonexistent topic.
+    var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberId(Uuid.randomUuid.toString)
+        .setInstanceId(instanceId)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
 
-      // Send the request until receiving a successful response. There is a 
delay
-      // here because the group coordinator is loaded in the background.
-      var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
-      TestUtils.waitUntilTrue(() => {
-        consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
-        consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
-      }, msg = s"Could not join the group successfully. Last response 
$consumerGroupHeartbeatResponse.")
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+    }, msg = s"Could not join the group successfully. Last response 
$consumerGroupHeartbeatResponse.")
 
-      // Verify the response.
-      assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
-      assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
-      assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), 
consumerGroupHeartbeatResponse.data.assignment)
+    // Verify the response.
+    assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+    assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), 
consumerGroupHeartbeatResponse.data.assignment)
 
-      // Create the topic.
-      val topicId = TestUtils.createTopicWithAdminRaw(
-        admin = admin,
-        topic = "foo",
-        numPartitions = 3
-      )
+    // Create the topic.
+    val topicId = createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
 
-      // Prepare the next heartbeat.
-      consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-        new ConsumerGroupHeartbeatRequestData()
-          .setGroupId("grp")
-          .setInstanceId(instanceId)
-          .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
-          .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
-      ).build()
+    // Prepare the next heartbeat.
+    consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setInstanceId(instanceId)
+        .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+        .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+    ).build()
 
-      // This is the expected assignment.
-      val expectedAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
-        .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
-          .setTopicId(topicId)
-          .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+    // This is the expected assignment.
+    val expectedAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
 
-      // Heartbeats until the partitions are assigned.
-      consumerGroupHeartbeatResponse = null
-      TestUtils.waitUntilTrue(() => {
-        consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
-        consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
-          consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
-      }, msg = s"Could not get partitions assigned. Last response 
$consumerGroupHeartbeatResponse.")
+    // Heartbeats until the partitions are assigned.
+    consumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+        consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+    }, msg = s"Could not get partitions assigned. Last response 
$consumerGroupHeartbeatResponse.")
 
-      // Verify the response.
-      assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
-      assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
+    // Verify the response.
+    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
 
-      // A new static member tries to join the group with an inuse instanceid.
-      consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-        new ConsumerGroupHeartbeatRequestData()
-          .setGroupId("grp")
-          .setMemberId(Uuid.randomUuid.toString)
-          .setInstanceId(instanceId)
-          .setMemberEpoch(0)
-          .setRebalanceTimeoutMs(5 * 60 * 1000)
-          .setSubscribedTopicNames(List("foo").asJava)
-          .setTopicPartitions(List.empty.asJava)
-      ).build()
+    // A new static member tries to join the group with an inuse instanceid.
+    consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberId(Uuid.randomUuid.toString)
+        .setInstanceId(instanceId)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
 
-      // Validating that trying to join with an in-use instanceId would throw 
an UnreleasedInstanceIdException.
-      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
-      assertEquals(Errors.UNRELEASED_INSTANCE_ID.code, 
consumerGroupHeartbeatResponse.data.errorCode)
+    // Validating that trying to join with an in-use instanceId would throw an 
UnreleasedInstanceIdException.
+    consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+    assertEquals(Errors.UNRELEASED_INSTANCE_ID.code, 
consumerGroupHeartbeatResponse.data.errorCode)
 
-      // The new static member join group will keep failing with an 
UnreleasedInstanceIdException
-      // until eventually it gets through because the existing member will be 
kicked out
-      // because of not sending a heartbeat till session timeout expiry.
-      TestUtils.waitUntilTrue(() => {
-        consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
-        consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
-          consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
-      }, msg = s"Could not re-join the group successfully. Last response 
$consumerGroupHeartbeatResponse.")
+    // The new static member join group will keep failing with an 
UnreleasedInstanceIdException
+    // until eventually it gets through because the existing member will be 
kicked out
+    // because of not sending a heartbeat till session timeout expiry.
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+        consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+    }, msg = s"Could not re-join the group successfully. Last response 
$consumerGroupHeartbeatResponse.")
 
-      // Verify the response. The group epoch bumps upto 4 which eventually 
reflects in the new member epoch.
-      assertEquals(4, consumerGroupHeartbeatResponse.data.memberEpoch)
-      assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
-    } finally {
-      admin.close()
-    }
+    // Verify the response. The group epoch bumps upto 4 which eventually 
reflects in the new member epoch.
+    assertEquals(4, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
   }
 
   @ClusterTest(
@@ -632,19 +565,15 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
   )
   def testUpdateConsumerGroupHeartbeatConfigSuccessful(): Unit = {
     val admin = cluster.admin()
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
     try {
       val newHeartbeatIntervalMs = 10000
       val instanceId = "instanceId"
       val consumerGroupId = "grp"
 
-      // Creates the __consumer_offsets topics because it won't be created 
automatically
-      // in this test because it does not use FindCoordinator API.
-      TestUtils.createOffsetsTopicWithAdmin(
-        admin = admin,
-        brokers = cluster.brokers.values().asScala.toSeq,
-        controllers = cluster.controllers().values().asScala.toSeq
-      )
-
       // Heartbeat request to join the group. Note that the member subscribes
       // to an nonexistent topic.
       var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
@@ -701,47 +630,31 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
 
   @ClusterTest
   def testConsumerGroupHeartbeatFailureIfMemberIdMissingForVersionsAbove0(): 
Unit = {
-    val admin = cluster.admin()
-
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
-    try {
-      TestUtils.createOffsetsTopicWithAdmin(
-        admin = admin,
-        brokers = cluster.brokers.values().asScala.toSeq,
-        controllers = cluster.controllers().values().asScala.toSeq
-      )
+    createOffsetsTopic()
 
-      val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-        new ConsumerGroupHeartbeatRequestData()
-          .setGroupId("grp")
-          .setMemberEpoch(0)
-          .setRebalanceTimeoutMs(5 * 60 * 1000)
-          .setSubscribedTopicNames(List("foo").asJava)
-          .setTopicPartitions(List.empty.asJava)
-      ).build()
+    val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
 
-      var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
-      TestUtils.waitUntilTrue(() => {
-        consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
-        consumerGroupHeartbeatResponse.data.errorCode == 
Errors.INVALID_REQUEST.code
-      }, msg = "Should fail due to invalid member id.")
-    } finally {
-      admin.close()
-    }
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == 
Errors.INVALID_REQUEST.code
+    }, msg = "Should fail due to invalid member id.")
   }
 
   @ClusterTest
   def testMemberIdGeneratedOnServerWhenApiVersionIs0(): Unit = {
-    val admin = cluster.admin()
-
     // Creates the __consumer_offsets topics because it won't be created 
automatically
     // in this test because it does not use FindCoordinator API.
-    TestUtils.createOffsetsTopicWithAdmin(
-      admin = admin,
-      brokers = cluster.brokers.values().asScala.toSeq,
-      controllers = cluster.controllers().values().asScala.toSeq
-    )
+    createOffsetsTopic()
 
     val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
       new ConsumerGroupHeartbeatRequestData()
@@ -761,6 +674,5 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
     val memberId = consumerGroupHeartbeatResponse.data().memberId()
     assertNotNull(memberId)
     assertFalse(memberId.isEmpty)
-    admin.close()
   }
 }

Reply via email to