This is an automated email from the ASF dual-hosted git repository.
dengziming pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0912ca27e2a KRaft support for DescribeClusterRequestTest and
DeleteConsumerGroupsTest (#14294)
0912ca27e2a is described below
commit 0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb
Author: Alyssa Huang <[email protected]>
AuthorDate: Mon Aug 28 23:47:22 2023 -0700
KRaft support for DescribeClusterRequestTest and DeleteConsumerGroupsTest
(#14294)
Reviewers: dengziming <[email protected]>, mannoopj
<[email protected]>
---
.../kafka/admin/DeleteConsumerGroupsTest.scala | 80 +++++++++++++---------
.../kafka/server/DescribeClusterRequestTest.scala | 35 +++++++---
2 files changed, 70 insertions(+), 45 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
index 7f9d97b4a83..a5bdda2d025 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -17,24 +17,27 @@
package kafka.admin
import joptsimple.OptionException
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.common.errors.{GroupIdNotFoundException,
GroupNotEmptyException}
import org.apache.kafka.common.protocol.Errors
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
- @Test
- def testDeleteWithTopicOption(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteWithTopicOption(quorum: String): Unit = {
+ createOffsetsTopic()
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--delete",
"--group", group, "--topic")
assertThrows(classOf[OptionException], () =>
getConsumerGroupService(cgcArgs))
}
- @Test
- def testDeleteCmdNonExistingGroup(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteCmdNonExistingGroup(quorum: String): Unit = {
+ createOffsetsTopic()
val missingGroup = "missing.group"
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--delete",
"--group", missingGroup)
@@ -45,9 +48,10 @@ class DeleteConsumerGroupsTest extends
ConsumerGroupCommandTest {
s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected
while deleting consumer group")
}
- @Test
- def testDeleteNonExistingGroup(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteNonExistingGroup(quorum: String): Unit = {
+ createOffsetsTopic()
val missingGroup = "missing.group"
// note the group to be deleted is a different (non-existing) group
@@ -59,9 +63,10 @@ class DeleteConsumerGroupsTest extends
ConsumerGroupCommandTest {
s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected
while deleting consumer group")
}
- @Test
- def testDeleteCmdNonEmptyGroup(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteCmdNonEmptyGroup(quorum: String): Unit = {
+ createOffsetsTopic()
// run one consumer in the group
addConsumerGroupExecutor(numConsumers = 1)
@@ -77,9 +82,10 @@ class DeleteConsumerGroupsTest extends
ConsumerGroupCommandTest {
s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while
deleting consumer group. Output was: (${output})")
}
- @Test
- def testDeleteNonEmptyGroup(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteNonEmptyGroup(quorum: String): Unit = {
+ createOffsetsTopic()
// run one consumer in the group
addConsumerGroupExecutor(numConsumers = 1)
@@ -97,9 +103,10 @@ class DeleteConsumerGroupsTest extends
ConsumerGroupCommandTest {
s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while
deleting consumer group. Result was:(${result})")
}
- @Test
- def testDeleteCmdEmptyGroup(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteCmdEmptyGroup(quorum: String): Unit = {
+ createOffsetsTopic()
// run one consumer in the group
val executor = addConsumerGroupExecutor(numConsumers = 1)
@@ -121,9 +128,10 @@ class DeleteConsumerGroupsTest extends
ConsumerGroupCommandTest {
s"The consumer group could not be deleted as expected")
}
- @Test
- def testDeleteCmdAllGroups(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteCmdAllGroups(quorum: String): Unit = {
+ createOffsetsTopic()
// Create 3 groups with 1 consumer per each
val groups =
@@ -158,9 +166,10 @@ class DeleteConsumerGroupsTest extends
ConsumerGroupCommandTest {
)
}
- @Test
- def testDeleteEmptyGroup(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteEmptyGroup(quorum: String): Unit = {
+ createOffsetsTopic()
// run one consumer in the group
val executor = addConsumerGroupExecutor(numConsumers = 1)
@@ -182,9 +191,10 @@ class DeleteConsumerGroupsTest extends
ConsumerGroupCommandTest {
s"The consumer group could not be deleted as expected")
}
- @Test
- def testDeleteCmdWithMixOfSuccessAndError(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteCmdWithMixOfSuccessAndError(quorum: String): Unit = {
+ createOffsetsTopic()
val missingGroup = "missing.group"
// run one consumer in the group
@@ -208,9 +218,10 @@ class DeleteConsumerGroupsTest extends
ConsumerGroupCommandTest {
output.contains(s"These consumer groups were deleted successfully:
'$group'"), s"The consumer group deletion did not work as expected")
}
- @Test
- def testDeleteWithMixOfSuccessAndError(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteWithMixOfSuccessAndError(quorum: String): Unit = {
+ createOffsetsTopic()
val missingGroup = "missing.group"
// run one consumer in the group
@@ -238,8 +249,9 @@ class DeleteConsumerGroupsTest extends
ConsumerGroupCommandTest {
}
- @Test
- def testDeleteWithUnrecognizedNewConsumerOption(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteWithUnrecognizedNewConsumerOption(quorum: String): Unit = {
val cgcArgs = Array("--new-consumer", "--bootstrap-server",
bootstrapServers(), "--delete", "--group", group)
assertThrows(classOf[OptionException], () =>
getConsumerGroupService(cgcArgs))
}
diff --git
a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
index 222ff2d9e2c..7c260dae853 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
@@ -19,16 +19,18 @@ package kafka.server
import java.lang.{Byte => JByte}
import java.util.Properties
-
import kafka.network.SocketServer
import kafka.security.authorizer.AclEntry
+import kafka.utils.TestInfoUtils
import org.apache.kafka.common.message.{DescribeClusterRequestData,
DescribeClusterResponseData}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{DescribeClusterRequest,
DescribeClusterResponse}
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.utils.Utils
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
@@ -45,26 +47,33 @@ class DescribeClusterRequestTest extends BaseRequestTest {
doSetup(testInfo, createOffsetsTopic = false)
}
- @Test
- def testDescribeClusterRequestIncludingClusterAuthorizedOperations(): Unit =
{
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeClusterRequestIncludingClusterAuthorizedOperations(quorum:
String): Unit = {
testDescribeClusterRequest(true)
}
- @Test
- def testDescribeClusterRequestExcludingClusterAuthorizedOperations(): Unit =
{
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeClusterRequestExcludingClusterAuthorizedOperations(quorum:
String): Unit = {
testDescribeClusterRequest(false)
}
def testDescribeClusterRequest(includeClusterAuthorizedOperations: Boolean):
Unit = {
- val expectedBrokers = servers.map { server =>
+ val expectedBrokers = brokers.map { server =>
new DescribeClusterResponseData.DescribeClusterBroker()
.setBrokerId(server.config.brokerId)
.setHost("localhost")
.setPort(server.socketServer.boundPort(listenerName))
.setRack(server.config.rack.orNull)
}.toSet
- val expectedControllerId =
servers.filter(_.kafkaController.isActive).last.config.brokerId
- val expectedClusterId = servers.last.clusterId
+
+ var expectedControllerId = 0
+ if (!isKRaftTest()) {
+ // in KRaft mode DescribeClusterRequest will return a random broker id
as the controllerId (KIP-590)
+ expectedControllerId =
servers.filter(_.kafkaController.isActive).last.config.brokerId
+ }
+ val expectedClusterId = brokers.last.clusterId
val expectedClusterAuthorizedOperations = if
(includeClusterAuthorizedOperations) {
Utils.to32BitField(
@@ -80,7 +89,11 @@ class DescribeClusterRequestTest extends BaseRequestTest {
.build(version.toShort)
val describeClusterResponse =
sentDescribeClusterRequest(describeClusterRequest)
- assertEquals(expectedControllerId,
describeClusterResponse.data.controllerId)
+ if (isKRaftTest()) {
+ assertTrue(0 to brokerCount contains
describeClusterResponse.data.controllerId)
+ } else {
+ assertEquals(expectedControllerId,
describeClusterResponse.data.controllerId)
+ }
assertEquals(expectedClusterId, describeClusterResponse.data.clusterId)
assertEquals(expectedClusterAuthorizedOperations,
describeClusterResponse.data.clusterAuthorizedOperations)
assertEquals(expectedBrokers,
describeClusterResponse.data.brokers.asScala.toSet)