This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch 4.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push: new 0f9ed787548 KAFKA-19413: Extended AuthorizerIntegrationTest to cover StreamsGroupDescribe (#19981) 0f9ed787548 is described below commit 0f9ed78754845f11fdc83dd930a1614f92dfa043 Author: Lucas Brutschy <lbruts...@confluent.io> AuthorDate: Wed Jun 18 10:19:34 2025 +0200 KAFKA-19413: Extended AuthorizerIntegrationTest to cover StreamsGroupDescribe (#19981) Extending test coverage of authorization for streams group RPC StreamsGroupDescribe. The RPC requires DESCRIBE GROUP and DESCRIBE TOPIC permissions for all topics. Reviewers: Bill Bejeck <bbej...@apache.org> --- .../kafka/api/AuthorizerIntegrationTest.scala | 181 ++++++++++++++++++++- .../kafka/api/IntegrationTestHarness.scala | 35 +++- 2 files changed, 209 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index a869e2eb4ab..424772275ea 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -17,11 +17,12 @@ import java.time.Duration import java.util import java.util.concurrent.{ExecutionException, Semaphore} import java.util.regex.Pattern -import java.util.{Comparator, Optional, Properties} +import java.util.{Comparator, Optional, Properties, UUID} import kafka.utils.{TestInfoUtils, TestUtils} import kafka.utils.TestUtils.waitUntilTrue import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ListGroupsOptions, NewTopic} import org.apache.kafka.clients.consumer._ +import org.apache.kafka.clients.consumer.internals.{StreamsRebalanceData, StreamsRebalanceListener} import org.apache.kafka.clients.producer._ import org.apache.kafka.common.acl.AclOperation._ import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} @@ -37,7 +38,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection} -import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, AlterShareGroupOffsetsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequ [...] +import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, AlterShareGroupOffsetsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequ [...] import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord} @@ -76,6 +77,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { val shareGroupDescribeConfigsAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS, ALLOW))) val shareGroupAlterConfigsAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW))) val streamsGroupReadAcl = Map(streamsGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW))) + val streamsGroupDescribeAcl = Map(streamsGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW))) val clusterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CLUSTER_ACTION, ALLOW))) val clusterCreateAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CREATE, ALLOW))) val clusterAlterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER, ALLOW))) @@ -225,7 +227,9 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { resp.data.errorCode)), ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> ((resp: AlterShareGroupOffsetsResponse) => Errors.forCode( resp.data.errorCode)), - ApiKeys.STREAMS_GROUP_HEARTBEAT -> ((resp: StreamsGroupHeartbeatResponse) => Errors.forCode(resp.data.errorCode)) + ApiKeys.STREAMS_GROUP_HEARTBEAT -> ((resp: StreamsGroupHeartbeatResponse) => Errors.forCode(resp.data.errorCode)), + ApiKeys.STREAMS_GROUP_DESCRIBE -> ((resp: StreamsGroupDescribeResponse) => + Errors.forCode(resp.data.groups.asScala.find(g => streamsGroup == g.groupId).head.errorCode)) ) def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = { @@ -294,7 +298,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> (shareGroupDescribeAcl ++ topicDescribeAcl), ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> (shareGroupDeleteAcl ++ topicReadAcl), ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> (shareGroupReadAcl ++ topicReadAcl), - ApiKeys.STREAMS_GROUP_HEARTBEAT -> (streamsGroupReadAcl ++ topicDescribeAcl) + ApiKeys.STREAMS_GROUP_HEARTBEAT -> (streamsGroupReadAcl ++ topicDescribeAcl), + ApiKeys.STREAMS_GROUP_DESCRIBE -> (streamsGroupDescribeAcl ++ topicDescribeAcl), ) private def createMetadataRequest(allowAutoTopicCreation: Boolean) = { @@ -870,6 +875,11 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { ).asJava ))).build(ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion) + private def streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder( + new StreamsGroupDescribeRequestData() + .setGroupIds(List(streamsGroup).asJava) + .setIncludeAuthorizedOperations(false)).build(ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion) + private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, AbstractRequest], topicExists: Boolean = true, topicNames: Map[Uuid, String] = getTopicNames()) = { for ((key, request) <- requestKeyToRequest) { @@ -954,6 +964,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> deleteShareGroupOffsetsRequest, ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> alterShareGroupOffsetsRequest, ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest, + ApiKeys.STREAMS_GROUP_DESCRIBE -> streamsGroupDescribeRequest, // Delete the topic last ApiKeys.DELETE_TOPICS -> deleteTopicsRequest @@ -987,7 +998,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { ApiKeys.SHARE_FETCH -> createShareFetchRequest, ApiKeys.SHARE_ACKNOWLEDGE -> shareAcknowledgeRequest, ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest, - ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest + ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest, + ApiKeys.STREAMS_GROUP_DESCRIBE -> streamsGroupDescribeRequest ) sendRequests(requestKeyToRequest, topicExists = false, topicNames) @@ -3853,6 +3865,165 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { response.data().status()) } + private def createStreamsGroupToDescribe( + topicAsSourceTopic: Boolean, + topicAsRepartitionSinkTopic: Boolean, + topicAsRepartitionSourceTopic: Boolean, + topicAsStateChangelogTopics: Boolean + ): Unit = { + createTopicWithBrokerPrincipal(sourceTopic) + createTopicWithBrokerPrincipal(topic) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), streamsGroupResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), sourceTopicResource) + streamsConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroup) + streamsConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + val consumer = createStreamsConsumer(streamsRebalanceData = new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + util.Map.of( + "subtopology-0", new StreamsRebalanceData.Subtopology( + if (topicAsSourceTopic) util.Set.of(sourceTopic, topic) else util.Set.of(sourceTopic), + if (topicAsRepartitionSinkTopic) util.Set.of(topic) else util.Set.of(), + if (topicAsRepartitionSourceTopic) + util.Map.of(topic, new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())) + else util.Map.of(), + if (topicAsStateChangelogTopics) + util.Map.of(topic, new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())) + else util.Map.of(), + util.Set.of() + )), + Map.empty[String, String].asJava + )) + consumer.subscribe( + if (topicAsSourceTopic || topicAsRepartitionSourceTopic) util.Set.of(sourceTopic, topic) else util.Set.of(sourceTopic), + new StreamsRebalanceListener { + override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] = + Optional.empty() + + override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Optional[Exception] = + Optional.empty() + + override def onAllTasksLost(): Optional[Exception] = + Optional.empty() + } + ) + consumer.poll(Duration.ofMillis(500L)) + removeAllClientAcls() + } + + @ParameterizedTest + @CsvSource(Array( + "true, false, false, false", + "false, true, false, false", + "false, false, true, false", + "false, false, false, true" + )) + def testStreamsGroupDescribeWithGroupDescribeAndTopicDescribeAcl( + topicAsSourceTopic: Boolean, + topicAsRepartitionSinkTopic: Boolean, + topicAsRepartitionSourceTopic: Boolean, + topicAsStateChangelogTopics: Boolean + ): Unit = { + createStreamsGroupToDescribe( + topicAsSourceTopic, + topicAsRepartitionSinkTopic, + topicAsRepartitionSourceTopic, + topicAsStateChangelogTopics + ) + addAndVerifyAcls(streamsGroupDescribeAcl(streamsGroupResource), streamsGroupResource) + addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = streamsGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @CsvSource(Array( + "true, false, false, false", + "false, true, false, false", + "false, false, true, false", + "false, false, false, true" + )) + def testStreamsGroupDescribeWithOperationAll( + topicAsSourceTopic: Boolean, + topicAsRepartitionSinkTopic: Boolean, + topicAsRepartitionSourceTopic: Boolean, + topicAsStateChangelogTopics: Boolean + ): Unit = { + createStreamsGroupToDescribe( + topicAsSourceTopic, + topicAsRepartitionSinkTopic, + topicAsRepartitionSourceTopic, + topicAsStateChangelogTopics + ) + + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), streamsGroupResource) + addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val request = streamsGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @CsvSource(Array( + "true, false, false, false", + "false, true, false, false", + "false, false, true, false", + "false, false, false, true" + )) + def testStreamsGroupDescribeWithoutGroupDescribeAcl( + topicAsSourceTopic: Boolean, + topicAsRepartitionSinkTopic: Boolean, + topicAsRepartitionSourceTopic: Boolean, + topicAsStateChangelogTopics: Boolean + ): Unit = { + createStreamsGroupToDescribe( + topicAsSourceTopic, + topicAsRepartitionSinkTopic, + topicAsRepartitionSourceTopic, + topicAsStateChangelogTopics + ) + addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = streamsGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @CsvSource(Array( + "true, false, false, false", + "false, true, false, false", + "false, false, true, false", + "false, false, false, true" + )) + def testStreamsGroupDescribeWithoutGroupDescribeOrTopicDescribeAcl( + topicAsSourceTopic: Boolean, + topicAsRepartitionSinkTopic: Boolean, + topicAsRepartitionSourceTopic: Boolean, + topicAsStateChangelogTopics: Boolean + ): Unit = { + createStreamsGroupToDescribe( + topicAsSourceTopic, + topicAsRepartitionSinkTopic, + topicAsRepartitionSourceTopic, + topicAsStateChangelogTopics + ) + + val request = streamsGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic + + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + private def sendAndReceiveFirstRegexHeartbeat(memberId: String, listenerName: ListenerName): ConsumerGroupHeartbeatResponseData = { val request = new ConsumerGroupHeartbeatRequest.Builder( diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index e062dcc09fa..7c08dd9c3fe 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -22,14 +22,16 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsume import kafka.utils.TestUtils import kafka.utils.Implicits._ -import java.util.Properties +import java.util.{Optional, Properties} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig} import kafka.server.KafkaConfig import kafka.integration.KafkaServerTestHarness import kafka.security.JaasTestUtils import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} +import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData} import org.apache.kafka.common.network.{ConnectionMode, ListenerName} import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer} +import org.apache.kafka.common.utils.Utils import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.MetadataLogConfig import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs} @@ -49,6 +51,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { val producerConfig = new Properties val consumerConfig = new Properties val shareConsumerConfig = new Properties + val streamsConsumerConfig = new Properties val adminClientConfig = new Properties val superuserClientConfig = new Properties val serverConfig = new Properties @@ -56,6 +59,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { private val consumers = mutable.Buffer[Consumer[_, _]]() private val shareConsumers = mutable.Buffer[ShareConsumer[_, _]]() + private val streamsConsumers = mutable.Buffer[Consumer[_, _]]() private val producers = mutable.Buffer[KafkaProducer[_, _]]() private val adminClients = mutable.Buffer[Admin]() @@ -148,7 +152,12 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { shareConsumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group") shareConsumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) shareConsumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) - + + streamsConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) + streamsConsumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group") + streamsConsumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) + streamsConsumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) + adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) doSuperuserSetup(testInfo) @@ -207,6 +216,25 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { shareConsumer } + def createStreamsConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer, + valueDeserializer: Deserializer[V] = new ByteArrayDeserializer, + configOverrides: Properties = new Properties, + configsToRemove: List[String] = List(), + streamsRebalanceData: StreamsRebalanceData): AsyncKafkaConsumer[K, V] = { + val props = new Properties + props ++= streamsConsumerConfig + props ++= configOverrides + configsToRemove.foreach(props.remove(_)) + val streamsConsumer = new AsyncKafkaConsumer[K, V]( + new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(Utils.propsToMap(props), keyDeserializer, valueDeserializer)), + keyDeserializer, + valueDeserializer, + Optional.of(streamsRebalanceData) + ) + streamsConsumers += streamsConsumer + streamsConsumer + } + def createAdminClient( listenerName: ListenerName = listenerName, configOverrides: Properties = new Properties @@ -239,11 +267,14 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { consumers.foreach(_.close(Duration.ZERO)) shareConsumers.foreach(_.wakeup()) shareConsumers.foreach(_.close(Duration.ZERO)) + streamsConsumers.foreach(_.wakeup()) + streamsConsumers.foreach(_.close(Duration.ZERO)) adminClients.foreach(_.close(Duration.ZERO)) producers.clear() consumers.clear() shareConsumers.clear() + streamsConsumers.clear() adminClients.clear() } finally { super.tearDown()