This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 0f9ed787548 KAFKA-19413: Extended AuthorizerIntegrationTest to cover StreamsGroupDescribe (#19981)
0f9ed787548 is described below

commit 0f9ed78754845f11fdc83dd930a1614f92dfa043
Author: Lucas Brutschy <lbruts...@confluent.io>
AuthorDate: Wed Jun 18 10:19:34 2025 +0200

    KAFKA-19413: Extended AuthorizerIntegrationTest to cover StreamsGroupDescribe (#19981)
    
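    Extend the authorization test coverage in AuthorizerIntegrationTest to the
    streams group RPC StreamsGroupDescribe. The RPC requires DESCRIBE permission
    on the group and DESCRIBE permission on every topic in the group's topology
    (source, repartition sink, repartition source, and state changelog topics).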
    
    Reviewers: Bill Bejeck <bbej...@apache.org>
---
 .../kafka/api/AuthorizerIntegrationTest.scala      | 181 ++++++++++++++++++++-
 .../kafka/api/IntegrationTestHarness.scala         |  35 +++-
 2 files changed, 209 insertions(+), 7 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a869e2eb4ab..424772275ea 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -17,11 +17,12 @@ import java.time.Duration
 import java.util
 import java.util.concurrent.{ExecutionException, Semaphore}
 import java.util.regex.Pattern
-import java.util.{Comparator, Optional, Properties}
+import java.util.{Comparator, Optional, Properties, UUID}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import kafka.utils.TestUtils.waitUntilTrue
 import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, 
ListGroupsOptions, NewTopic}
 import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.consumer.internals.{StreamsRebalanceData, 
StreamsRebalanceListener}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.acl.AclOperation._
 import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
@@ -37,7 +38,7 @@ import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import 
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, 
ListOffsetsTopic}
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition,
 OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, 
AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, 
AlterShareGroupOffsetsRequestData, ConsumerGroupDescribeRequestData, 
ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, 
CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, 
DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, 
DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequ [...]
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, 
AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, 
AlterShareGroupOffsetsRequestData, ConsumerGroupDescribeRequestData, 
ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, 
CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, 
DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, 
DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequ [...]
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
SimpleRecord}
@@ -76,6 +77,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   val shareGroupDescribeConfigsAcl = Map(shareGroupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS, 
ALLOW)))
   val shareGroupAlterConfigsAcl = Map(shareGroupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW)))
   val streamsGroupReadAcl = Map(streamsGroupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)))
+  val streamsGroupDescribeAcl = Map(streamsGroupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)))
   val clusterAcl = Map(clusterResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CLUSTER_ACTION, 
ALLOW)))
   val clusterCreateAcl = Map(clusterResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CREATE, ALLOW)))
   val clusterAlterAcl = Map(clusterResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER, ALLOW)))
@@ -225,7 +227,9 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
       resp.data.errorCode)),
     ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> ((resp: 
AlterShareGroupOffsetsResponse) => Errors.forCode(
       resp.data.errorCode)),
-    ApiKeys.STREAMS_GROUP_HEARTBEAT -> ((resp: StreamsGroupHeartbeatResponse) 
=> Errors.forCode(resp.data.errorCode))
+    ApiKeys.STREAMS_GROUP_HEARTBEAT -> ((resp: StreamsGroupHeartbeatResponse) 
=> Errors.forCode(resp.data.errorCode)),
+    ApiKeys.STREAMS_GROUP_DESCRIBE -> ((resp: StreamsGroupDescribeResponse) =>
+      Errors.forCode(resp.data.groups.asScala.find(g => streamsGroup == 
g.groupId).head.errorCode))
   )
 
   def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = {
@@ -294,7 +298,8 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> (shareGroupDescribeAcl ++ 
topicDescribeAcl),
     ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> (shareGroupDeleteAcl ++ 
topicReadAcl),
     ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> (shareGroupReadAcl ++ topicReadAcl),
-    ApiKeys.STREAMS_GROUP_HEARTBEAT -> (streamsGroupReadAcl ++ 
topicDescribeAcl)
+    ApiKeys.STREAMS_GROUP_HEARTBEAT -> (streamsGroupReadAcl ++ 
topicDescribeAcl),
+    ApiKeys.STREAMS_GROUP_DESCRIBE -> (streamsGroupDescribeAcl ++ 
topicDescribeAcl),
   )
 
   private def createMetadataRequest(allowAutoTopicCreation: Boolean) = {
@@ -870,6 +875,11 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
         ).asJava
       ))).build(ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion)
 
+  private def streamsGroupDescribeRequest = new 
StreamsGroupDescribeRequest.Builder(
+    new StreamsGroupDescribeRequestData()
+      .setGroupIds(List(streamsGroup).asJava)
+      
.setIncludeAuthorizedOperations(false)).build(ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion)
+  
   private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, 
AbstractRequest], topicExists: Boolean = true,
                            topicNames: Map[Uuid, String] = getTopicNames()) = {
     for ((key, request) <- requestKeyToRequest) {
@@ -954,6 +964,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
       ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> deleteShareGroupOffsetsRequest,
       ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> alterShareGroupOffsetsRequest,
       ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest,
+      ApiKeys.STREAMS_GROUP_DESCRIBE -> streamsGroupDescribeRequest,
 
       // Delete the topic last
       ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
@@ -987,7 +998,8 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
       ApiKeys.SHARE_FETCH -> createShareFetchRequest,
       ApiKeys.SHARE_ACKNOWLEDGE -> shareAcknowledgeRequest,
       ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest,
-      ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest
+      ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest,
+      ApiKeys.STREAMS_GROUP_DESCRIBE -> streamsGroupDescribeRequest
     )
 
     sendRequests(requestKeyToRequest, topicExists = false, topicNames)
@@ -3853,6 +3865,165 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
       response.data().status())
   }
 
+  private def createStreamsGroupToDescribe(
+                                            topicAsSourceTopic: Boolean,
+                                            topicAsRepartitionSinkTopic: 
Boolean,
+                                            topicAsRepartitionSourceTopic: 
Boolean,
+                                            topicAsStateChangelogTopics: 
Boolean
+                                          ): Unit = {
+    createTopicWithBrokerPrincipal(sourceTopic)
+    createTopicWithBrokerPrincipal(topic)
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, READ, ALLOW)), streamsGroupResource)
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, READ, ALLOW)), topicResource)
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, READ, ALLOW)), sourceTopicResource)
+    streamsConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroup)
+    streamsConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false")
+    val consumer = createStreamsConsumer(streamsRebalanceData = new 
StreamsRebalanceData(
+      UUID.randomUUID(),
+      Optional.empty(),
+      util.Map.of(
+        "subtopology-0", new StreamsRebalanceData.Subtopology(
+          if (topicAsSourceTopic) util.Set.of(sourceTopic, topic) else 
util.Set.of(sourceTopic),
+          if (topicAsRepartitionSinkTopic) util.Set.of(topic) else 
util.Set.of(),
+          if (topicAsRepartitionSourceTopic)
+            util.Map.of(topic, new 
StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of()))
+          else util.Map.of(),
+          if (topicAsStateChangelogTopics)
+            util.Map.of(topic, new 
StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of()))
+          else util.Map.of(),
+          util.Set.of()
+        )),
+      Map.empty[String, String].asJava
+    ))
+    consumer.subscribe(
+      if (topicAsSourceTopic || topicAsRepartitionSourceTopic) 
util.Set.of(sourceTopic, topic) else util.Set.of(sourceTopic),
+      new StreamsRebalanceListener {
+        override def onTasksRevoked(tasks: 
util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] =
+          Optional.empty()
+
+        override def onTasksAssigned(assignment: 
StreamsRebalanceData.Assignment): Optional[Exception] =
+          Optional.empty()
+
+        override def onAllTasksLost(): Optional[Exception] =
+          Optional.empty()
+      }
+    )
+    consumer.poll(Duration.ofMillis(500L))
+    removeAllClientAcls()
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "true,  false, false, false",
+    "false, true,  false, false",
+    "false, false, true,  false",
+    "false, false, false, true"
+  ))
+  def testStreamsGroupDescribeWithGroupDescribeAndTopicDescribeAcl(
+                                                                    
topicAsSourceTopic: Boolean,
+                                                                    
topicAsRepartitionSinkTopic: Boolean,
+                                                                    
topicAsRepartitionSourceTopic: Boolean,
+                                                                    
topicAsStateChangelogTopics: Boolean
+                                                                  ): Unit = {
+    createStreamsGroupToDescribe(
+      topicAsSourceTopic,
+      topicAsRepartitionSinkTopic,
+      topicAsRepartitionSourceTopic,
+      topicAsStateChangelogTopics
+    )
+    addAndVerifyAcls(streamsGroupDescribeAcl(streamsGroupResource), 
streamsGroupResource)
+    addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), 
sourceTopicResource) // Always added, since we need a source topic
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+    val request = streamsGroupDescribeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "true,  false, false, false",
+    "false, true,  false, false",
+    "false, false, true,  false",
+    "false, false, false, true"
+  ))
+  def testStreamsGroupDescribeWithOperationAll(
+                                                topicAsSourceTopic: Boolean,
+                                                topicAsRepartitionSinkTopic: 
Boolean,
+                                                topicAsRepartitionSourceTopic: 
Boolean,
+                                                topicAsStateChangelogTopics: 
Boolean
+                                              ): Unit = {
+    createStreamsGroupToDescribe(
+      topicAsSourceTopic,
+      topicAsRepartitionSinkTopic,
+      topicAsRepartitionSourceTopic,
+      topicAsStateChangelogTopics
+    )
+
+    val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
+    addAndVerifyAcls(Set(allowAllOpsAcl), streamsGroupResource)
+    addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), 
sourceTopicResource) // Always added, since we need a source topic
+    addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
+
+    val request = streamsGroupDescribeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "true,  false, false, false",
+    "false, true,  false, false",
+    "false, false, true,  false",
+    "false, false, false, true"
+  ))
+  def testStreamsGroupDescribeWithoutGroupDescribeAcl(
+                                                       topicAsSourceTopic: 
Boolean,
+                                                       
topicAsRepartitionSinkTopic: Boolean,
+                                                       
topicAsRepartitionSourceTopic: Boolean,
+                                                       
topicAsStateChangelogTopics: Boolean
+                                                     ): Unit = {
+    createStreamsGroupToDescribe(
+      topicAsSourceTopic,
+      topicAsRepartitionSinkTopic,
+      topicAsRepartitionSourceTopic,
+      topicAsStateChangelogTopics
+    )
+    addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), 
sourceTopicResource) // Always added, since we need a source topic
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+    val request = streamsGroupDescribeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "true,  false, false, false",
+    "false, true,  false, false",
+    "false, false, true,  false",
+    "false, false, false, true"
+  ))
+  def testStreamsGroupDescribeWithoutGroupDescribeOrTopicDescribeAcl(
+                                                                      
topicAsSourceTopic: Boolean,
+                                                                      
topicAsRepartitionSinkTopic: Boolean,
+                                                                      
topicAsRepartitionSourceTopic: Boolean,
+                                                                      
topicAsStateChangelogTopics: Boolean
+                                                                    ): Unit = {
+    createStreamsGroupToDescribe(
+      topicAsSourceTopic,
+      topicAsRepartitionSinkTopic,
+      topicAsRepartitionSourceTopic,
+      topicAsStateChangelogTopics
+    )
+
+    val request = streamsGroupDescribeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), 
sourceTopicResource) // Always added, since we need a source topic
+
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+  
   private def sendAndReceiveFirstRegexHeartbeat(memberId: String,
                                                 listenerName: ListenerName): 
ConsumerGroupHeartbeatResponseData = {
     val request = new ConsumerGroupHeartbeatRequest.Builder(
diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index e062dcc09fa..7c08dd9c3fe 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -22,14 +22,16 @@ import org.apache.kafka.clients.consumer.{Consumer, 
ConsumerConfig, KafkaConsume
 import kafka.utils.TestUtils
 import kafka.utils.Implicits._
 
-import java.util.Properties
+import java.util.{Optional, Properties}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
 import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
 import kafka.security.JaasTestUtils
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
+import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, 
StreamsRebalanceData}
 import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, 
ByteArraySerializer, Deserializer, Serializer}
+import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.MetadataLogConfig
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs}
@@ -49,6 +51,7 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
   val producerConfig = new Properties
   val consumerConfig = new Properties
   val shareConsumerConfig = new Properties
+  val streamsConsumerConfig = new Properties
   val adminClientConfig = new Properties
   val superuserClientConfig = new Properties
   val serverConfig = new Properties
@@ -56,6 +59,7 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
 
   private val consumers = mutable.Buffer[Consumer[_, _]]()
   private val shareConsumers = mutable.Buffer[ShareConsumer[_, _]]()
+  private val streamsConsumers = mutable.Buffer[Consumer[_, _]]()
   private val producers = mutable.Buffer[KafkaProducer[_, _]]()
   private val adminClients = mutable.Buffer[Admin]()
 
@@ -148,7 +152,12 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
     shareConsumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group")
     
shareConsumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
classOf[ByteArrayDeserializer].getName)
     
shareConsumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
classOf[ByteArrayDeserializer].getName)
-
+    
+    streamsConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers())
+    streamsConsumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group")
+    
streamsConsumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
classOf[ByteArrayDeserializer].getName)
+    
streamsConsumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 classOf[ByteArrayDeserializer].getName)
+    
     adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers())
 
     doSuperuserSetup(testInfo)
@@ -207,6 +216,25 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
     shareConsumer
   }
 
+  def createStreamsConsumer[K, V](keyDeserializer: Deserializer[K] = new 
ByteArrayDeserializer,
+                                valueDeserializer: Deserializer[V] = new 
ByteArrayDeserializer,
+                                configOverrides: Properties = new Properties,
+                                configsToRemove: List[String] = List(),
+                                streamsRebalanceData: StreamsRebalanceData): 
AsyncKafkaConsumer[K, V] = {
+    val props = new Properties
+    props ++= streamsConsumerConfig
+    props ++= configOverrides
+    configsToRemove.foreach(props.remove(_))
+    val streamsConsumer = new AsyncKafkaConsumer[K, V](
+      new 
ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(Utils.propsToMap(props),
 keyDeserializer, valueDeserializer)),
+      keyDeserializer,
+      valueDeserializer,
+      Optional.of(streamsRebalanceData)
+    )
+    streamsConsumers += streamsConsumer
+    streamsConsumer
+  }
+
   def createAdminClient(
     listenerName: ListenerName = listenerName,
     configOverrides: Properties = new Properties
@@ -239,11 +267,14 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
       consumers.foreach(_.close(Duration.ZERO))
       shareConsumers.foreach(_.wakeup())
       shareConsumers.foreach(_.close(Duration.ZERO))
+      streamsConsumers.foreach(_.wakeup())
+      streamsConsumers.foreach(_.close(Duration.ZERO))
       adminClients.foreach(_.close(Duration.ZERO))
 
       producers.clear()
       consumers.clear()
       shareConsumers.clear()
+      streamsConsumers.clear()
       adminClients.clear()
     } finally {
       super.tearDown()

Reply via email to