This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new e3c456ff0fe KAFKA-19169: Enhance AuthorizerIntegrationTest for share group APIs (#19540) e3c456ff0fe is described below commit e3c456ff0fe96827e55207a3326d68303563cd0d Author: Lan Ding <53332773+dl1...@users.noreply.github.com> AuthorDate: Thu May 1 17:13:43 2025 +0800 KAFKA-19169: Enhance AuthorizerIntegrationTest for share group APIs (#19540) Enhance AuthorizerIntegrationTest for share group APIs Reviewers: Andrew Schofield <aschofi...@confluent.io> --- .../api/AbstractAuthorizerIntegrationTest.scala | 2 + .../kafka/api/AuthorizerIntegrationTest.scala | 648 ++++++++++++++++++++- 2 files changed, 644 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala index dc836352787..54f6d71a278 100644 --- a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala @@ -80,11 +80,13 @@ class AbstractAuthorizerIntegrationTest extends BaseRequestTest { val tp = new TopicPartition(topic, part) val logDir = "logDir" val group = "my-group" + val shareGroup = "share-group" val protocolType = "consumer" val protocolName = "consumer-range" val clusterResource = new ResourcePattern(CLUSTER, Resource.CLUSTER_NAME, LITERAL) val topicResource = new ResourcePattern(TOPIC, topic, LITERAL) val groupResource = new ResourcePattern(GROUP, group, LITERAL) + val shareGroupResource = new ResourcePattern(GROUP, shareGroup, LITERAL) val transactionalIdResource = new ResourcePattern(TRANSACTIONAL_ID, transactionalId, LITERAL) producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1") diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 
d75bdc9df6d..d42ba245f6e 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -37,7 +37,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection} -import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsR [...] +import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequestData, DeleteTopicsRequestData, D [...] 
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord} @@ -48,7 +48,7 @@ import org.apache.kafka.common.resource.ResourceType._ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Utils -import org.apache.kafka.common.{ElectionType, IsolationLevel, KafkaException, TopicPartition, Uuid, requests} +import org.apache.kafka.common.{ElectionType, IsolationLevel, KafkaException, TopicIdPartition, TopicPartition, Uuid, requests} import org.apache.kafka.test.{TestUtils => JTestUtils} import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST @@ -73,6 +73,11 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { val groupDeleteAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DELETE, ALLOW))) val groupDescribeConfigsAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS, ALLOW))) val groupAlterConfigsAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW))) + val shareGroupReadAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW))) + val shareGroupDescribeAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW))) + val shareGroupDeleteAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DELETE, ALLOW))) + val shareGroupDescribeConfigsAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS, ALLOW))) + val shareGroupAlterConfigsAcl = 
Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW))) val clusterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CLUSTER_ACTION, ALLOW))) val clusterCreateAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CREATE, ALLOW))) val clusterAlterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER, ALLOW))) @@ -199,7 +204,26 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { }), ApiKeys.CONSUMER_GROUP_HEARTBEAT -> ((resp: ConsumerGroupHeartbeatResponse) => Errors.forCode(resp.data.errorCode)), ApiKeys.CONSUMER_GROUP_DESCRIBE -> ((resp: ConsumerGroupDescribeResponse) => - Errors.forCode(resp.data.groups.asScala.find(g => group == g.groupId).head.errorCode)) + Errors.forCode(resp.data.groups.asScala.find(g => group == g.groupId).head.errorCode)), + ApiKeys.SHARE_GROUP_HEARTBEAT -> ((resp: ShareGroupHeartbeatResponse) => Errors.forCode(resp.data.errorCode)), + ApiKeys.SHARE_GROUP_DESCRIBE -> ((resp: ShareGroupDescribeResponse) => + Errors.forCode(resp.data.groups.asScala.find(g => shareGroup == g.groupId).head.errorCode)), + ApiKeys.SHARE_FETCH -> ((resp: ShareFetchResponse) => Errors.forCode(resp.data.errorCode)), + ApiKeys.SHARE_ACKNOWLEDGE -> ((resp: ShareAcknowledgeResponse) => Errors.forCode(resp.data.errorCode)), + ApiKeys.INITIALIZE_SHARE_GROUP_STATE -> ((resp: InitializeShareGroupStateResponse) => Errors.forCode( + resp.data.results.get(0).partitions.get(0).errorCode)), + ApiKeys.READ_SHARE_GROUP_STATE -> ((resp: ReadShareGroupStateResponse) => Errors.forCode( + resp.data.results.get(0).partitions.get(0).errorCode)), + ApiKeys.WRITE_SHARE_GROUP_STATE -> ((resp: WriteShareGroupStateResponse) => Errors.forCode( + resp.data.results.get(0).partitions.get(0).errorCode)), + ApiKeys.DELETE_SHARE_GROUP_STATE -> ((resp: DeleteShareGroupStateResponse) => 
Errors.forCode( + resp.data.results.get(0).partitions.get(0).errorCode)), + ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY -> ((resp: ReadShareGroupStateSummaryResponse) => Errors.forCode( + resp.data.results.get(0).partitions.get(0).errorCode)), + ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> ((resp: DescribeShareGroupOffsetsResponse) => Errors.forCode( + resp.data.groups.asScala.find(g => shareGroup == g.groupId).head.errorCode)), + ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> ((resp: DeleteShareGroupOffsetsResponse) => Errors.forCode( + resp.data.errorCode)) ) def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = { @@ -255,7 +279,18 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { ApiKeys.DESCRIBE_PRODUCERS -> topicReadAcl, ApiKeys.DESCRIBE_TRANSACTIONS -> transactionalIdDescribeAcl, ApiKeys.CONSUMER_GROUP_HEARTBEAT -> groupReadAcl, - ApiKeys.CONSUMER_GROUP_DESCRIBE -> groupDescribeAcl + ApiKeys.CONSUMER_GROUP_DESCRIBE -> groupDescribeAcl, + ApiKeys.SHARE_GROUP_HEARTBEAT -> (shareGroupReadAcl ++ topicDescribeAcl), + ApiKeys.SHARE_GROUP_DESCRIBE -> (shareGroupDescribeAcl ++ topicDescribeAcl), + ApiKeys.SHARE_FETCH -> (shareGroupReadAcl ++ topicReadAcl), + ApiKeys.SHARE_ACKNOWLEDGE -> (shareGroupReadAcl ++ topicReadAcl), + ApiKeys.INITIALIZE_SHARE_GROUP_STATE -> clusterAcl, + ApiKeys.READ_SHARE_GROUP_STATE -> clusterAcl, + ApiKeys.WRITE_SHARE_GROUP_STATE -> clusterAcl, + ApiKeys.DELETE_SHARE_GROUP_STATE -> clusterAcl, + ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY -> clusterAcl, + ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> (shareGroupDescribeAcl ++ topicDescribeAcl), + ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> (shareGroupDeleteAcl ++ topicReadAcl) ) private def createMetadataRequest(allowAutoTopicCreation: Boolean) = { @@ -655,6 +690,120 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { .setGroupIds(List(group).asJava) .setIncludeAuthorizedOperations(false)).build() + private def shareGroupHeartbeatRequest = new 
ShareGroupHeartbeatRequest.Builder( + new ShareGroupHeartbeatRequestData() + .setGroupId(shareGroup) + .setMemberEpoch(0) + .setSubscribedTopicNames(List(topic).asJava)).build(ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion) + + + private def shareGroupDescribeRequest = new ShareGroupDescribeRequest.Builder( + new ShareGroupDescribeRequestData() + .setGroupIds(List(shareGroup).asJava) + .setIncludeAuthorizedOperations(false)).build(ApiKeys.SHARE_GROUP_DESCRIBE.latestVersion) + + + private def createShareFetchRequest = { + val metadata: ShareRequestMetadata = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH) + val send: Seq[TopicIdPartition] = Seq( + new TopicIdPartition(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID), new TopicPartition(topic, part))) + val ackMap = new util.HashMap[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] + requests.ShareFetchRequest.Builder.forConsumer(shareGroup, metadata, 100, 0, Int.MaxValue, 500, 500, + send.asJava, Seq.empty.asJava, ackMap).build() + } + + private def shareAcknowledgeRequest = { + val shareAcknowledgeRequestData = new ShareAcknowledgeRequestData() + .setGroupId(shareGroup) + .setMemberId(Uuid.randomUuid().toString) + .setShareSessionEpoch(1) + .setTopics(List(new ShareAcknowledgeRequestData.AcknowledgeTopic() + .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID)) + .setPartitions(List( + new ShareAcknowledgeRequestData.AcknowledgePartition() + .setPartitionIndex(part) + .setAcknowledgementBatches(List( + new ShareAcknowledgeRequestData.AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(1) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava) + ).asJava) + + new ShareAcknowledgeRequest.Builder(shareAcknowledgeRequestData).build(ApiKeys.SHARE_ACKNOWLEDGE.latestVersion) + } + + private def initializeShareGroupStateRequest = new InitializeShareGroupStateRequest.Builder( + new InitializeShareGroupStateRequestData() + 
.setGroupId(shareGroup) + .setTopics(List(new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID)) + .setPartitions(List(new InitializeShareGroupStateRequestData.PartitionData() + .setPartition(part) + ).asJava) + ).asJava)).build() + + private def readShareGroupStateRequest = new ReadShareGroupStateRequest.Builder( + new ReadShareGroupStateRequestData() + .setGroupId(shareGroup) + .setTopics(List(new ReadShareGroupStateRequestData.ReadStateData() + .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID)) + .setPartitions(List(new ReadShareGroupStateRequestData.PartitionData() + .setPartition(part) + .setLeaderEpoch(0) + ).asJava) + ).asJava)).build() + + private def writeShareGroupStateRequest = new WriteShareGroupStateRequest.Builder( + new WriteShareGroupStateRequestData() + .setGroupId(shareGroup) + .setTopics(List(new WriteShareGroupStateRequestData.WriteStateData() + .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID)) + .setPartitions(List(new WriteShareGroupStateRequestData.PartitionData() + .setPartition(part) + ).asJava) + ).asJava)).build() + + private def deleteShareGroupStateRequest = new DeleteShareGroupStateRequest.Builder( + new DeleteShareGroupStateRequestData() + .setGroupId(shareGroup) + .setTopics(List(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID)) + .setPartitions(List(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(part) + ).asJava) + ).asJava)).build() + + private def readShareGroupStateSummaryRequest = new ReadShareGroupStateSummaryRequest.Builder( + new ReadShareGroupStateSummaryRequestData() + .setGroupId(shareGroup) + .setTopics(List(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID)) + .setPartitions(List(new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(part) 
+ .setLeaderEpoch(0) + ).asJava) + ).asJava)).build(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY.latestVersion) + + private def describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequest.Builder( + new DescribeShareGroupOffsetsRequestData() + .setGroups(List(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup() + .setGroupId(shareGroup) + .setTopics(List(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(topic) + .setPartitions(List(Integer.valueOf(part) + ).asJava) + ).asJava) + ).asJava)).build(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS.latestVersion) + + private def deleteShareGroupOffsetsRequest = new DeleteShareGroupOffsetsRequest.Builder( + new DeleteShareGroupOffsetsRequestData() + .setGroupId(shareGroup) + .setTopics(List(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic() + .setTopicName(topic) + ).asJava)).build(ApiKeys.DELETE_SHARE_GROUP_OFFSETS.latestVersion) + private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, AbstractRequest], topicExists: Boolean = true, topicNames: Map[Uuid, String] = getTopicNames()) = { for ((key, request) <- requestKeyToRequest) { @@ -669,6 +818,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { // In KRaft mode, trying to delete a topic that doesn't exist but that you do have // describe permission for will give UNKNOWN_TOPIC_OR_PARTITION. 
true + } else if (resourceToAcls.size > 1) { + false } else { describeAcls == acls } @@ -684,7 +835,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("kraft")) + @ValueSource(strings = Array("kip932")) def testAuthorizationWithTopicExisting(quorum: String): Unit = { //First create the topic so we have a valid topic ID sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest)) @@ -723,6 +874,18 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { ApiKeys.WRITE_TXN_MARKERS -> writeTxnMarkersRequest, ApiKeys.CONSUMER_GROUP_HEARTBEAT -> consumerGroupHeartbeatRequest, ApiKeys.CONSUMER_GROUP_DESCRIBE -> consumerGroupDescribeRequest, + ApiKeys.SHARE_GROUP_HEARTBEAT -> shareGroupHeartbeatRequest, + ApiKeys.SHARE_GROUP_DESCRIBE -> shareGroupDescribeRequest, + ApiKeys.SHARE_FETCH -> createShareFetchRequest, + ApiKeys.SHARE_ACKNOWLEDGE -> shareAcknowledgeRequest, + ApiKeys.INITIALIZE_SHARE_GROUP_STATE -> initializeShareGroupStateRequest, + ApiKeys.READ_SHARE_GROUP_STATE -> readShareGroupStateRequest, + ApiKeys.WRITE_SHARE_GROUP_STATE -> writeShareGroupStateRequest, + ApiKeys.DELETE_SHARE_GROUP_STATE -> deleteShareGroupStateRequest, + ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY -> readShareGroupStateSummaryRequest, + ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest, + ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> deleteShareGroupOffsetsRequest, + // Delete the topic last ApiKeys.DELETE_TOPICS -> deleteTopicsRequest ) @@ -752,7 +915,10 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest, ApiKeys.DELETE_GROUPS -> deleteGroupsRequest, ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest, - ApiKeys.ELECT_LEADERS -> electLeadersRequest + ApiKeys.ELECT_LEADERS -> electLeadersRequest, + ApiKeys.SHARE_FETCH -> createShareFetchRequest, + ApiKeys.SHARE_ACKNOWLEDGE -> 
shareAcknowledgeRequest, + ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest ) sendRequests(requestKeyToRequest, false, topicNames) @@ -2653,6 +2819,476 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some(0), fullRequest = true) } + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupHeartbeatWithGroupReadAndTopicDescribeAcl(quorum: String): Unit = { + addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource) + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = shareGroupHeartbeatRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupHeartbeatWithOperationAll(quorum: String): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val request = shareGroupHeartbeatRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupHeartbeatWithoutGroupReadOrTopicDescribeAcl(quorum: String): Unit = { + removeAllClientAcls() + + val request = shareGroupHeartbeatRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupHeartbeatWithoutGroupReadAcl(quorum: String): Unit = { + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = shareGroupHeartbeatRequest + val resource = Set[ResourceType](GROUP, 
TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupHeartbeatWithoutTopicDescribeAcl(quorum: String): Unit = { + addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource) + + val request = shareGroupHeartbeatRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + private def createShareGroupToDescribe(): Unit = { + createTopicWithBrokerPrincipal(topic) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), shareGroupResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) + shareConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroup) + val consumer = createShareConsumer() + consumer.subscribe(Collections.singleton(topic)) + consumer.poll(Duration.ofMillis(500L)) + removeAllClientAcls() + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupDescribeWithGroupDescribeAndTopicDescribeAcl(quorum: String): Unit = { + createShareGroupToDescribe() + addAndVerifyAcls(shareGroupDescribeAcl(shareGroupResource), shareGroupResource) + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = shareGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupDescribeWithOperationAll(quorum: String): Unit = { + createShareGroupToDescribe() + + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val request = shareGroupDescribeRequest + val resource = 
Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupDescribeWithoutGroupDescribeAcl(quorum: String): Unit = { + createShareGroupToDescribe() + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = shareGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupDescribeWithoutGroupDescribeOrTopicDescribeAcl(quorum: String): Unit = { + createShareGroupToDescribe() + + val request = shareGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareFetchWithGroupReadAndTopicReadAcl(quorum: String): Unit = { + addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource) + addAndVerifyAcls(topicReadAcl(topicResource), topicResource) + + val request = createShareFetchRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareFetchWithOperationAll(quorum: String): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val request = createShareFetchRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareFetchWithoutGroupReadOrTopicReadAcl(quorum: String): Unit = { + 
removeAllClientAcls() + + val request = createShareFetchRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareFetchWithoutGroupReadAcl(quorum: String): Unit = { + addAndVerifyAcls(topicReadAcl(topicResource), topicResource) + + val request = createShareFetchRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareFetchWithoutTopicReadAcl(quorum: String): Unit = { + createTopicWithBrokerPrincipal(topic) + addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource) + + val request = createShareFetchRequest + val response = connectAndReceive[ShareFetchResponse](request, listenerName = listenerName) + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.data.responses.get(0).partitions.get(0).errorCode)) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareAcknowledgeWithGroupReadAndTopicReadAcl(quorum: String): Unit = { + addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource) + addAndVerifyAcls(topicReadAcl(topicResource), topicResource) + + val request = shareAcknowledgeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareAcknowledgeWithOperationAll(quorum: String): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val request = shareAcknowledgeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + 
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareAcknowledgeWithoutGroupReadOrTopicReadAcl(quorum: String): Unit = { + removeAllClientAcls() + + val request = shareAcknowledgeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareAcknowledgeFetchWithoutGroupReadAcl(quorum: String): Unit = { + addAndVerifyAcls(topicReadAcl(topicResource), topicResource) + + val request = shareAcknowledgeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testInitializeShareGroupStateWithClusterAcl(quorum: String): Unit = { + addAndVerifyAcls(clusterAcl(clusterResource), clusterResource) + + val request = initializeShareGroupStateRequest + val resource = Set[ResourceType](CLUSTER) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testInitializeShareGroupStateWithOperationAll(quorum: String): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource) + + val request = initializeShareGroupStateRequest + val resource = Set[ResourceType](CLUSTER) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testInitializeShareGroupStateWithoutClusterAcl(quorum: String): Unit = { + removeAllClientAcls() + + val request = initializeShareGroupStateRequest + val resource = Set[ResourceType](CLUSTER) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + 
@ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testReadShareGroupStateWithClusterAcl(quorum: String): Unit = {
+    // Client holds CLUSTER_ACTION on the cluster resource; ReadShareGroupState is expected to be authorized.
+    addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
+    sendRequestAndVerifyResponseError(readShareGroupStateRequest, Set[ResourceType](CLUSTER), isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testReadShareGroupStateWithOperationAll(quorum: String): Unit = {
+    // An ALL-operations ACL on the cluster resource is expected to authorize ReadShareGroupState as well.
+    addAndVerifyAcls(
+      Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)),
+      clusterResource)
+    sendRequestAndVerifyResponseError(readShareGroupStateRequest, Set[ResourceType](CLUSTER), isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testReadShareGroupStateWithoutClusterAcl(quorum: String): Unit = {
+    // With every client ACL removed, ReadShareGroupState is expected to be rejected.
+    removeAllClientAcls()
+    sendRequestAndVerifyResponseError(readShareGroupStateRequest, Set[ResourceType](CLUSTER), isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testWriteShareGroupStateWithClusterAcl(quorum: String): Unit = {
+    // Client holds CLUSTER_ACTION on the cluster resource; WriteShareGroupState is expected to be authorized.
+    addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
+    sendRequestAndVerifyResponseError(writeShareGroupStateRequest, Set[ResourceType](CLUSTER), isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testWriteShareGroupStateWithOperationAll(quorum: String): Unit = {
+    // An ALL-operations ACL on the cluster resource is expected to authorize WriteShareGroupState as well.
+    addAndVerifyAcls(
+      Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)),
+      clusterResource)
+    sendRequestAndVerifyResponseError(writeShareGroupStateRequest, Set[ResourceType](CLUSTER), isAuthorized = true)
+  }
+
+
/**
 * WriteShareGroupState after `removeAllClientAcls()`: checked against CLUSTER with
 * `isAuthorized = false`.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testWriteShareGroupStateWithoutClusterAcl(quorum: String): Unit = {
  removeAllClientAcls()

  val request = writeShareGroupStateRequest
  val resource = Set[ResourceType](CLUSTER)
  sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}

/**
 * DeleteShareGroupState with the ACLs from `clusterAcl(clusterResource)` added on the cluster
 * resource: checked against CLUSTER with `isAuthorized = true`.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupStateWithClusterAcl(quorum: String): Unit = {
  addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)

  val request = deleteShareGroupStateRequest
  val resource = Set[ResourceType](CLUSTER)
  sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}

/**
 * DeleteShareGroupState with a single ALL/ALLOW entry for the client principal added on the
 * cluster resource: checked against CLUSTER with `isAuthorized = true`.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupStateWithOperationAll(quorum: String): Unit = {
  val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
  addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)

  val request = deleteShareGroupStateRequest
  val resource = Set[ResourceType](CLUSTER)
  sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}

/**
 * DeleteShareGroupState after `removeAllClientAcls()`: checked against CLUSTER with
 * `isAuthorized = false`.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupStateWithoutClusterAcl(quorum: String): Unit = {
  removeAllClientAcls()

  val request = deleteShareGroupStateRequest
  val resource = Set[ResourceType](CLUSTER)
  sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}

/**
 * ReadShareGroupStateSummary with the ACLs from `clusterAcl(clusterResource)` added on the
 * cluster resource: checked against CLUSTER with `isAuthorized = true`.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testReadShareGroupStateSummaryWithClusterAcl(quorum: String): Unit = {
  addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)

  val request = readShareGroupStateSummaryRequest
  val resource = Set[ResourceType](CLUSTER)
  sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}

/**
 * ReadShareGroupStateSummary with a single ALL/ALLOW entry for the client principal added on
 * the cluster resource: checked against CLUSTER with `isAuthorized = true`.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testReadShareGroupStateSummaryWithOperationAll(quorum: String): Unit = {
  val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
  addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)

  // Must be the *summary* request: readShareGroupStateRequest would only repeat
  // testReadShareGroupStateWithOperationAll and leave this API unexercised.
  val request = readShareGroupStateSummaryRequest
  val resource = Set[ResourceType](CLUSTER)
  sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}

/**
 * ReadShareGroupStateSummary after `removeAllClientAcls()`: checked against CLUSTER with
 * `isAuthorized = false`.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testReadShareGroupStateSummaryWithoutClusterAcl(quorum: String): Unit = {
  removeAllClientAcls()

  // Must be the *summary* request: readShareGroupStateRequest would only repeat
  // testReadShareGroupStateWithoutClusterAcl and leave this API unexercised.
  val request = readShareGroupStateSummaryRequest
  val resource = Set[ResourceType](CLUSTER)
  sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}

/**
 * DescribeShareGroupOffsets with the ACLs from `shareGroupDescribeAcl(shareGroupResource)` and
 * `topicDescribeAcl(topicResource)` both added: checked against GROUP and TOPIC with
 * `isAuthorized = true`.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDescribeShareGroupOffsetsWithGroupDescribeAndTopicDescribeAcl(quorum: String): Unit = {
  addAndVerifyAcls(shareGroupDescribeAcl(shareGroupResource), shareGroupResource)
  addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)

  val request = describeShareGroupOffsetsRequest
  val resource = Set[ResourceType](GROUP, TOPIC)
  sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}

/**
 * DescribeShareGroupOffsets with a single ALL/ALLOW entry for the client principal added on
 * both the share-group and the topic resource: checked against GROUP and TOPIC with
 * `isAuthorized = true`.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDescribeShareGroupOffsetsWithOperationAll(quorum: String): Unit = {
  val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
  addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
  addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)

  val request = describeShareGroupOffsetsRequest
  val resource = Set[ResourceType](GROUP, TOPIC)
  sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}

/**
 * DescribeShareGroupOffsets after `removeAllClientAcls()`: checked against GROUP and TOPIC
 * with `isAuthorized = false`.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDescribeShareGroupOffsetsWithoutGroupDescribeOrTopicDescribeAcl(quorum: String): Unit = {
  removeAllClientAcls()

  val request = describeShareGroupOffsetsRequest
  val resource = Set[ResourceType](GROUP, TOPIC)
  sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}

/**
 * DescribeShareGroupOffsets when only the ACLs from `topicDescribeAcl(topicResource)` are
 * added (no group ACL is added here): checked with `isAuthorized = false`.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDescribeShareGroupOffsetsWithoutGroupDescribeAcl(quorum: String): Unit = {
  addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)

  val request = describeShareGroupOffsetsRequest
  val resource = Set[ResourceType](GROUP, TOPIC)
  sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}

/**
 * DescribeShareGroupOffsets when only the ACLs from `shareGroupDescribeAcl(shareGroupResource)`
 * are added (no topic ACL is added here). The response is inspected directly: the first
 * partition of the first topic of the first group must carry TOPIC_AUTHORIZATION_FAILED.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDescribeShareGroupOffsetsWithoutTopicDescribeAcl(quorum: String): Unit = {
  addAndVerifyAcls(shareGroupDescribeAcl(shareGroupResource), shareGroupResource)

  val request = describeShareGroupOffsetsRequest
  val response = connectAndReceive[DescribeShareGroupOffsetsResponse](request, listenerName = listenerName)
  assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.data.groups.get(0).topics.get(0).partitions.get(0).errorCode))
}

/**
 * DeleteShareGroupOffsets with the ACLs from `shareGroupDeleteAcl(shareGroupResource)` and
 * `topicReadAcl(topicResource)` both added: checked against GROUP and TOPIC with
 * `isAuthorized = true`.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupOffsetsWithGroupDeleteAndTopicReadAcl(quorum: String): Unit = {
  addAndVerifyAcls(shareGroupDeleteAcl(shareGroupResource), shareGroupResource)
  addAndVerifyAcls(topicReadAcl(topicResource), topicResource)

  val request = deleteShareGroupOffsetsRequest
  val resource = Set[ResourceType](GROUP, TOPIC)
  sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}

/**
 * DeleteShareGroupOffsets with a single ALL/ALLOW entry for the client principal added on both
 * the share-group and the topic resource: checked against GROUP and TOPIC with
 * `isAuthorized = true`.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupOffsetsWithOperationAll(quorum: String): Unit = {
  val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
  addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
  addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)

  val request = deleteShareGroupOffsetsRequest
  val resource = Set[ResourceType](GROUP, TOPIC)
  sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}

/**
 * DeleteShareGroupOffsets after `removeAllClientAcls()`: checked against GROUP and TOPIC with
 * `isAuthorized = false`.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupOffsetsWithoutGroupDeleteOrTopicReadAcl(quorum: String): Unit = {
  removeAllClientAcls()

  val request = deleteShareGroupOffsetsRequest
  val resource = Set[ResourceType](GROUP, TOPIC)
  sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}

/**
 * DeleteShareGroupOffsets when only the ACLs from `topicReadAcl(topicResource)` are added
 * (no group ACL is added here): checked with `isAuthorized = false`.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupOffsetsWithoutGroupDeleteAcl(quorum: String): Unit = {
  addAndVerifyAcls(topicReadAcl(topicResource), topicResource)

  val request = deleteShareGroupOffsetsRequest
  val resource = Set[ResourceType](GROUP, TOPIC)
  sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}

/**
 * DeleteShareGroupOffsets when only the ACLs from `shareGroupDeleteAcl(shareGroupResource)` are
 * added (no topic ACL is added here). The response is inspected directly: it must contain
 * exactly one entry, and that entry's error code must be TOPIC_AUTHORIZATION_FAILED.
 *
 * @param quorum value injected by `@ValueSource` ("kip932"); not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testDeleteShareGroupOffsetsWithoutTopicReadAcl(quorum: String): Unit = {
  addAndVerifyAcls(shareGroupDeleteAcl(shareGroupResource), shareGroupResource)

  val request = deleteShareGroupOffsetsRequest
  val response = connectAndReceive[DeleteShareGroupOffsetsResponse](request, listenerName = listenerName)
  assertEquals(1, response.data.responses.size)
  assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, response.data.responses.get(0).errorCode, s"Unexpected response $response")
}

private def sendAndReceiveFirstRegexHeartbeat(memberId: String, listenerName: ListenerName): ConsumerGroupHeartbeatResponseData = {
  val request = new ConsumerGroupHeartbeatRequest.Builder(