This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a81f08d368a KAFKA-19550: Integration test for Streams-related Admin APIs [1/N] (#20244) a81f08d368a is described below commit a81f08d368afccd681fc44010c57d5395669d60c Author: lucliu1108 <luc...@confluent.io> AuthorDate: Thu Sep 4 08:09:21 2025 -0500 KAFKA-19550: Integration test for Streams-related Admin APIs [1/N] (#20244) This change adds: - Integration test for `Admin#describeStreamsGroups` API - Integration test for `Admin#deleteStreamsGroups` API Reviewers: Alieh Saeedi <asae...@confluent.io>, Lucas Brutschy <lucas...@apache.org> --------- Co-authored-by: Lucas Brutschy <lbruts...@gmail.com> --- .../kafka/api/IntegrationTestHarness.scala | 52 ++++++- .../kafka/api/PlaintextAdminIntegrationTest.scala | 168 ++++++++++++++++++++- 2 files changed, 213 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 7c08dd9c3fe..fe24e45f16b 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -22,13 +22,14 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsume import kafka.utils.TestUtils import kafka.utils.Implicits._ -import java.util.{Optional, Properties} +import java.util +import java.util.{Optional, Properties, UUID} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig} import kafka.server.KafkaConfig import kafka.integration.KafkaServerTestHarness import kafka.security.JaasTestUtils import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} -import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData} +import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData, StreamsRebalanceListener} import 
org.apache.kafka.common.network.{ConnectionMode, ListenerName} import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer} import org.apache.kafka.common.utils.Utils @@ -39,6 +40,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import scala.collection.mutable import scala.collection.Seq +import scala.jdk.CollectionConverters._ import scala.jdk.javaapi.OptionConverters /** @@ -235,6 +237,52 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { streamsConsumer } + def createStreamsGroup[K, V](configOverrides: Properties = new Properties, + configsToRemove: List[String] = List(), + inputTopic: String, + streamsGroupId: String): AsyncKafkaConsumer[K, V] = { + val props = new Properties() + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) + props.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroupId) + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) + props ++= configOverrides + configsToRemove.foreach(props.remove(_)) + + val streamsRebalanceData = new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + util.Map.of( + "subtopology-0", new StreamsRebalanceData.Subtopology( + util.Set.of(inputTopic), + util.Set.of(), + util.Map.of(), + util.Map.of(inputTopic + "-store-changelog", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())), + util.Set.of() + )), + Map.empty[String, String].asJava + ) + + val consumer = createStreamsConsumer( + keyDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[K]], + valueDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[V]], + configOverrides = props, + streamsRebalanceData = streamsRebalanceData + ) + 
consumer.subscribe(util.Set.of(inputTopic), + new StreamsRebalanceListener { + override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] = + Optional.empty() + override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Optional[Exception] = { + Optional.empty() + } + override def onAllTasksLost(): Optional[Exception] = + Optional.empty() + }) + consumer + } + def createAdminClient( listenerName: ListenerName = listenerName, configOverrides: Properties = new Properties diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 44835885e0c..286ac2b098c 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -34,6 +34,7 @@ import org.apache.kafka.clients.HostResolver import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource import org.apache.kafka.clients.admin._ +import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} @@ -2573,8 +2574,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val consumerGroupId = "consumer_group_id" val shareGroupId = "share_group_id" val simpleGroupId = "simple_group_id" + val streamsGroupId = "streams_group_id" val testTopicName = "test_topic" + val config = createConfig + client = Admin.create(config) + consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name) val 
classicGroupConfig = new Properties(consumerConfig) classicGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, classicGroupId) @@ -2589,8 +2594,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId) val shareGroup = createShareConsumer(configOverrides = shareGroupConfig) - val config = createConfig - client = Admin.create(config) + val streamsGroup = createStreamsGroup( + inputTopic = testTopicName, + streamsGroupId = streamsGroupId + ) + try { client.createTopics(util.Set.of( new NewTopic(testTopicName, 1, 1.toShort) @@ -2604,6 +2612,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { consumerGroup.poll(JDuration.ofMillis(1000)) shareGroup.subscribe(util.Set.of(testTopicName)) shareGroup.poll(JDuration.ofMillis(1000)) + streamsGroup.poll(JDuration.ofMillis(1000)) val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(simpleGroupId, util.Map.of(topicPartition, new OffsetAndMetadata(0L))) @@ -2612,18 +2621,27 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.waitUntilTrue(() => { val groups = client.listGroups().all().get() - groups.size() == 4 + groups.size() == 5 }, "Expected to find all groups") val classicGroupListing = new GroupListing(classicGroupId, Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)) val consumerGroupListing = new GroupListing(consumerGroupId, Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)) val shareGroupListing = new GroupListing(shareGroupId, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)) val simpleGroupListing = new GroupListing(simpleGroupId, Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY)) + // Streams group could either be in STABLE or NOT_READY state + val streamsGroupListingStable = new GroupListing(streamsGroupId, Optional.of(GroupType.STREAMS), "streams", 
Optional.of(GroupState.STABLE)) + val streamsGroupListingNotReady = new GroupListing(streamsGroupId, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.NOT_READY)) var listGroupsResult = client.listGroups() assertTrue(listGroupsResult.errors().get().isEmpty) - assertEquals(Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.all().get().asScala.toSet) - assertEquals(Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.valid().get().asScala.toSet) + + val expectedStreamListings = Set(streamsGroupListingStable, streamsGroupListingNotReady) + val expectedListings = Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing) + val actualListings = listGroupsResult.all().get().asScala.toSet + + // Check that actualListings contains all expectedListings and one of the streams listings + assertTrue(expectedListings.subsetOf(actualListings)) + assertTrue(actualListings.exists(expectedStreamListings.contains)) listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(util.Set.of(GroupType.CLASSIC))) assertTrue(listGroupsResult.errors().get().isEmpty) @@ -2639,10 +2657,19 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(listGroupsResult.errors().get().isEmpty) assertEquals(Set(shareGroupListing), listGroupsResult.all().get().asScala.toSet) assertEquals(Set(shareGroupListing), listGroupsResult.valid().get().asScala.toSet) + + listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(util.Set.of(GroupType.STREAMS))) + assertTrue(listGroupsResult.errors().get().isEmpty) + assertTrue(listGroupsResult.all().get().asScala.toSet.equals(Set(streamsGroupListingStable)) || + listGroupsResult.all().get().asScala.toSet.equals(Set(streamsGroupListingNotReady))) + assertTrue(listGroupsResult.valid().get().asScala.toSet.equals(Set(streamsGroupListingStable)) || + 
listGroupsResult.valid().get().asScala.toSet.equals(Set(streamsGroupListingNotReady))) + } finally { Utils.closeQuietly(classicGroup, "classicGroup") Utils.closeQuietly(consumerGroup, "consumerGroup") Utils.closeQuietly(shareGroup, "shareGroup") + Utils.closeQuietly(streamsGroup, "streamsGroup") Utils.closeQuietly(client, "adminClient") } } @@ -4363,6 +4390,137 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } } + + @Test + def testDescribeStreamsGroups(): Unit = { + val streamsGroupId = "stream_group_id" + val testTopicName = "test_topic" + val testNumPartitions = 1 + + val config = createConfig + client = Admin.create(config) + + prepareTopics(List(testTopicName), testNumPartitions) + prepareRecords(testTopicName) + + val streams = createStreamsGroup( + inputTopic = testTopicName, + streamsGroupId = streamsGroupId + ) + streams.poll(JDuration.ofMillis(500L)) + + try { + TestUtils.waitUntilTrue(() => { + val firstGroup = client.listGroups().all().get().stream() + .filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null) + firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId + }, "Stream group not stable yet") + + // Verify the describe call works correctly + val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get() + val group = describedGroups.get(streamsGroupId) + assertNotNull(group) + assertEquals(streamsGroupId, group.groupId()) + assertFalse(group.members().isEmpty) + assertNotNull(group.subtopologies()) + assertFalse(group.subtopologies().isEmpty) + + // Verify the topology contains the expected source and sink topics + val subtopologies = group.subtopologies().asScala + assertTrue(subtopologies.exists(subtopology => + subtopology.sourceTopics().contains(testTopicName))) + + // Test describing a non-existing group + val nonExistingGroup = "non_existing_stream_group" + val describedNonExistingGroupResponse = 
client.describeStreamsGroups(util.List.of(nonExistingGroup)) + assertFutureThrows(classOf[GroupIdNotFoundException], describedNonExistingGroupResponse.all()) + + } finally { + Utils.closeQuietly(streams, "streams") + Utils.closeQuietly(client, "adminClient") + } + } + + @Test + def testDeleteStreamsGroups(): Unit = { + val testTopicName = "test_topic" + val testNumPartitions = 3 + val testNumStreamsGroup = 3 + + val targetDeletedGroups = util.List.of("stream_group_id_2", "stream_group_id_3") + val targetRemainingGroups = util.List.of("stream_group_id_1") + + val config = createConfig + client = Admin.create(config) + + prepareTopics(List(testTopicName), testNumPartitions) + prepareRecords(testTopicName) + + val streamsList = scala.collection.mutable.ListBuffer[(String, AsyncKafkaConsumer[_,_])]() + + try { + for (i <- 1 to testNumStreamsGroup) { + val streamsGroupId = s"stream_group_id_$i" + + val streams = createStreamsGroup( + inputTopic = testTopicName, + streamsGroupId = streamsGroupId, + ) + streams.poll(JDuration.ofMillis(500L)) + streamsList += ((streamsGroupId, streams)) + } + + TestUtils.waitUntilTrue(() => { + val groups = client.listGroups().all().get() + groups.stream() + .anyMatch(g => g.groupId().startsWith("stream_group_id_")) && testNumStreamsGroup == groups.size() + }, "Streams groups not ready to delete yet") + + // Test deletion of non-empty existing groups + var deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups) + assertFutureThrows(classOf[GroupNotEmptyException], deleteStreamsGroupResult.all()) + assertEquals(2, deleteStreamsGroupResult.deletedGroups().size()) + + // Stop and clean up the streams for the groups that are going to be deleted + streamsList + .filter { case (groupId, _) => targetDeletedGroups.contains(groupId) } + .foreach { case (_, streams) => + streams.close() + } + + val listTopicResult = client.listTopics() + assertEquals(2, listTopicResult.names().get().size()) + + // Test deletion of emptied existing 
streams groups + deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups) + assertEquals(2, deleteStreamsGroupResult.deletedGroups().size()) + + // Wait for the deleted groups to be removed + TestUtils.waitUntilTrue(() => { + val groupIds = client.listGroups().all().get().asScala.map(_.groupId()).toSet + targetDeletedGroups.asScala.forall(id => !groupIds.contains(id)) + }, "Deleted groups not yet deleted") + + // Verify that the deleted groups are no longer present + val remainingGroups = client.listGroups().all().get() + assertEquals(targetRemainingGroups.size(), remainingGroups.size()) + remainingGroups.stream().forEach(g => { + assertTrue(targetRemainingGroups.contains(g.groupId())) + }) + + // Test deletion of a non-existing group + val nonExistingGroup = "non_existing_stream_group" + val deleteNonExistingGroupResult = client.deleteStreamsGroups(util.List.of(nonExistingGroup)) + assertFutureThrows(classOf[GroupIdNotFoundException], deleteNonExistingGroupResult.all()) + assertEquals(deleteNonExistingGroupResult.deletedGroups().size(), 1) + + } finally{ + streamsList.foreach { case (_, streams) => + streams.close() + } + Utils.closeQuietly(client, "adminClient") + } + } } object PlaintextAdminIntegrationTest {