This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 078d34f39d7 KAFKA-17910 Create integration tests for Admin.listGroups 
and Admin.describeClassicGroups (#17712)
078d34f39d7 is described below

commit 078d34f39d73bfd4670254565f41947fdad57544
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Nov 18 16:35:48 2024 +0800

    KAFKA-17910 Create integration tests for Admin.listGroups and 
Admin.describeClassicGroups (#17712)
    
    Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 129 +++++++++++++++++
 .../org/apache/kafka/tools/GroupsCommandTest.java  | 158 +++++++++++++++++++++
 2 files changed, 287 insertions(+)

diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 7e1103e2b42..1eba4dba0db 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2087,6 +2087,135 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip932"))
+  def testListGroups(quorum: String): Unit = {
+    val classicGroupId = "classic_group_id"
+    val consumerGroupId = "consumer_group_id"
+    val shareGroupId = "share_group_id"
+    val simpleGroupId = "simple_group_id"
+    val testTopicName = "test_topic"
+
+    consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name)
+    val classicGroupConfig = new Properties(consumerConfig)
+    classicGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, classicGroupId)
+    val classicGroup = createConsumer(configOverrides = classicGroupConfig)
+
+    consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name)
+    val consumerGroupConfig = new Properties(consumerConfig)
+    consumerGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId)
+    val consumerGroup = createConsumer(configOverrides = consumerGroupConfig)
+
+    val shareGroupConfig = new Properties(consumerConfig)
+    shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId)
+    val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)
+
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      client.createTopics(Collections.singleton(
+        new NewTopic(testTopicName, 1, 1.toShort)
+      )).all().get()
+      waitForTopics(client, List(testTopicName), List())
+      val topicPartition = new TopicPartition(testTopicName, 0)
+
+      classicGroup.subscribe(Collections.singleton(testTopicName))
+      classicGroup.poll(JDuration.ofMillis(1000))
+      consumerGroup.subscribe(Collections.singleton(testTopicName))
+      consumerGroup.poll(JDuration.ofMillis(1000))
+      shareGroup.subscribe(Collections.singleton(testTopicName))
+      shareGroup.poll(JDuration.ofMillis(1000))
+
+      val alterConsumerGroupOffsetsResult = 
client.alterConsumerGroupOffsets(simpleGroupId,
+        Collections.singletonMap(topicPartition, new OffsetAndMetadata(0L)))
+      assertNull(alterConsumerGroupOffsetsResult.all().get())
+      
assertNull(alterConsumerGroupOffsetsResult.partitionResult(topicPartition).get())
+
+      TestUtils.waitUntilTrue(() => {
+        val groups = client.listGroups().all().get()
+        groups.size() == 4
+      }, "Expected to find all groups")
+
+      val classicGroupListing = new GroupListing(classicGroupId, 
Optional.of(GroupType.CLASSIC), "consumer")
+      val consumerGroupListing = new GroupListing(consumerGroupId, 
Optional.of(GroupType.CONSUMER), "consumer")
+      val shareGroupListing = new GroupListing(shareGroupId, 
Optional.of(GroupType.SHARE), "share")
+      val simpleGroupListing = new GroupListing(simpleGroupId, 
Optional.of(GroupType.CLASSIC), "")
+
+      var listGroupsResult = client.listGroups()
+      assertTrue(listGroupsResult.errors().get().isEmpty)
+      assertEquals(Set(classicGroupListing, simpleGroupListing, 
consumerGroupListing, shareGroupListing), 
listGroupsResult.all().get().asScala.toSet)
+      assertEquals(Set(classicGroupListing, simpleGroupListing, 
consumerGroupListing, shareGroupListing), 
listGroupsResult.valid().get().asScala.toSet)
+
+      listGroupsResult = client.listGroups(new 
ListGroupsOptions().withTypes(java.util.Set.of(GroupType.CLASSIC)))
+      assertTrue(listGroupsResult.errors().get().isEmpty)
+      assertEquals(Set(classicGroupListing, simpleGroupListing), 
listGroupsResult.all().get().asScala.toSet)
+      assertEquals(Set(classicGroupListing, simpleGroupListing), 
listGroupsResult.valid().get().asScala.toSet)
+
+      listGroupsResult = client.listGroups(new 
ListGroupsOptions().withTypes(java.util.Set.of(GroupType.CONSUMER)))
+      assertTrue(listGroupsResult.errors().get().isEmpty)
+      assertEquals(Set(consumerGroupListing), 
listGroupsResult.all().get().asScala.toSet)
+      assertEquals(Set(consumerGroupListing), 
listGroupsResult.valid().get().asScala.toSet)
+
+      listGroupsResult = client.listGroups(new 
ListGroupsOptions().withTypes(java.util.Set.of(GroupType.SHARE)))
+      assertTrue(listGroupsResult.errors().get().isEmpty)
+      assertEquals(Set(shareGroupListing), 
listGroupsResult.all().get().asScala.toSet)
+      assertEquals(Set(shareGroupListing), 
listGroupsResult.valid().get().asScala.toSet)
+    } finally {
+      Utils.closeQuietly(classicGroup, "classicGroup")
+      Utils.closeQuietly(consumerGroup, "consumerGroup")
+      Utils.closeQuietly(shareGroup, "shareGroup")
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  def testDescribeClassicGroups(quorum: String, groupProtocol: String): Unit = 
{
+    val classicGroupId = "classic_group_id"
+    val simpleGroupId = "simple_group_id"
+    val testTopicName = "test_topic"
+
+    val classicGroupConfig = new Properties(consumerConfig)
+    classicGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, classicGroupId)
+    val classicGroup = createConsumer(configOverrides = classicGroupConfig)
+
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      client.createTopics(Collections.singleton(
+        new NewTopic(testTopicName, 1, 1.toShort)
+      )).all().get()
+      waitForTopics(client, List(testTopicName), List())
+      val topicPartition = new TopicPartition(testTopicName, 0)
+
+      classicGroup.subscribe(Collections.singleton(testTopicName))
+      classicGroup.poll(JDuration.ofMillis(1000))
+
+      val alterConsumerGroupOffsetsResult = 
client.alterConsumerGroupOffsets(simpleGroupId,
+        Collections.singletonMap(topicPartition, new OffsetAndMetadata(0L)))
+      assertNull(alterConsumerGroupOffsetsResult.all().get())
+      
assertNull(alterConsumerGroupOffsetsResult.partitionResult(topicPartition).get())
+
+      val groupIds = Seq(simpleGroupId, classicGroupId)
+      TestUtils.waitUntilTrue(() => {
+        val groups = 
client.describeClassicGroups(groupIds.asJavaCollection).all().get()
+        groups.size() == 2
+      }, "Expected to find all groups")
+
+      val classicConsumers = 
client.describeClassicGroups(groupIds.asJavaCollection).all().get()
+      assertNotNull(classicConsumers.get(classicGroupId))
+      assertEquals(classicGroupId, 
classicConsumers.get(classicGroupId).groupId())
+      assertEquals("consumer", classicConsumers.get(classicGroupId).protocol())
+
+      assertNotNull(classicConsumers.get(simpleGroupId))
+      assertEquals(simpleGroupId, 
classicConsumers.get(simpleGroupId).groupId())
+      assertTrue(classicConsumers.get(simpleGroupId).protocol().isEmpty)
+    } finally {
+      Utils.closeQuietly(classicGroup, "classicGroup")
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(strings = Array("kraft+kip932"))
   def testShareGroups(quorum: String): Unit = {
diff --git a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
index cdea5583653..b5a884d9e49 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
@@ -17,27 +17,52 @@
 package org.apache.kafka.tools;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.admin.GroupListing;
 import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestExtensions;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+@Timeout(value = 60)
+@ExtendWith(ClusterTestExtensions.class)
 public class GroupsCommandTest {
 
     private final String bootstrapServer = "localhost:9092";
@@ -357,6 +382,122 @@ public class GroupsCommandTest {
         )));
     }
 
+    @SuppressWarnings("NPathComplexity")
+    @ClusterTest(
+        serverProperties = {
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer,share"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+        }
+    )
+    public void testGroupCommand(ClusterInstance clusterInstance) throws 
Exception {
+        String topic = "topic";
+        String classicGroupId = "classic_group";
+        String consumerGroupId = "consumer_group";
+        String shareGroupId = "share_group";
+        String simpleGroupId = "simple_group";
+        clusterInstance.createTopic("topic", 1, (short) 1);
+        TopicPartition topicPartition = new TopicPartition(topic, 0);
+
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
clusterInstance.bootstrapServers());
+
+        try (KafkaConsumer<String, String> classicGroup = 
createKafkaConsumer(clusterInstance, classicGroupId, GroupProtocol.CLASSIC);
+             KafkaConsumer<String, String> consumerGroup = 
createKafkaConsumer(clusterInstance, consumerGroupId, GroupProtocol.CONSUMER);
+             KafkaShareConsumer<String, String> shareGroup = 
createKafkaShareConsumer(clusterInstance, shareGroupId);
+             Admin admin = clusterInstance.admin();
+             GroupsCommand.GroupsService groupsCommand = new 
GroupsCommand.GroupsService(props)
+        ) {
+            classicGroup.subscribe(List.of(topic));
+            classicGroup.poll(Duration.ofMillis(1000));
+            consumerGroup.subscribe(List.of(topic));
+            consumerGroup.poll(Duration.ofMillis(1000));
+            shareGroup.subscribe(List.of(topic));
+            shareGroup.poll(Duration.ofMillis(1000));
+
+            AlterConsumerGroupOffsetsResult result = 
admin.alterConsumerGroupOffsets(simpleGroupId, Map.of(topicPartition, new 
OffsetAndMetadata(0L)));
+            assertNull(result.all().get());
+
+            TestUtils.waitForCondition(() -> {
+                Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
+                    assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
+                        List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list").toArray(new String[0])))));
+                if (res.getKey().split("\n").length == 5 && 
res.getValue().isEmpty()) {
+                    assertCapturedListOutput(res.getKey(),
+                        new String[]{classicGroupId, "Classic", "consumer"},
+                        new String[]{consumerGroupId, "Consumer", "consumer"},
+                        new String[]{simpleGroupId, "Classic"},
+                        new String[]{shareGroupId, "Share", "share"});
+                    return true;
+                }
+                return false;
+            }, "Waiting for listing groups to return all groups");
+
+            TestUtils.waitForCondition(() -> {
+                Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
+                    assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
+                        List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--consumer").toArray(new 
String[0])))));
+                if (res.getKey().split("\n").length == 4 && 
res.getValue().isEmpty()) {
+                    assertCapturedListOutput(res.getKey(),
+                        new String[]{classicGroupId, "Classic", "consumer"},
+                        new String[]{consumerGroupId, "Consumer", "consumer"},
+                        new String[]{simpleGroupId, "Classic"});
+                    return true;
+                }
+                return false;
+            }, "Waiting for listing groups to return consumer protocol 
groups");
+
+            TestUtils.waitForCondition(() -> {
+                Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
+                    assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
+                        List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--group-type", 
"classic").toArray(new String[0])))));
+                if (res.getKey().split("\n").length == 3 && 
res.getValue().isEmpty()) {
+                    assertCapturedListOutput(res.getKey(),
+                        new String[]{classicGroupId, "Classic", "consumer"},
+                        new String[]{simpleGroupId, "Classic"});
+                    return true;
+                }
+                return false;
+            }, "Waiting for listing groups to return classic type groups");
+
+            TestUtils.waitForCondition(() -> {
+                Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
+                    assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
+                        List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--group-type", 
"consumer").toArray(new String[0])))));
+                if (res.getKey().split("\n").length == 2 && 
res.getValue().isEmpty()) {
+                    assertCapturedListOutput(res.getKey(),
+                        new String[]{consumerGroupId, "Consumer", "consumer"});
+                    return true;
+                }
+                return false;
+            }, "Waiting for listing groups to return consumer type groups");
+
+            TestUtils.waitForCondition(() -> {
+                Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
+                    assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
+                        List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--group-type", 
"share").toArray(new String[0])))));
+                if (res.getKey().split("\n").length == 2 && 
res.getValue().isEmpty()) {
+                    assertCapturedListOutput(res.getKey(),
+                        new String[]{shareGroupId, "Share", "share"});
+                    return true;
+                }
+                return false;
+            }, "Waiting for listing groups to return share type groups");
+
+            TestUtils.waitForCondition(() -> {
+                Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
+                    assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
+                        List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--share").toArray(new 
String[0])))));
+                if (res.getKey().split("\n").length == 2 && 
res.getValue().isEmpty()) {
+                    assertCapturedListOutput(res.getKey(),
+                        new String[]{shareGroupId, "Share", "share"});
+                    return true;
+                }
+                return false;
+            }, "Waiting for listing groups to return share type groups");
+        }
+    }
+
     private void assertInitializeInvalidOptionsExitCode(int expected, String[] 
options) {
         Exit.setExitProcedure((exitCode, message) -> {
             assertEquals(expected, exitCode);
@@ -378,4 +519,21 @@ public class GroupsCommandTest {
             assertEquals(String.join(",", line), String.join(",", 
capturedLines[i++].split(" +")));
         }
     }
+
+    private KafkaConsumer<String, String> createKafkaConsumer(ClusterInstance 
clusterInstance, String groupId, GroupProtocol groupProtocol) {
+        return new KafkaConsumer<>(Map.of(
+            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
clusterInstance.bootstrapServers(),
+            ConsumerConfig.GROUP_ID_CONFIG, groupId,
+            ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name,
+            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName(),
+            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName()));
+    }
+
+    private KafkaShareConsumer<String, String> 
createKafkaShareConsumer(ClusterInstance clusterInstance, String groupId) {
+        return new KafkaShareConsumer<>(Map.of(
+            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
clusterInstance.bootstrapServers(),
+            ConsumerConfig.GROUP_ID_CONFIG, groupId,
+            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName(),
+            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName()));
+    }
 }

Reply via email to