This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f2e1a4ca8bc KAFKA-17548: Improve group listing integration test
(#18081)
f2e1a4ca8bc is described below
commit f2e1a4ca8bc0baec5c130f88865c0919ac7acfbd
Author: Andrew Schofield <[email protected]>
AuthorDate: Sat Dec 7 18:42:36 2024 +0000
KAFKA-17548: Improve group listing integration test (#18081)
Reviewers: Apoorv Mittal <[email protected]>, Manikumar Reddy
<[email protected]>
---
.../kafka/api/PlaintextAdminIntegrationTest.scala | 44 ++++++++++++++++++----
1 file changed, 37 insertions(+), 7 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index c53dfef850e..64d9cc94c2d 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2147,7 +2147,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
TestUtils.waitUntilTrue(() => {
val matching =
client.listConsumerGroups.all.get.asScala.filter(group =>
group.groupId == testGroupId &&
- group.state.get == ConsumerGroupState.STABLE)
+ group.state.get == ConsumerGroupState.STABLE &&
+ group.groupState.get == GroupState.STABLE)
matching.size == 1
}, s"Expected to be able to list $testGroupId")
@@ -2155,7 +2156,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val options = new
ListConsumerGroupsOptions().withTypes(Set(groupType).asJava)
val matching =
client.listConsumerGroups(options).all.get.asScala.filter(group =>
group.groupId == testGroupId &&
- group.state.get == ConsumerGroupState.STABLE)
+ group.state.get == ConsumerGroupState.STABLE &&
+ group.groupState.get == GroupState.STABLE)
matching.size == 1
}, s"Expected to be able to list $testGroupId in group type
$groupType")
@@ -2164,7 +2166,18 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
.inStates(Set(ConsumerGroupState.STABLE).asJava)
val matching =
client.listConsumerGroups(options).all.get.asScala.filter(group =>
group.groupId == testGroupId &&
- group.state.get == ConsumerGroupState.STABLE)
+ group.state.get == ConsumerGroupState.STABLE &&
+ group.groupState.get == GroupState.STABLE)
+ matching.size == 1
+ }, s"Expected to be able to list $testGroupId in group type
$groupType and state Stable")
+
+ TestUtils.waitUntilTrue(() => {
+ val options = new
ListConsumerGroupsOptions().withTypes(Set(groupType).asJava)
+ .inGroupStates(Set(GroupState.STABLE).asJava)
+ val matching =
client.listConsumerGroups(options).all.get.asScala.filter(group =>
+ group.groupId == testGroupId &&
+ group.state.get == ConsumerGroupState.STABLE &&
+ group.groupState.get == GroupState.STABLE)
matching.size == 1
}, s"Expected to be able to list $testGroupId in group type
$groupType and state Stable")
@@ -2172,7 +2185,17 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val options = new
ListConsumerGroupsOptions().inStates(Set(ConsumerGroupState.STABLE).asJava)
val matching =
client.listConsumerGroups(options).all.get.asScala.filter(group =>
group.groupId == testGroupId &&
- group.state.get == ConsumerGroupState.STABLE)
+ group.state.get == ConsumerGroupState.STABLE &&
+ group.groupState.get == GroupState.STABLE)
+ matching.size == 1
+ }, s"Expected to be able to list $testGroupId in state Stable")
+
+ TestUtils.waitUntilTrue(() => {
+ val options = new
ListConsumerGroupsOptions().inGroupStates(Set(GroupState.STABLE).asJava)
+ val matching =
client.listConsumerGroups(options).all.get.asScala.filter(group =>
+ group.groupId == testGroupId &&
+ group.state.get == ConsumerGroupState.STABLE &&
+ group.groupState.get == GroupState.STABLE)
matching.size == 1
}, s"Expected to be able to list $testGroupId in state Stable")
@@ -2183,6 +2206,13 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
matching.isEmpty
}, s"Expected to find zero groups")
+ TestUtils.waitUntilTrue(() => {
+ val options = new
ListConsumerGroupsOptions().inGroupStates(Set(GroupState.EMPTY).asJava)
+ val matching =
client.listConsumerGroups(options).all.get.asScala.filter(
+ _.groupId == testGroupId)
+ matching.isEmpty
+ }, s"Expected to find zero groups")
+
val describeWithFakeGroupResult =
client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava,
new
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
@@ -3440,8 +3470,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
@ValueSource(strings = Array("kraft"))
def testLongTopicNames(quorum: String): Unit = {
val client = createAdminClient
- val longTopicName = String.join("", Collections.nCopies(249, "x"));
- val invalidTopicName = String.join("", Collections.nCopies(250, "x"));
+ val longTopicName = String.join("", Collections.nCopies(249, "x"))
+ val invalidTopicName = String.join("", Collections.nCopies(250, "x"))
val newTopics2 = Seq(new NewTopic(invalidTopicName, 3, 3.toShort),
new NewTopic(longTopicName, 3, 3.toShort))
val results = client.createTopics(newTopics2.asJava).values()
@@ -3522,7 +3552,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
def testIncrementalAlterConfigsForLog4jLogLevels(quorum: String): Unit = {
client = createAdminClient
- val ancestorLogger = "kafka";
+ val ancestorLogger = "kafka"
val initialLoggerConfig = describeBrokerLoggers()
val initialAncestorLogLevel = initialLoggerConfig.get("kafka").value()
val initialControllerServerLogLevel =
initialLoggerConfig.get("kafka.server.ControllerServer").value()