This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e3c456ff0fe KAFKA-19169: Enhance AuthorizerIntegrationTest for share 
group APIs (#19540)
e3c456ff0fe is described below

commit e3c456ff0fe96827e55207a3326d68303563cd0d
Author: Lan Ding <53332773+dl1...@users.noreply.github.com>
AuthorDate: Thu May 1 17:13:43 2025 +0800

    KAFKA-19169: Enhance AuthorizerIntegrationTest for share group APIs (#19540)
    
    Enhance AuthorizerIntegrationTest for share group APIs
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>
---
 .../api/AbstractAuthorizerIntegrationTest.scala    |   2 +
 .../kafka/api/AuthorizerIntegrationTest.scala      | 648 ++++++++++++++++++++-
 2 files changed, 644 insertions(+), 6 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
index dc836352787..54f6d71a278 100644
--- 
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
@@ -80,11 +80,13 @@ class AbstractAuthorizerIntegrationTest extends 
BaseRequestTest {
   val tp = new TopicPartition(topic, part)
   val logDir = "logDir"
   val group = "my-group"
+  val shareGroup = "share-group"
   val protocolType = "consumer"
   val protocolName = "consumer-range"
   val clusterResource = new ResourcePattern(CLUSTER, Resource.CLUSTER_NAME, 
LITERAL)
   val topicResource = new ResourcePattern(TOPIC, topic, LITERAL)
   val groupResource = new ResourcePattern(GROUP, group, LITERAL)
+  val shareGroupResource = new ResourcePattern(GROUP, shareGroup, LITERAL)
   val transactionalIdResource = new ResourcePattern(TRANSACTIONAL_ID, 
transactionalId, LITERAL)
 
   producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1")
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index d75bdc9df6d..d42ba245f6e 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -37,7 +37,7 @@ import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import 
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, 
ListOffsetsTopic}
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition,
 OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, 
AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, 
ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, 
ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, 
CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, 
DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, 
DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsR [...]
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, 
AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, 
ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, 
ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, 
CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, 
DeleteGroupsRequestData, DeleteRecordsRequestData, 
DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequestData, 
DeleteTopicsRequestData, D [...]
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
SimpleRecord}
@@ -48,7 +48,7 @@ import org.apache.kafka.common.resource.ResourceType._
 import org.apache.kafka.common.resource.{PatternType, Resource, 
ResourcePattern, ResourcePatternFilter, ResourceType}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{ElectionType, IsolationLevel, KafkaException, 
TopicPartition, Uuid, requests}
+import org.apache.kafka.common.{ElectionType, IsolationLevel, KafkaException, 
TopicIdPartition, TopicPartition, Uuid, requests}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
@@ -73,6 +73,11 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   val groupDeleteAcl = Map(groupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DELETE, ALLOW)))
   val groupDescribeConfigsAcl = Map(groupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS, 
ALLOW)))
   val groupAlterConfigsAcl = Map(groupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW)))
+  val shareGroupReadAcl = Map(shareGroupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)))
+  val shareGroupDescribeAcl = Map(shareGroupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)))
+  val shareGroupDeleteAcl = Map(shareGroupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DELETE, ALLOW)))
+  val shareGroupDescribeConfigsAcl = Map(shareGroupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS, 
ALLOW)))
+  val shareGroupAlterConfigsAcl = Map(shareGroupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW)))
   val clusterAcl = Map(clusterResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CLUSTER_ACTION, 
ALLOW)))
   val clusterCreateAcl = Map(clusterResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CREATE, ALLOW)))
   val clusterAlterAcl = Map(clusterResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER, ALLOW)))
@@ -199,7 +204,26 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     }),
     ApiKeys.CONSUMER_GROUP_HEARTBEAT -> ((resp: 
ConsumerGroupHeartbeatResponse) => Errors.forCode(resp.data.errorCode)),
     ApiKeys.CONSUMER_GROUP_DESCRIBE -> ((resp: ConsumerGroupDescribeResponse) 
=>
-      Errors.forCode(resp.data.groups.asScala.find(g => group == 
g.groupId).head.errorCode))
+      Errors.forCode(resp.data.groups.asScala.find(g => group == 
g.groupId).head.errorCode)),
+    ApiKeys.SHARE_GROUP_HEARTBEAT -> ((resp: ShareGroupHeartbeatResponse) => 
Errors.forCode(resp.data.errorCode)),
+    ApiKeys.SHARE_GROUP_DESCRIBE -> ((resp: ShareGroupDescribeResponse) =>
+      Errors.forCode(resp.data.groups.asScala.find(g => shareGroup == 
g.groupId).head.errorCode)),
+    ApiKeys.SHARE_FETCH -> ((resp: ShareFetchResponse) => 
Errors.forCode(resp.data.errorCode)),
+    ApiKeys.SHARE_ACKNOWLEDGE -> ((resp: ShareAcknowledgeResponse) => 
Errors.forCode(resp.data.errorCode)),
+    ApiKeys.INITIALIZE_SHARE_GROUP_STATE -> ((resp: 
InitializeShareGroupStateResponse) => Errors.forCode(
+      resp.data.results.get(0).partitions.get(0).errorCode)),
+    ApiKeys.READ_SHARE_GROUP_STATE -> ((resp: ReadShareGroupStateResponse) => 
Errors.forCode(
+      resp.data.results.get(0).partitions.get(0).errorCode)),
+    ApiKeys.WRITE_SHARE_GROUP_STATE -> ((resp: WriteShareGroupStateResponse) 
=> Errors.forCode(
+      resp.data.results.get(0).partitions.get(0).errorCode)),
+    ApiKeys.DELETE_SHARE_GROUP_STATE -> ((resp: DeleteShareGroupStateResponse) 
=> Errors.forCode(
+      resp.data.results.get(0).partitions.get(0).errorCode)),
+    ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY -> ((resp: 
ReadShareGroupStateSummaryResponse) => Errors.forCode(
+      resp.data.results.get(0).partitions.get(0).errorCode)),
+    ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> ((resp: 
DescribeShareGroupOffsetsResponse) => Errors.forCode(
+      resp.data.groups.asScala.find(g => shareGroup == 
g.groupId).head.errorCode)),
+    ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> ((resp: 
DeleteShareGroupOffsetsResponse) => Errors.forCode(
+      resp.data.errorCode))
   )
 
   def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = {
@@ -255,7 +279,18 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     ApiKeys.DESCRIBE_PRODUCERS -> topicReadAcl,
     ApiKeys.DESCRIBE_TRANSACTIONS -> transactionalIdDescribeAcl,
     ApiKeys.CONSUMER_GROUP_HEARTBEAT -> groupReadAcl,
-    ApiKeys.CONSUMER_GROUP_DESCRIBE -> groupDescribeAcl
+    ApiKeys.CONSUMER_GROUP_DESCRIBE -> groupDescribeAcl,
+    ApiKeys.SHARE_GROUP_HEARTBEAT -> (shareGroupReadAcl ++ topicDescribeAcl),
+    ApiKeys.SHARE_GROUP_DESCRIBE -> (shareGroupDescribeAcl ++ 
topicDescribeAcl),
+    ApiKeys.SHARE_FETCH -> (shareGroupReadAcl ++ topicReadAcl),
+    ApiKeys.SHARE_ACKNOWLEDGE -> (shareGroupReadAcl ++ topicReadAcl),
+    ApiKeys.INITIALIZE_SHARE_GROUP_STATE -> clusterAcl,
+    ApiKeys.READ_SHARE_GROUP_STATE -> clusterAcl,
+    ApiKeys.WRITE_SHARE_GROUP_STATE -> clusterAcl,
+    ApiKeys.DELETE_SHARE_GROUP_STATE -> clusterAcl,
+    ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY -> clusterAcl,
+    ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> (shareGroupDescribeAcl ++ 
topicDescribeAcl),
+    ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> (shareGroupDeleteAcl ++ topicReadAcl)
   )
 
   private def createMetadataRequest(allowAutoTopicCreation: Boolean) = {
@@ -655,6 +690,120 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
       .setGroupIds(List(group).asJava)
       .setIncludeAuthorizedOperations(false)).build()
 
+  private def shareGroupHeartbeatRequest = new 
ShareGroupHeartbeatRequest.Builder(
+    new ShareGroupHeartbeatRequestData()
+      .setGroupId(shareGroup)
+      .setMemberEpoch(0)
+      
.setSubscribedTopicNames(List(topic).asJava)).build(ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion)
+
+
+  private def shareGroupDescribeRequest = new 
ShareGroupDescribeRequest.Builder(
+    new ShareGroupDescribeRequestData()
+      .setGroupIds(List(shareGroup).asJava)
+      
.setIncludeAuthorizedOperations(false)).build(ApiKeys.SHARE_GROUP_DESCRIBE.latestVersion)
+
+
+  private def createShareFetchRequest = {
+    val metadata: ShareRequestMetadata = new 
ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH)
+    val send: Seq[TopicIdPartition] = Seq(
+      new TopicIdPartition(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID), 
new TopicPartition(topic, part)))
+    val ackMap = new util.HashMap[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]]
+    requests.ShareFetchRequest.Builder.forConsumer(shareGroup, metadata, 100, 
0, Int.MaxValue, 500, 500,
+      send.asJava, Seq.empty.asJava, ackMap).build()
+  }
+
+  private def shareAcknowledgeRequest = {
+    val shareAcknowledgeRequestData = new ShareAcknowledgeRequestData()
+      .setGroupId(shareGroup)
+      .setMemberId(Uuid.randomUuid().toString)
+      .setShareSessionEpoch(1)
+      .setTopics(List(new ShareAcknowledgeRequestData.AcknowledgeTopic()
+        .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
+        .setPartitions(List(
+          new ShareAcknowledgeRequestData.AcknowledgePartition()
+            .setPartitionIndex(part)
+            .setAcknowledgementBatches(List(
+              new ShareAcknowledgeRequestData.AcknowledgementBatch()
+                .setFirstOffset(0)
+                .setLastOffset(1)
+                .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+            ).asJava)
+        ).asJava)
+      ).asJava)
+
+    new 
ShareAcknowledgeRequest.Builder(shareAcknowledgeRequestData).build(ApiKeys.SHARE_ACKNOWLEDGE.latestVersion)
+  }
+
+  private def initializeShareGroupStateRequest = new 
InitializeShareGroupStateRequest.Builder(
+    new InitializeShareGroupStateRequestData()
+      .setGroupId(shareGroup)
+      .setTopics(List(new 
InitializeShareGroupStateRequestData.InitializeStateData()
+        .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
+        .setPartitions(List(new 
InitializeShareGroupStateRequestData.PartitionData()
+          .setPartition(part)
+        ).asJava)
+      ).asJava)).build()
+
+  private def readShareGroupStateRequest = new 
ReadShareGroupStateRequest.Builder(
+    new ReadShareGroupStateRequestData()
+      .setGroupId(shareGroup)
+      .setTopics(List(new ReadShareGroupStateRequestData.ReadStateData()
+        .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
+        .setPartitions(List(new ReadShareGroupStateRequestData.PartitionData()
+          .setPartition(part)
+          .setLeaderEpoch(0)
+        ).asJava)
+      ).asJava)).build()
+
+  private def writeShareGroupStateRequest = new 
WriteShareGroupStateRequest.Builder(
+    new WriteShareGroupStateRequestData()
+      .setGroupId(shareGroup)
+      .setTopics(List(new WriteShareGroupStateRequestData.WriteStateData()
+        .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
+        .setPartitions(List(new WriteShareGroupStateRequestData.PartitionData()
+          .setPartition(part)
+        ).asJava)
+      ).asJava)).build()
+
+  private def deleteShareGroupStateRequest = new 
DeleteShareGroupStateRequest.Builder(
+    new DeleteShareGroupStateRequestData()
+      .setGroupId(shareGroup)
+      .setTopics(List(new DeleteShareGroupStateRequestData.DeleteStateData()
+        .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
+        .setPartitions(List(new 
DeleteShareGroupStateRequestData.PartitionData()
+          .setPartition(part)
+        ).asJava)
+      ).asJava)).build()
+
+  private def readShareGroupStateSummaryRequest = new 
ReadShareGroupStateSummaryRequest.Builder(
+    new ReadShareGroupStateSummaryRequestData()
+      .setGroupId(shareGroup)
+      .setTopics(List(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+        .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
+        .setPartitions(List(new 
ReadShareGroupStateSummaryRequestData.PartitionData()
+          .setPartition(part)
+          .setLeaderEpoch(0)
+        ).asJava)
+      ).asJava)).build(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY.latestVersion)
+
+  private def describeShareGroupOffsetsRequest = new 
DescribeShareGroupOffsetsRequest.Builder(
+    new DescribeShareGroupOffsetsRequestData()
+      .setGroups(List(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+        .setGroupId(shareGroup)
+        .setTopics(List(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+          .setTopicName(topic)
+          .setPartitions(List(Integer.valueOf(part)
+          ).asJava)
+        ).asJava)
+      ).asJava)).build(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS.latestVersion)
+
+  private def deleteShareGroupOffsetsRequest = new 
DeleteShareGroupOffsetsRequest.Builder(
+    new DeleteShareGroupOffsetsRequestData()
+      .setGroupId(shareGroup)
+      .setTopics(List(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+        .setTopicName(topic)
+      ).asJava)).build(ApiKeys.DELETE_SHARE_GROUP_OFFSETS.latestVersion)
+
   private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, 
AbstractRequest], topicExists: Boolean = true,
                            topicNames: Map[Uuid, String] = getTopicNames()) = {
     for ((key, request) <- requestKeyToRequest) {
@@ -669,6 +818,8 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
           // In KRaft mode, trying to delete a topic that doesn't exist but 
that you do have
           // describe permission for will give UNKNOWN_TOPIC_OR_PARTITION.
           true
+        } else if (resourceToAcls.size > 1) {
+          false
         } else {
           describeAcls == acls
         }
@@ -684,7 +835,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("kraft"))
+  @ValueSource(strings = Array("kip932"))
   def testAuthorizationWithTopicExisting(quorum: String): Unit = {
     //First create the topic so we have a valid topic ID
     sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
@@ -723,6 +874,18 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
       ApiKeys.WRITE_TXN_MARKERS -> writeTxnMarkersRequest,
       ApiKeys.CONSUMER_GROUP_HEARTBEAT -> consumerGroupHeartbeatRequest,
       ApiKeys.CONSUMER_GROUP_DESCRIBE -> consumerGroupDescribeRequest,
+      ApiKeys.SHARE_GROUP_HEARTBEAT -> shareGroupHeartbeatRequest,
+      ApiKeys.SHARE_GROUP_DESCRIBE -> shareGroupDescribeRequest,
+      ApiKeys.SHARE_FETCH -> createShareFetchRequest,
+      ApiKeys.SHARE_ACKNOWLEDGE -> shareAcknowledgeRequest,
+      ApiKeys.INITIALIZE_SHARE_GROUP_STATE -> initializeShareGroupStateRequest,
+      ApiKeys.READ_SHARE_GROUP_STATE -> readShareGroupStateRequest,
+      ApiKeys.WRITE_SHARE_GROUP_STATE -> writeShareGroupStateRequest,
+      ApiKeys.DELETE_SHARE_GROUP_STATE -> deleteShareGroupStateRequest,
+      ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY -> 
readShareGroupStateSummaryRequest,
+      ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest,
+      ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> deleteShareGroupOffsetsRequest,
+
       // Delete the topic last
       ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
     )
@@ -752,7 +915,10 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
       ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
       ApiKeys.DELETE_GROUPS -> deleteGroupsRequest,
       ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest,
-      ApiKeys.ELECT_LEADERS -> electLeadersRequest
+      ApiKeys.ELECT_LEADERS -> electLeadersRequest,
+      ApiKeys.SHARE_FETCH -> createShareFetchRequest,
+      ApiKeys.SHARE_ACKNOWLEDGE -> shareAcknowledgeRequest,
+      ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest
     )
 
     sendRequests(requestKeyToRequest, false, topicNames)
@@ -2653,6 +2819,476 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, 
Some(0), fullRequest = true)
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareGroupHeartbeatWithGroupReadAndTopicDescribeAcl(quorum: String): 
Unit = {
+    addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+    val request = shareGroupHeartbeatRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareGroupHeartbeatWithOperationAll(quorum: String): Unit = {
+    val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
+    addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
+    addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
+
+    val request = shareGroupHeartbeatRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareGroupHeartbeatWithoutGroupReadOrTopicDescribeAcl(quorum: 
String): Unit = {
+    removeAllClientAcls()
+
+    val request = shareGroupHeartbeatRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareGroupHeartbeatWithoutGroupReadAcl(quorum: String): Unit = {
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+    val request = shareGroupHeartbeatRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareGroupHeartbeatWithoutTopicDescribeAcl(quorum: String): Unit = {
+    addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
+
+    val request = shareGroupHeartbeatRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  private def createShareGroupToDescribe(): Unit = {
+    createTopicWithBrokerPrincipal(topic)
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, READ, ALLOW)), shareGroupResource)
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, READ, ALLOW)), topicResource)
+    shareConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroup)
+    val consumer = createShareConsumer()
+    consumer.subscribe(Collections.singleton(topic))
+    consumer.poll(Duration.ofMillis(500L))
+    removeAllClientAcls()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareGroupDescribeWithGroupDescribeAndTopicDescribeAcl(quorum: 
String): Unit = {
+    createShareGroupToDescribe()
+    addAndVerifyAcls(shareGroupDescribeAcl(shareGroupResource), 
shareGroupResource)
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+    val request = shareGroupDescribeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareGroupDescribeWithOperationAll(quorum: String): Unit = {
+    createShareGroupToDescribe()
+
+    val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
+    addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
+    addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
+
+    val request = shareGroupDescribeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareGroupDescribeWithoutGroupDescribeAcl(quorum: String): Unit = {
+    createShareGroupToDescribe()
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+    val request = shareGroupDescribeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareGroupDescribeWithoutGroupDescribeOrTopicDescribeAcl(quorum: 
String): Unit = {
+    createShareGroupToDescribe()
+
+    val request = shareGroupDescribeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareFetchWithGroupReadAndTopicReadAcl(quorum: String): Unit = {
+    addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
+    addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
+
+    val request = createShareFetchRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareFetchWithOperationAll(quorum: String): Unit = {
+    val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
+    addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
+    addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
+
+    val request = createShareFetchRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareFetchWithoutGroupReadOrTopicReadAcl(quorum: String): Unit = {
+    removeAllClientAcls()
+
+    val request = createShareFetchRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareFetchWithoutGroupReadAcl(quorum: String): Unit = {
+    addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
+
+    val request = createShareFetchRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareFetchWithoutTopicReadAcl(quorum: String): Unit = {
+    createTopicWithBrokerPrincipal(topic)
+    addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
+
+    val request = createShareFetchRequest
+    val response = connectAndReceive[ShareFetchResponse](request, listenerName 
= listenerName)
+    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, 
Errors.forCode(response.data.responses.get(0).partitions.get(0).errorCode))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareAcknowledgeWithGroupReadAndTopicReadAcl(quorum: String): Unit = 
{
+    addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
+    addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
+
+    val request = shareAcknowledgeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareAcknowledgeWithOperationAll(quorum: String): Unit = {
+    val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
+    addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
+    addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
+
+    val request = shareAcknowledgeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareAcknowledgeWithoutGroupReadOrTopicReadAcl(quorum: String): Unit 
= {
+    removeAllClientAcls()
+
+    val request = shareAcknowledgeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testShareAcknowledgeFetchWithoutGroupReadAcl(quorum: String): Unit = {
+    addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
+
+    val request = shareAcknowledgeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testInitializeShareGroupStateWithClusterAcl(quorum: String): Unit = {
+    addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
+
+    val request = initializeShareGroupStateRequest
+    val resource = Set[ResourceType](CLUSTER)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testInitializeShareGroupStateWithOperationAll(quorum: String): Unit = {
+    val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
+    addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)
+
+    val request = initializeShareGroupStateRequest
+    val resource = Set[ResourceType](CLUSTER)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testInitializeShareGroupStateWithoutClusterAcl(quorum: String): Unit = {
+    removeAllClientAcls()
+
+    val request = initializeShareGroupStateRequest
+    val resource = Set[ResourceType](CLUSTER)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testReadShareGroupStateWithClusterAcl(quorum: String): Unit = {
+    addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
+
+    val request = readShareGroupStateRequest
+    val resource = Set[ResourceType](CLUSTER)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testReadShareGroupStateWithOperationAll(quorum: String): Unit = {
+    val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
+    addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)
+
+    val request = readShareGroupStateRequest
+    val resource = Set[ResourceType](CLUSTER)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testReadShareGroupStateWithoutClusterAcl(quorum: String): Unit = {
+    removeAllClientAcls()
+
+    val request = readShareGroupStateRequest
+    val resource = Set[ResourceType](CLUSTER)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testWriteShareGroupStateWithClusterAcl(quorum: String): Unit = {
+    addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
+
+    val request = writeShareGroupStateRequest
+    val resource = Set[ResourceType](CLUSTER)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testWriteShareGroupStateWithOperationAll(quorum: String): Unit = {
+    val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
+    addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)
+
+    val request = writeShareGroupStateRequest
+    val resource = Set[ResourceType](CLUSTER)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testWriteShareGroupStateWithoutClusterAcl(quorum: String): Unit = {
+    removeAllClientAcls()
+
+    val request = writeShareGroupStateRequest
+    val resource = Set[ResourceType](CLUSTER)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testDeleteShareGroupStateWithClusterAcl(quorum: String): Unit = {
+    addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
+
+    val request = deleteShareGroupStateRequest
+    val resource = Set[ResourceType](CLUSTER)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testDeleteShareGroupStateWithOperationAll(quorum: String): Unit = {
+    val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
+    addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)
+
+    val request = deleteShareGroupStateRequest
+    val resource = Set[ResourceType](CLUSTER)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testDeleteShareGroupStateWithoutClusterAcl(quorum: String): Unit = {
+    removeAllClientAcls()
+
+    val request = deleteShareGroupStateRequest
+    val resource = Set[ResourceType](CLUSTER)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testReadShareGroupStateSummaryWithClusterAcl(quorum: String): Unit = {
+    addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
+
+    val request = readShareGroupStateSummaryRequest
+    val resource = Set[ResourceType](CLUSTER)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testReadShareGroupStateSummaryWithOperationAll(quorum: String): Unit = {
+    val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
+    addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)
+
+    val request = readShareGroupStateRequest
+    val resource = Set[ResourceType](CLUSTER)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testReadShareGroupStateSummaryWithoutClusterAcl(quorum: String): Unit = {
+    removeAllClientAcls()
+
+    val request = readShareGroupStateRequest
+    val resource = Set[ResourceType](CLUSTER)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def 
testDescribeShareGroupOffsetsWithGroupDescribeAndTopicDescribeAcl(quorum: 
String): Unit = {
+    addAndVerifyAcls(shareGroupDescribeAcl(shareGroupResource), 
shareGroupResource)
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+    val request = describeShareGroupOffsetsRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testDescribeShareGroupOffsetsWithOperationAll(quorum: String): Unit = {
+    val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
+    addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
+    addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
+
+    val request = describeShareGroupOffsetsRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def 
testDescribeShareGroupOffsetsWithoutGroupDescribeOrTopicDescribeAcl(quorum: 
String): Unit = {
+    removeAllClientAcls()
+
+    val request = describeShareGroupOffsetsRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testDescribeShareGroupOffsetsWithoutGroupDescribeAcl(quorum: String): 
Unit = {
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+    val request = describeShareGroupOffsetsRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testDescribeShareGroupOffsetsWithoutTopicDescribeAcl(quorum: String): 
Unit = {
+    addAndVerifyAcls(shareGroupDescribeAcl(shareGroupResource), 
shareGroupResource)
+
+    val request = describeShareGroupOffsetsRequest
+    val response = 
connectAndReceive[DescribeShareGroupOffsetsResponse](request, listenerName = 
listenerName)
+    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, 
Errors.forCode(response.data.groups.get(0).topics.get(0).partitions.get(0).errorCode))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testDeleteShareGroupOffsetsWithGroupDeleteAndTopicReadAcl(quorum: 
String): Unit = {
+    addAndVerifyAcls(shareGroupDeleteAcl(shareGroupResource), 
shareGroupResource)
+    addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
+
+    val request = deleteShareGroupOffsetsRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testDeleteShareGroupOffsetsWithOperationAll(quorum: String): Unit = {
+    val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
+    addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
+    addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
+
+    val request = deleteShareGroupOffsetsRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testDeleteShareGroupOffsetsWithoutGroupDeleteOrTopicReadAcl(quorum: 
String): Unit = {
+    removeAllClientAcls()
+
+    val request = deleteShareGroupOffsetsRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testDeleteShareGroupOffsetsWithoutGroupDeleteAcl(quorum: String): Unit = 
{
+    addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
+
+    val request = deleteShareGroupOffsetsRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kip932"))
+  def testDeleteShareGroupOffsetsWithoutTopicReadAcl(quorum: String): Unit = {
+    addAndVerifyAcls(shareGroupDeleteAcl(shareGroupResource), 
shareGroupResource)
+
+    val request = deleteShareGroupOffsetsRequest
+    val response = connectAndReceive[DeleteShareGroupOffsetsResponse](request, 
listenerName = listenerName)
+    assertEquals(1, response.data.responses.size)
+    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, 
response.data.responses.get(0).errorCode, s"Unexpected response $response")
+  }
+
   private def sendAndReceiveFirstRegexHeartbeat(memberId: String,
                                                 listenerName: ListenerName): 
ConsumerGroupHeartbeatResponseData = {
     val request = new ConsumerGroupHeartbeatRequest.Builder(


Reply via email to