This is an automated email from the ASF dual-hosted git repository.

dengziming pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0912ca27e2a KRaft support for DescribeClusterRequestTest and 
DeleteConsumerGroupsTest (#14294)
0912ca27e2a is described below

commit 0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb
Author: Alyssa Huang <[email protected]>
AuthorDate: Mon Aug 28 23:47:22 2023 -0700

    KRaft support for DescribeClusterRequestTest and DeleteConsumerGroupsTest 
(#14294)
    
    Reviewers: dengziming <[email protected]>, mannoopj 
<[email protected]>
---
 .../kafka/admin/DeleteConsumerGroupsTest.scala     | 80 +++++++++++++---------
 .../kafka/server/DescribeClusterRequestTest.scala  | 35 +++++++---
 2 files changed, 70 insertions(+), 45 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala 
b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
index 7f9d97b4a83..a5bdda2d025 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -17,24 +17,27 @@
 package kafka.admin
 
 import joptsimple.OptionException
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.common.errors.{GroupIdNotFoundException, 
GroupNotEmptyException}
 import org.apache.kafka.common.protocol.Errors
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
-  @Test
-  def testDeleteWithTopicOption(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteWithTopicOption(quorum: String): Unit = {
+    createOffsetsTopic()
     val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--delete", 
"--group", group, "--topic")
     assertThrows(classOf[OptionException], () => 
getConsumerGroupService(cgcArgs))
   }
 
-  @Test
-  def testDeleteCmdNonExistingGroup(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteCmdNonExistingGroup(quorum: String): Unit = {
+    createOffsetsTopic()
     val missingGroup = "missing.group"
 
     val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--delete", 
"--group", missingGroup)
@@ -45,9 +48,10 @@ class DeleteConsumerGroupsTest extends 
ConsumerGroupCommandTest {
       s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected 
while deleting consumer group")
   }
 
-  @Test
-  def testDeleteNonExistingGroup(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteNonExistingGroup(quorum: String): Unit = {
+    createOffsetsTopic()
     val missingGroup = "missing.group"
 
     // note the group to be deleted is a different (non-existing) group
@@ -59,9 +63,10 @@ class DeleteConsumerGroupsTest extends 
ConsumerGroupCommandTest {
       s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected 
while deleting consumer group")
   }
 
-  @Test
-  def testDeleteCmdNonEmptyGroup(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteCmdNonEmptyGroup(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // run one consumer in the group
     addConsumerGroupExecutor(numConsumers = 1)
@@ -77,9 +82,10 @@ class DeleteConsumerGroupsTest extends 
ConsumerGroupCommandTest {
       s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while 
deleting consumer group. Output was: (${output})")
   }
 
-  @Test
-  def testDeleteNonEmptyGroup(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteNonEmptyGroup(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // run one consumer in the group
     addConsumerGroupExecutor(numConsumers = 1)
@@ -97,9 +103,10 @@ class DeleteConsumerGroupsTest extends 
ConsumerGroupCommandTest {
       s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while 
deleting consumer group. Result was:(${result})")
   }
 
-  @Test
-  def testDeleteCmdEmptyGroup(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteCmdEmptyGroup(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // run one consumer in the group
     val executor = addConsumerGroupExecutor(numConsumers = 1)
@@ -121,9 +128,10 @@ class DeleteConsumerGroupsTest extends 
ConsumerGroupCommandTest {
       s"The consumer group could not be deleted as expected")
   }
 
-  @Test
-  def testDeleteCmdAllGroups(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteCmdAllGroups(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // Create 3 groups with 1 consumer per each
     val groups =
@@ -158,9 +166,10 @@ class DeleteConsumerGroupsTest extends 
ConsumerGroupCommandTest {
     )
   }
 
-  @Test
-  def testDeleteEmptyGroup(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteEmptyGroup(quorum: String): Unit = {
+    createOffsetsTopic()
 
     // run one consumer in the group
     val executor = addConsumerGroupExecutor(numConsumers = 1)
@@ -182,9 +191,10 @@ class DeleteConsumerGroupsTest extends 
ConsumerGroupCommandTest {
       s"The consumer group could not be deleted as expected")
   }
 
-  @Test
-  def testDeleteCmdWithMixOfSuccessAndError(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteCmdWithMixOfSuccessAndError(quorum: String): Unit = {
+    createOffsetsTopic()
     val missingGroup = "missing.group"
 
     // run one consumer in the group
@@ -208,9 +218,10 @@ class DeleteConsumerGroupsTest extends 
ConsumerGroupCommandTest {
         output.contains(s"These consumer groups were deleted successfully: 
'$group'"), s"The consumer group deletion did not work as expected")
   }
 
-  @Test
-  def testDeleteWithMixOfSuccessAndError(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteWithMixOfSuccessAndError(quorum: String): Unit = {
+    createOffsetsTopic()
     val missingGroup = "missing.group"
 
     // run one consumer in the group
@@ -238,8 +249,9 @@ class DeleteConsumerGroupsTest extends 
ConsumerGroupCommandTest {
   }
 
 
-  @Test
-  def testDeleteWithUnrecognizedNewConsumerOption(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteWithUnrecognizedNewConsumerOption(quorum: String): Unit = {
     val cgcArgs = Array("--new-consumer", "--bootstrap-server", 
bootstrapServers(), "--delete", "--group", group)
     assertThrows(classOf[OptionException], () => 
getConsumerGroupService(cgcArgs))
   }
diff --git 
a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
index 222ff2d9e2c..7c260dae853 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
@@ -19,16 +19,18 @@ package kafka.server
 
 import java.lang.{Byte => JByte}
 import java.util.Properties
-
 import kafka.network.SocketServer
 import kafka.security.authorizer.AclEntry
+import kafka.utils.TestInfoUtils
 import org.apache.kafka.common.message.{DescribeClusterRequestData, 
DescribeClusterResponseData}
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{DescribeClusterRequest, 
DescribeClusterResponse}
 import org.apache.kafka.common.resource.ResourceType
 import org.apache.kafka.common.utils.Utils
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
 
@@ -45,26 +47,33 @@ class DescribeClusterRequestTest extends BaseRequestTest {
     doSetup(testInfo, createOffsetsTopic = false)
   }
 
-  @Test
-  def testDescribeClusterRequestIncludingClusterAuthorizedOperations(): Unit = 
{
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeClusterRequestIncludingClusterAuthorizedOperations(quorum: 
String): Unit = {
     testDescribeClusterRequest(true)
   }
 
-  @Test
-  def testDescribeClusterRequestExcludingClusterAuthorizedOperations(): Unit = 
{
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeClusterRequestExcludingClusterAuthorizedOperations(quorum: 
String): Unit = {
     testDescribeClusterRequest(false)
   }
 
   def testDescribeClusterRequest(includeClusterAuthorizedOperations: Boolean): 
Unit = {
-    val expectedBrokers = servers.map { server =>
+    val expectedBrokers = brokers.map { server =>
       new DescribeClusterResponseData.DescribeClusterBroker()
         .setBrokerId(server.config.brokerId)
         .setHost("localhost")
         .setPort(server.socketServer.boundPort(listenerName))
         .setRack(server.config.rack.orNull)
     }.toSet
-    val expectedControllerId = 
servers.filter(_.kafkaController.isActive).last.config.brokerId
-    val expectedClusterId = servers.last.clusterId
+
+    var expectedControllerId = 0
+    if (!isKRaftTest()) {
+      // in KRaft mode DescribeClusterRequest will return a random broker id 
as the controllerId (KIP-590)
+      expectedControllerId = 
servers.filter(_.kafkaController.isActive).last.config.brokerId
+    }
+    val expectedClusterId = brokers.last.clusterId
 
     val expectedClusterAuthorizedOperations = if 
(includeClusterAuthorizedOperations) {
       Utils.to32BitField(
@@ -80,7 +89,11 @@ class DescribeClusterRequestTest extends BaseRequestTest {
         .build(version.toShort)
       val describeClusterResponse = 
sentDescribeClusterRequest(describeClusterRequest)
 
-      assertEquals(expectedControllerId, 
describeClusterResponse.data.controllerId)
+      if (isKRaftTest()) {
+        assertTrue(0 to brokerCount contains 
describeClusterResponse.data.controllerId)
+      } else {
+        assertEquals(expectedControllerId, 
describeClusterResponse.data.controllerId)
+      }
       assertEquals(expectedClusterId, describeClusterResponse.data.clusterId)
       assertEquals(expectedClusterAuthorizedOperations, 
describeClusterResponse.data.clusterAuthorizedOperations)
       assertEquals(expectedBrokers, 
describeClusterResponse.data.brokers.asScala.toSet)

Reply via email to