This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f2e1a4ca8bc KAFKA-17548: Improve group listing integration test (#18081)
f2e1a4ca8bc is described below

commit f2e1a4ca8bc0baec5c130f88865c0919ac7acfbd
Author: Andrew Schofield <[email protected]>
AuthorDate: Sat Dec 7 18:42:36 2024 +0000

    KAFKA-17548: Improve group listing integration test (#18081)
    
    Reviewers: Apoorv Mittal <[email protected]>, Manikumar Reddy <[email protected]>
---
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 44 ++++++++++++++++++----
 1 file changed, 37 insertions(+), 7 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index c53dfef850e..64d9cc94c2d 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2147,7 +2147,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
           TestUtils.waitUntilTrue(() => {
             val matching = 
client.listConsumerGroups.all.get.asScala.filter(group =>
               group.groupId == testGroupId &&
-                group.state.get == ConsumerGroupState.STABLE)
+                group.state.get == ConsumerGroupState.STABLE &&
+                group.groupState.get == GroupState.STABLE)
             matching.size == 1
           }, s"Expected to be able to list $testGroupId")
 
@@ -2155,7 +2156,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
             val options = new 
ListConsumerGroupsOptions().withTypes(Set(groupType).asJava)
             val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
               group.groupId == testGroupId &&
-                group.state.get == ConsumerGroupState.STABLE)
+                group.state.get == ConsumerGroupState.STABLE &&
+                group.groupState.get == GroupState.STABLE)
             matching.size == 1
           }, s"Expected to be able to list $testGroupId in group type 
$groupType")
 
@@ -2164,7 +2166,18 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
               .inStates(Set(ConsumerGroupState.STABLE).asJava)
             val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
               group.groupId == testGroupId &&
-                group.state.get == ConsumerGroupState.STABLE)
+                group.state.get == ConsumerGroupState.STABLE &&
+                group.groupState.get == GroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId in group type 
$groupType and state Stable")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListConsumerGroupsOptions().withTypes(Set(groupType).asJava)
+              .inGroupStates(Set(GroupState.STABLE).asJava)
+            val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
+              group.groupId == testGroupId &&
+                group.state.get == ConsumerGroupState.STABLE &&
+                group.groupState.get == GroupState.STABLE)
             matching.size == 1
           }, s"Expected to be able to list $testGroupId in group type 
$groupType and state Stable")
 
@@ -2172,7 +2185,17 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
             val options = new 
ListConsumerGroupsOptions().inStates(Set(ConsumerGroupState.STABLE).asJava)
             val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
               group.groupId == testGroupId &&
-                group.state.get == ConsumerGroupState.STABLE)
+                group.state.get == ConsumerGroupState.STABLE &&
+                group.groupState.get == GroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId in state Stable")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListConsumerGroupsOptions().inGroupStates(Set(GroupState.STABLE).asJava)
+            val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
+              group.groupId == testGroupId &&
+                group.state.get == ConsumerGroupState.STABLE &&
+                group.groupState.get == GroupState.STABLE)
             matching.size == 1
           }, s"Expected to be able to list $testGroupId in state Stable")
 
@@ -2183,6 +2206,13 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
             matching.isEmpty
           }, s"Expected to find zero groups")
 
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListConsumerGroupsOptions().inGroupStates(Set(GroupState.EMPTY).asJava)
+            val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(
+              _.groupId == testGroupId)
+            matching.isEmpty
+          }, s"Expected to find zero groups")
+
           val describeWithFakeGroupResult = 
client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava,
             new 
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
           assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
@@ -3440,8 +3470,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   @ValueSource(strings = Array("kraft"))
   def testLongTopicNames(quorum: String): Unit = {
     val client = createAdminClient
-    val longTopicName = String.join("", Collections.nCopies(249, "x"));
-    val invalidTopicName = String.join("", Collections.nCopies(250, "x"));
+    val longTopicName = String.join("", Collections.nCopies(249, "x"))
+    val invalidTopicName = String.join("", Collections.nCopies(250, "x"))
     val newTopics2 = Seq(new NewTopic(invalidTopicName, 3, 3.toShort),
                          new NewTopic(longTopicName, 3, 3.toShort))
     val results = client.createTopics(newTopics2.asJava).values()
@@ -3522,7 +3552,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   def testIncrementalAlterConfigsForLog4jLogLevels(quorum: String): Unit = {
     client = createAdminClient
 
-    val ancestorLogger = "kafka";
+    val ancestorLogger = "kafka"
     val initialLoggerConfig = describeBrokerLoggers()
     val initialAncestorLogLevel = initialLoggerConfig.get("kafka").value()
     val initialControllerServerLogLevel = 
initialLoggerConfig.get("kafka.server.ControllerServer").value()

Reply via email to