This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a81f08d368a KAFKA-19550: Integration test for Streams-related Admin APIs [1/N] (#20244)
a81f08d368a is described below

commit a81f08d368afccd681fc44010c57d5395669d60c
Author: lucliu1108 <luc...@confluent.io>
AuthorDate: Thu Sep 4 08:09:21 2025 -0500

    KAFKA-19550: Integration test for Streams-related Admin APIs [1/N] (#20244)
    
    This change adds:
    
    - Integration test for `Admin#describeStreamsGroups` API
    - Integration test for `Admin#deleteStreamsGroup` API
    
    Reviewers: Alieh Saeedi <asae...@confluent.io>, Lucas Brutschy <lucas...@apache.org>
    
    ---------
    
    Co-authored-by: Lucas Brutschy <lbruts...@gmail.com>
---
 .../kafka/api/IntegrationTestHarness.scala         |  52 ++++++-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 168 ++++++++++++++++++++-
 2 files changed, 213 insertions(+), 7 deletions(-)
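
As a quick orientation, here is a minimal sketch of driving the two Admin APIs covered by these tests from client code. The object name, bootstrap address, and group id are illustrative assumptions; the describeStreamsGroups and deleteStreamsGroups calls mirror the ones used in the tests below.

    import java.util
    import java.util.Properties
    import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}

    object StreamsGroupAdminSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        // "localhost:9092" is an assumed bootstrap address for illustration only
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        val admin = Admin.create(props)
        try {
          // Describe a streams group by id, as the new describe test does
          val described = admin.describeStreamsGroups(util.List.of("streams_group_id")).all().get()
          val group = described.get("streams_group_id")
          println(s"group=${group.groupId()} members=${group.members().size()}")

          // Delete streams groups; the groups must be empty for this to succeed
          val deleteResult = admin.deleteStreamsGroups(util.List.of("streams_group_id"))
          deleteResult.all().get()
        } finally {
          admin.close()
        }
      }
    }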

diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 7c08dd9c3fe..fe24e45f16b 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -22,13 +22,14 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsume
 import kafka.utils.TestUtils
 import kafka.utils.Implicits._
 
-import java.util.{Optional, Properties}
+import java.util
+import java.util.{Optional, Properties, UUID}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
 import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
 import kafka.security.JaasTestUtils
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
-import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData}
+import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData, StreamsRebalanceListener}
 import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
 import org.apache.kafka.common.utils.Utils
@@ -39,6 +40,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 
 import scala.collection.mutable
 import scala.collection.Seq
+import scala.jdk.CollectionConverters._
 import scala.jdk.javaapi.OptionConverters
 
 /**
@@ -235,6 +237,52 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
     streamsConsumer
   }
 
+  def createStreamsGroup[K, V](configOverrides: Properties = new Properties,
+                               configsToRemove: List[String] = List(),
+                               inputTopic: String,
+                               streamsGroupId: String): AsyncKafkaConsumer[K, V] = {
+    val props = new Properties()
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroupId)
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
+    props ++= configOverrides
+    configsToRemove.foreach(props.remove(_))
+
+    val streamsRebalanceData = new StreamsRebalanceData(
+      UUID.randomUUID(),
+      Optional.empty(),
+      util.Map.of(
+        "subtopology-0", new StreamsRebalanceData.Subtopology(
+          util.Set.of(inputTopic),
+          util.Set.of(),
+          util.Map.of(),
+          util.Map.of(inputTopic + "-store-changelog", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())),
+          util.Set.of()
+        )),
+      Map.empty[String, String].asJava
+    )
+
+    val consumer = createStreamsConsumer(
+      keyDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[K]],
+      valueDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[V]],
+      configOverrides = props,
+      streamsRebalanceData = streamsRebalanceData
+    )
+    consumer.subscribe(util.Set.of(inputTopic),
+      new StreamsRebalanceListener {
+        override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] =
+          Optional.empty()
+        override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Optional[Exception] = {
+          Optional.empty()
+        }
+        override def onAllTasksLost(): Optional[Exception] =
+          Optional.empty()
+      })
+    consumer
+  }
+
   def createAdminClient(
     listenerName: ListenerName = listenerName,
     configOverrides: Properties = new Properties
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 44835885e0c..286ac2b098c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.clients.HostResolver
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
 import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer
 import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
@@ -2573,8 +2574,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     val consumerGroupId = "consumer_group_id"
     val shareGroupId = "share_group_id"
     val simpleGroupId = "simple_group_id"
+    val streamsGroupId = "streams_group_id"
     val testTopicName = "test_topic"
 
+    val config = createConfig
+    client = Admin.create(config)
+
     consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name)
     val classicGroupConfig = new Properties(consumerConfig)
     classicGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, classicGroupId)
@@ -2589,8 +2594,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId)
     val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)
 
-    val config = createConfig
-    client = Admin.create(config)
+    val streamsGroup = createStreamsGroup(
+      inputTopic = testTopicName,
+      streamsGroupId = streamsGroupId
+    )
+
     try {
       client.createTopics(util.Set.of(
         new NewTopic(testTopicName, 1, 1.toShort)
@@ -2604,6 +2612,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       consumerGroup.poll(JDuration.ofMillis(1000))
       shareGroup.subscribe(util.Set.of(testTopicName))
       shareGroup.poll(JDuration.ofMillis(1000))
+      streamsGroup.poll(JDuration.ofMillis(1000))
 
       val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(simpleGroupId,
         util.Map.of(topicPartition, new OffsetAndMetadata(0L)))
@@ -2612,18 +2621,27 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 
       TestUtils.waitUntilTrue(() => {
         val groups = client.listGroups().all().get()
-        groups.size() == 4
+        groups.size() == 5
       }, "Expected to find all groups")
 
       val classicGroupListing = new GroupListing(classicGroupId, Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE))
       val consumerGroupListing = new GroupListing(consumerGroupId, Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE))
       val shareGroupListing = new GroupListing(shareGroupId, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))
       val simpleGroupListing = new GroupListing(simpleGroupId, Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY))
+      // Streams group could either be in STABLE or NOT_READY state
+      val streamsGroupListingStable = new GroupListing(streamsGroupId, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE))
+      val streamsGroupListingNotReady = new GroupListing(streamsGroupId, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.NOT_READY))
 
       var listGroupsResult = client.listGroups()
       assertTrue(listGroupsResult.errors().get().isEmpty)
-      assertEquals(Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.all().get().asScala.toSet)
-      assertEquals(Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.valid().get().asScala.toSet)
+
+      val expectedStreamListings = Set(streamsGroupListingStable, streamsGroupListingNotReady)
+      val expectedListings = Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing)
+      val actualListings = listGroupsResult.all().get().asScala.toSet
+
+      // Check that actualListings contains all expectedListings and one of the streams listings
+      assertTrue(expectedListings.subsetOf(actualListings))
+      assertTrue(actualListings.exists(expectedStreamListings.contains))
 
      listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(util.Set.of(GroupType.CLASSIC)))
       assertTrue(listGroupsResult.errors().get().isEmpty)
@@ -2639,10 +2657,19 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       assertTrue(listGroupsResult.errors().get().isEmpty)
      assertEquals(Set(shareGroupListing), listGroupsResult.all().get().asScala.toSet)
      assertEquals(Set(shareGroupListing), listGroupsResult.valid().get().asScala.toSet)
+
+      listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(util.Set.of(GroupType.STREAMS)))
+      assertTrue(listGroupsResult.errors().get().isEmpty)
+      assertTrue(listGroupsResult.all().get().asScala.toSet.equals(Set(streamsGroupListingStable)) ||
+        listGroupsResult.all().get().asScala.toSet.equals(Set(streamsGroupListingNotReady)))
+      assertTrue(listGroupsResult.valid().get().asScala.toSet.equals(Set(streamsGroupListingStable)) ||
+        listGroupsResult.valid().get().asScala.toSet.equals(Set(streamsGroupListingNotReady)))
+
     } finally {
       Utils.closeQuietly(classicGroup, "classicGroup")
       Utils.closeQuietly(consumerGroup, "consumerGroup")
       Utils.closeQuietly(shareGroup, "shareGroup")
+      Utils.closeQuietly(streamsGroup, "streamsGroup")
       Utils.closeQuietly(client, "adminClient")
     }
   }
@@ -4363,6 +4390,137 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       }
     }
   }
+
+  @Test
+  def testDescribeStreamsGroups(): Unit = {
+    val streamsGroupId = "stream_group_id"
+    val testTopicName = "test_topic"
+    val testNumPartitions = 1
+
+    val config = createConfig
+    client = Admin.create(config)
+
+    prepareTopics(List(testTopicName), testNumPartitions)
+    prepareRecords(testTopicName)
+
+    val streams = createStreamsGroup(
+      inputTopic = testTopicName,
+      streamsGroupId = streamsGroupId
+    )
+    streams.poll(JDuration.ofMillis(500L))
+
+    try {
+      TestUtils.waitUntilTrue(() => {
+        val firstGroup = client.listGroups().all().get().stream()
+          .filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
+        firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId
+      }, "Stream group not stable yet")
+
+      // Verify the describe call works correctly
+      val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
+      val group = describedGroups.get(streamsGroupId)
+      assertNotNull(group)
+      assertEquals(streamsGroupId, group.groupId())
+      assertFalse(group.members().isEmpty)
+      assertNotNull(group.subtopologies())
+      assertFalse(group.subtopologies().isEmpty)
+
+      // Verify the topology contains the expected source and sink topics
+      val subtopologies = group.subtopologies().asScala
+      assertTrue(subtopologies.exists(subtopology =>
+        subtopology.sourceTopics().contains(testTopicName)))
+
+      // Test describing a non-existing group
+      val nonExistingGroup = "non_existing_stream_group"
+      val describedNonExistingGroupResponse = client.describeStreamsGroups(util.List.of(nonExistingGroup))
+      assertFutureThrows(classOf[GroupIdNotFoundException], describedNonExistingGroupResponse.all())
+
+    } finally {
+      Utils.closeQuietly(streams, "streams")
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
+
+  @Test
+  def testDeleteStreamsGroups(): Unit = {
+    val testTopicName = "test_topic"
+    val testNumPartitions = 3
+    val testNumStreamsGroup = 3
+
+    val targetDeletedGroups = util.List.of("stream_group_id_2", "stream_group_id_3")
+    val targetRemainingGroups = util.List.of("stream_group_id_1")
+
+    val config = createConfig
+    client = Admin.create(config)
+
+    prepareTopics(List(testTopicName), testNumPartitions)
+    prepareRecords(testTopicName)
+
+    val streamsList = scala.collection.mutable.ListBuffer[(String, AsyncKafkaConsumer[_,_])]()
+
+    try {
+      for (i <- 1 to testNumStreamsGroup) {
+        val streamsGroupId = s"stream_group_id_$i"
+
+        val streams = createStreamsGroup(
+          inputTopic = testTopicName,
+          streamsGroupId = streamsGroupId,
+        )
+        streams.poll(JDuration.ofMillis(500L))
+        streamsList += ((streamsGroupId, streams))
+      }
+
+      TestUtils.waitUntilTrue(() => {
+        val groups = client.listGroups().all().get()
+        groups.stream()
+          .anyMatch(g => g.groupId().startsWith("stream_group_id_")) && testNumStreamsGroup == groups.size()
+      }, "Streams groups not ready to delete yet")
+
+      // Test deletion of non-empty existing groups
+      var deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups)
+      assertFutureThrows(classOf[GroupNotEmptyException], deleteStreamsGroupResult.all())
+      assertEquals(2, deleteStreamsGroupResult.deletedGroups().size())
+
+      // Stop and clean up the streams for the groups that are going to be deleted
+      streamsList
+        .filter { case (groupId, _) => targetDeletedGroups.contains(groupId) }
+        .foreach { case (_, streams) =>
+          streams.close()
+        }
+
+      val listTopicResult = client.listTopics()
+      assertEquals(2, listTopicResult.names().get().size())
+
+      // Test deletion of emptied existing streams groups
+      deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups)
+      assertEquals(2, deleteStreamsGroupResult.deletedGroups().size())
+
+      // Wait for the deleted groups to be removed
+      TestUtils.waitUntilTrue(() => {
+        val groupIds = client.listGroups().all().get().asScala.map(_.groupId()).toSet
+        targetDeletedGroups.asScala.forall(id => !groupIds.contains(id))
+      }, "Deleted groups not yet deleted")
+
+      // Verify that the deleted groups are no longer present
+      val remainingGroups = client.listGroups().all().get()
+      assertEquals(targetRemainingGroups.size(), remainingGroups.size())
+      remainingGroups.stream().forEach(g => {
+        assertTrue(targetRemainingGroups.contains(g.groupId()))
+      })
+
+      // Test deletion of a non-existing group
+      val nonExistingGroup = "non_existing_stream_group"
+      val deleteNonExistingGroupResult = client.deleteStreamsGroups(util.List.of(nonExistingGroup))
+      assertFutureThrows(classOf[GroupIdNotFoundException], deleteNonExistingGroupResult.all())
+      assertEquals(deleteNonExistingGroupResult.deletedGroups().size(), 1)
+
+    } finally {
+      streamsList.foreach { case (_, streams) =>
+        streams.close()
+      }
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
 }
 
 object PlaintextAdminIntegrationTest {
