This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 078d34f39d7 KAFKA-17910 Create integration tests for Admin.listGroups
and Admin.describeClassicGroups (#17712)
078d34f39d7 is described below
commit 078d34f39d73bfd4670254565f41947fdad57544
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Nov 18 16:35:48 2024 +0800
KAFKA-17910 Create integration tests for Admin.listGroups and
Admin.describeClassicGroups (#17712)
Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../kafka/api/PlaintextAdminIntegrationTest.scala | 129 +++++++++++++++++
.../org/apache/kafka/tools/GroupsCommandTest.java | 158 +++++++++++++++++++++
2 files changed, 287 insertions(+)
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 7e1103e2b42..1eba4dba0db 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2087,6 +2087,135 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft+kip932"))
+ def testListGroups(quorum: String): Unit = {
+ val classicGroupId = "classic_group_id"
+ val consumerGroupId = "consumer_group_id"
+ val shareGroupId = "share_group_id"
+ val simpleGroupId = "simple_group_id"
+ val testTopicName = "test_topic"
+
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name)
+ val classicGroupConfig = new Properties(consumerConfig)
+ classicGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, classicGroupId)
+ val classicGroup = createConsumer(configOverrides = classicGroupConfig)
+
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name)
+ val consumerGroupConfig = new Properties(consumerConfig)
+ consumerGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId)
+ val consumerGroup = createConsumer(configOverrides = consumerGroupConfig)
+
+ val shareGroupConfig = new Properties(consumerConfig)
+ shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId)
+ val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)
+
+ val config = createConfig
+ client = Admin.create(config)
+ try {
+ client.createTopics(Collections.singleton(
+ new NewTopic(testTopicName, 1, 1.toShort)
+ )).all().get()
+ waitForTopics(client, List(testTopicName), List())
+ val topicPartition = new TopicPartition(testTopicName, 0)
+
+ classicGroup.subscribe(Collections.singleton(testTopicName))
+ classicGroup.poll(JDuration.ofMillis(1000))
+ consumerGroup.subscribe(Collections.singleton(testTopicName))
+ consumerGroup.poll(JDuration.ofMillis(1000))
+ shareGroup.subscribe(Collections.singleton(testTopicName))
+ shareGroup.poll(JDuration.ofMillis(1000))
+
+ val alterConsumerGroupOffsetsResult =
client.alterConsumerGroupOffsets(simpleGroupId,
+ Collections.singletonMap(topicPartition, new OffsetAndMetadata(0L)))
+ assertNull(alterConsumerGroupOffsetsResult.all().get())
+
assertNull(alterConsumerGroupOffsetsResult.partitionResult(topicPartition).get())
+
+ TestUtils.waitUntilTrue(() => {
+ val groups = client.listGroups().all().get()
+ groups.size() == 4
+ }, "Expected to find all groups")
+
+ val classicGroupListing = new GroupListing(classicGroupId,
Optional.of(GroupType.CLASSIC), "consumer")
+ val consumerGroupListing = new GroupListing(consumerGroupId,
Optional.of(GroupType.CONSUMER), "consumer")
+ val shareGroupListing = new GroupListing(shareGroupId,
Optional.of(GroupType.SHARE), "share")
+ val simpleGroupListing = new GroupListing(simpleGroupId,
Optional.of(GroupType.CLASSIC), "")
+
+ var listGroupsResult = client.listGroups()
+ assertTrue(listGroupsResult.errors().get().isEmpty)
+ assertEquals(Set(classicGroupListing, simpleGroupListing,
consumerGroupListing, shareGroupListing),
listGroupsResult.all().get().asScala.toSet)
+ assertEquals(Set(classicGroupListing, simpleGroupListing,
consumerGroupListing, shareGroupListing),
listGroupsResult.valid().get().asScala.toSet)
+
+ listGroupsResult = client.listGroups(new
ListGroupsOptions().withTypes(java.util.Set.of(GroupType.CLASSIC)))
+ assertTrue(listGroupsResult.errors().get().isEmpty)
+ assertEquals(Set(classicGroupListing, simpleGroupListing),
listGroupsResult.all().get().asScala.toSet)
+ assertEquals(Set(classicGroupListing, simpleGroupListing),
listGroupsResult.valid().get().asScala.toSet)
+
+ listGroupsResult = client.listGroups(new
ListGroupsOptions().withTypes(java.util.Set.of(GroupType.CONSUMER)))
+ assertTrue(listGroupsResult.errors().get().isEmpty)
+ assertEquals(Set(consumerGroupListing),
listGroupsResult.all().get().asScala.toSet)
+ assertEquals(Set(consumerGroupListing),
listGroupsResult.valid().get().asScala.toSet)
+
+ listGroupsResult = client.listGroups(new
ListGroupsOptions().withTypes(java.util.Set.of(GroupType.SHARE)))
+ assertTrue(listGroupsResult.errors().get().isEmpty)
+ assertEquals(Set(shareGroupListing),
listGroupsResult.all().get().asScala.toSet)
+ assertEquals(Set(shareGroupListing),
listGroupsResult.valid().get().asScala.toSet)
+ } finally {
+ Utils.closeQuietly(classicGroup, "classicGroup")
+ Utils.closeQuietly(consumerGroup, "consumerGroup")
+ Utils.closeQuietly(shareGroup, "shareGroup")
+ Utils.closeQuietly(client, "adminClient")
+ }
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ def testDescribeClassicGroups(quorum: String, groupProtocol: String): Unit =
{
+ val classicGroupId = "classic_group_id"
+ val simpleGroupId = "simple_group_id"
+ val testTopicName = "test_topic"
+
+ val classicGroupConfig = new Properties(consumerConfig)
+ classicGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, classicGroupId)
+ val classicGroup = createConsumer(configOverrides = classicGroupConfig)
+
+ val config = createConfig
+ client = Admin.create(config)
+ try {
+ client.createTopics(Collections.singleton(
+ new NewTopic(testTopicName, 1, 1.toShort)
+ )).all().get()
+ waitForTopics(client, List(testTopicName), List())
+ val topicPartition = new TopicPartition(testTopicName, 0)
+
+ classicGroup.subscribe(Collections.singleton(testTopicName))
+ classicGroup.poll(JDuration.ofMillis(1000))
+
+ val alterConsumerGroupOffsetsResult =
client.alterConsumerGroupOffsets(simpleGroupId,
+ Collections.singletonMap(topicPartition, new OffsetAndMetadata(0L)))
+ assertNull(alterConsumerGroupOffsetsResult.all().get())
+
assertNull(alterConsumerGroupOffsetsResult.partitionResult(topicPartition).get())
+
+ val groupIds = Seq(simpleGroupId, classicGroupId)
+ TestUtils.waitUntilTrue(() => {
+ val groups =
client.describeClassicGroups(groupIds.asJavaCollection).all().get()
+ groups.size() == 2
+ }, "Expected to find all groups")
+
+ val classicConsumers =
client.describeClassicGroups(groupIds.asJavaCollection).all().get()
+ assertNotNull(classicConsumers.get(classicGroupId))
+ assertEquals(classicGroupId,
classicConsumers.get(classicGroupId).groupId())
+ assertEquals("consumer", classicConsumers.get(classicGroupId).protocol())
+
+ assertNotNull(classicConsumers.get(simpleGroupId))
+ assertEquals(simpleGroupId,
classicConsumers.get(simpleGroupId).groupId())
+ assertTrue(classicConsumers.get(simpleGroupId).protocol().isEmpty)
+ } finally {
+ Utils.closeQuietly(classicGroup, "classicGroup")
+ Utils.closeQuietly(client, "adminClient")
+ }
+ }
+
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip932"))
def testShareGroups(quorum: String): Unit = {
diff --git a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
index cdea5583653..b5a884d9e49 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
@@ -17,27 +17,52 @@
package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Properties;
import java.util.concurrent.ExecutionException;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+@Timeout(value = 60)
+@ExtendWith(ClusterTestExtensions.class)
public class GroupsCommandTest {
private final String bootstrapServer = "localhost:9092";
@@ -357,6 +382,122 @@ public class GroupsCommandTest {
)));
}
+ @SuppressWarnings("NPathComplexity")
+ @ClusterTest(
+ serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer,share"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ }
+ )
+ public void testGroupCommand(ClusterInstance clusterInstance) throws
Exception {
+ String topic = "topic";
+ String classicGroupId = "classic_group";
+ String consumerGroupId = "consumer_group";
+ String shareGroupId = "share_group";
+ String simpleGroupId = "simple_group";
+ clusterInstance.createTopic("topic", 1, (short) 1);
+ TopicPartition topicPartition = new TopicPartition(topic, 0);
+
+ Properties props = new Properties();
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers());
+
+ try (KafkaConsumer<String, String> classicGroup =
createKafkaConsumer(clusterInstance, classicGroupId, GroupProtocol.CLASSIC);
+ KafkaConsumer<String, String> consumerGroup =
createKafkaConsumer(clusterInstance, consumerGroupId, GroupProtocol.CONSUMER);
+ KafkaShareConsumer<String, String> shareGroup =
createKafkaShareConsumer(clusterInstance, shareGroupId);
+ Admin admin = clusterInstance.admin();
+ GroupsCommand.GroupsService groupsCommand = new
GroupsCommand.GroupsService(props)
+ ) {
+ classicGroup.subscribe(List.of(topic));
+ classicGroup.poll(Duration.ofMillis(1000));
+ consumerGroup.subscribe(List.of(topic));
+ consumerGroup.poll(Duration.ofMillis(1000));
+ shareGroup.subscribe(List.of(topic));
+ shareGroup.poll(Duration.ofMillis(1000));
+
+ AlterConsumerGroupOffsetsResult result =
admin.alterConsumerGroupOffsets(simpleGroupId, Map.of(topicPartition, new
OffsetAndMetadata(0L)));
+ assertNull(result.all().get());
+
+ TestUtils.waitForCondition(() -> {
+ Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
+ assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
+ List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--list").toArray(new String[0])))));
+ if (res.getKey().split("\n").length == 5 &&
res.getValue().isEmpty()) {
+ assertCapturedListOutput(res.getKey(),
+ new String[]{classicGroupId, "Classic", "consumer"},
+ new String[]{consumerGroupId, "Consumer", "consumer"},
+ new String[]{simpleGroupId, "Classic"},
+ new String[]{shareGroupId, "Share", "share"});
+ return true;
+ }
+ return false;
+ }, "Waiting for listing groups to return all groups");
+
+ TestUtils.waitForCondition(() -> {
+ Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
+ assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
+ List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--list", "--consumer").toArray(new
String[0])))));
+ if (res.getKey().split("\n").length == 4 &&
res.getValue().isEmpty()) {
+ assertCapturedListOutput(res.getKey(),
+ new String[]{classicGroupId, "Classic", "consumer"},
+ new String[]{consumerGroupId, "Consumer", "consumer"},
+ new String[]{simpleGroupId, "Classic"});
+ return true;
+ }
+ return false;
+ }, "Waiting for listing groups to return consumer protocol
groups");
+
+ TestUtils.waitForCondition(() -> {
+ Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
+ assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
+ List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--list", "--group-type",
"classic").toArray(new String[0])))));
+ if (res.getKey().split("\n").length == 3 &&
res.getValue().isEmpty()) {
+ assertCapturedListOutput(res.getKey(),
+ new String[]{classicGroupId, "Classic", "consumer"},
+ new String[]{simpleGroupId, "Classic"});
+ return true;
+ }
+ return false;
+ }, "Waiting for listing groups to return classic type groups");
+
+ TestUtils.waitForCondition(() -> {
+ Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
+ assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
+ List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--list", "--group-type",
"consumer").toArray(new String[0])))));
+ if (res.getKey().split("\n").length == 2 &&
res.getValue().isEmpty()) {
+ assertCapturedListOutput(res.getKey(),
+ new String[]{consumerGroupId, "Consumer", "consumer"});
+ return true;
+ }
+ return false;
+ }, "Waiting for listing groups to return consumer type groups");
+
+ TestUtils.waitForCondition(() -> {
+ Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
+ assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
+ List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--list", "--group-type",
"share").toArray(new String[0])))));
+ if (res.getKey().split("\n").length == 2 &&
res.getValue().isEmpty()) {
+ assertCapturedListOutput(res.getKey(),
+ new String[]{shareGroupId, "Share", "share"});
+ return true;
+ }
+ return false;
+ }, "Waiting for listing groups to return share type groups");
+
+ TestUtils.waitForCondition(() -> {
+ Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
+ assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
+ List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--list", "--share").toArray(new
String[0])))));
+ if (res.getKey().split("\n").length == 2 &&
res.getValue().isEmpty()) {
+ assertCapturedListOutput(res.getKey(),
+ new String[]{shareGroupId, "Share", "share"});
+ return true;
+ }
+ return false;
+ }, "Waiting for listing groups to return share type groups");
+ }
+ }
+
private void assertInitializeInvalidOptionsExitCode(int expected, String[]
options) {
Exit.setExitProcedure((exitCode, message) -> {
assertEquals(expected, exitCode);
@@ -378,4 +519,21 @@ public class GroupsCommandTest {
assertEquals(String.join(",", line), String.join(",",
capturedLines[i++].split(" +")));
}
}
+
+ private KafkaConsumer<String, String> createKafkaConsumer(ClusterInstance
clusterInstance, String groupId, GroupProtocol groupProtocol) {
+ return new KafkaConsumer<>(Map.of(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers(),
+ ConsumerConfig.GROUP_ID_CONFIG, groupId,
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name,
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName(),
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName()));
+ }
+
+ private KafkaShareConsumer<String, String>
createKafkaShareConsumer(ClusterInstance clusterInstance, String groupId) {
+ return new KafkaShareConsumer<>(Map.of(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers(),
+ ConsumerConfig.GROUP_ID_CONFIG, groupId,
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName(),
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName()));
+ }
}