This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e246f46b75b50a73da4765627ce8ba3edd5c88c2
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Jun 17 16:41:49 2025 +0200

    KAFKA-19412: Extended AuthorizerIntegrationTest to cover 
StreamsGroupHeartbeat (#19978)
    
    Extending test coverage of authorization for streams group RPC
    StreamsGroupHeartbeat. The RPC requires READ GROUP and DESCRIBE TOPIC
    permissions for all topics. For creating internal topics, we require
    CREATE TOPIC permission. If internal topic  creation fails, the request
    does not fail, but the status reflects this problem.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  18 +-
 .../api/AbstractAuthorizerIntegrationTest.scala    |   4 +
 .../kafka/api/AuthorizerIntegrationTest.scala      | 253 ++++++++++++++++++++-
 3 files changed, 264 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 3ca43aa145b..f69cbb0eb66 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2792,11 +2792,19 @@ class KafkaApis(val requestChannel: RequestChannel,
               if (responseData.status() == null) {
                 responseData.setStatus(new util.ArrayList());
               }
-              responseData.status().add(
-                new StreamsGroupHeartbeatResponseData.Status()
-                  
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
-                  .setStatusDetail("Unauthorized to CREATE on topics " + 
createTopicUnauthorized.mkString(",") + ".")
-              )
+              val missingInternalTopicStatus =
+                responseData.status().stream().filter(x => x.statusCode() == 
StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()).findFirst()
+              if (missingInternalTopicStatus.isPresent) {
+                missingInternalTopicStatus.get().setStatusDetail(
+                  missingInternalTopicStatus.get().statusDetail() + "; 
Unauthorized to CREATE on topics " + createTopicUnauthorized.mkString(", ") + 
"."
+                )
+              } else {
+                responseData.status().add(
+                  new StreamsGroupHeartbeatResponseData.Status()
+                    
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+                    .setStatusDetail("Unauthorized to CREATE on topics " + 
createTopicUnauthorized.mkString(", ") + ".")
+                )
+              }
             } else {
               
autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, 
requestContext);
             }
diff --git 
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
index 0281d43a947..fafce17382c 100644
--- 
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
@@ -71,6 +71,7 @@ class AbstractAuthorizerIntegrationTest extends 
BaseRequestTest {
 
   val brokerId: Integer = 0
   val topic = "topic"
+  val sourceTopic = "source-topic"
   val topicPattern = "topic.*"
   val transactionalId = "transactional.id"
   val producerId = 83392L
@@ -81,12 +82,15 @@ class AbstractAuthorizerIntegrationTest extends 
BaseRequestTest {
   val logDir = "logDir"
   val group = "my-group"
   val shareGroup = "share-group"
+  val streamsGroup = "streams-group"
   val protocolType = "consumer"
   val protocolName = "consumer-range"
   val clusterResource = new ResourcePattern(CLUSTER, Resource.CLUSTER_NAME, 
LITERAL)
   val topicResource = new ResourcePattern(TOPIC, topic, LITERAL)
+  val sourceTopicResource = new ResourcePattern(TOPIC, sourceTopic, LITERAL)
   val groupResource = new ResourcePattern(GROUP, group, LITERAL)
   val shareGroupResource = new ResourcePattern(GROUP, shareGroup, LITERAL)
+  val streamsGroupResource = new ResourcePattern(GROUP, streamsGroup, LITERAL)
   val transactionalIdResource = new ResourcePattern(TRANSACTIONAL_ID, 
transactionalId, LITERAL)
 
   producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1")
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 85f89c1b647..a869e2eb4ab 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -37,7 +37,7 @@ import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import 
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, 
ListOffsetsTopic}
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition,
 OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, 
AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, 
AlterShareGroupOffsetsRequestData, ConsumerGroupDescribeRequestData, 
ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, 
CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, 
DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, 
DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequ [...]
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, 
AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, 
AlterShareGroupOffsetsRequestData, ConsumerGroupDescribeRequestData, 
ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, 
CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, 
DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, 
DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequ [...]
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
SimpleRecord}
@@ -53,8 +53,7 @@ import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{MethodSource, ValueSource}
-
+import org.junit.jupiter.params.provider.{CsvSource, MethodSource, ValueSource}
 import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic
 import 
org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, 
WritableTxnMarkerTopic}
 import org.apache.kafka.coordinator.group.GroupConfig
@@ -76,6 +75,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   val shareGroupDeleteAcl = Map(shareGroupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DELETE, ALLOW)))
   val shareGroupDescribeConfigsAcl = Map(shareGroupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS, 
ALLOW)))
   val shareGroupAlterConfigsAcl = Map(shareGroupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW)))
+  val streamsGroupReadAcl = Map(streamsGroupResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)))
   val clusterAcl = Map(clusterResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CLUSTER_ACTION, 
ALLOW)))
   val clusterCreateAcl = Map(clusterResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CREATE, ALLOW)))
   val clusterAlterAcl = Map(clusterResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER, ALLOW)))
@@ -92,6 +92,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   val topicAlterConfigsAcl = Map(topicResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW)))
   val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, WRITE, ALLOW)))
   val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)))
+  val sourceTopicDescribeAcl = Map(sourceTopicResource -> Set(new 
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)))
 
   val numRecords = 1
 
@@ -223,7 +224,8 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> ((resp: 
DeleteShareGroupOffsetsResponse) => Errors.forCode(
       resp.data.errorCode)),
     ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> ((resp: 
AlterShareGroupOffsetsResponse) => Errors.forCode(
-      resp.data.errorCode))
+      resp.data.errorCode)),
+    ApiKeys.STREAMS_GROUP_HEARTBEAT -> ((resp: StreamsGroupHeartbeatResponse) 
=> Errors.forCode(resp.data.errorCode))
   )
 
   def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = {
@@ -291,7 +293,8 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY -> clusterAcl,
     ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> (shareGroupDescribeAcl ++ 
topicDescribeAcl),
     ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> (shareGroupDeleteAcl ++ 
topicReadAcl),
-    ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> (shareGroupReadAcl ++ topicReadAcl)
+    ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> (shareGroupReadAcl ++ topicReadAcl),
+    ApiKeys.STREAMS_GROUP_HEARTBEAT -> (streamsGroupReadAcl ++ 
topicDescribeAcl)
   )
 
   private def createMetadataRequest(allowAutoTopicCreation: Boolean) = {
@@ -825,6 +828,48 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     new 
AlterShareGroupOffsetsRequest.Builder(data).build(ApiKeys.ALTER_SHARE_GROUP_OFFSETS.latestVersion)
   }
 
+  private def streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequest.Builder(
+    new StreamsGroupHeartbeatRequestData()
+      .setGroupId(streamsGroup)
+      .setMemberId("member-id")
+      .setMemberEpoch(0)
+      .setRebalanceTimeoutMs(1000)
+      .setActiveTasks(List.empty.asJava)
+      .setStandbyTasks(List.empty.asJava)
+      .setWarmupTasks(List.empty.asJava)
+      .setTopology(new 
StreamsGroupHeartbeatRequestData.Topology().setSubtopologies(
+        List(new StreamsGroupHeartbeatRequestData.Subtopology()
+          .setSourceTopics(List(topic).asJava)
+        ).asJava
+      ))).build(ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion)
+
+  private def streamsGroupHeartbeatRequest(
+                                             topicAsSourceTopic: Boolean,
+                                             topicAsRepartitionSinkTopic: 
Boolean,
+                                             topicAsRepartitionSourceTopic: 
Boolean,
+                                             topicAsStateChangelogTopics: 
Boolean
+                                           ) = new 
StreamsGroupHeartbeatRequest.Builder(
+    new StreamsGroupHeartbeatRequestData()
+      .setGroupId(streamsGroup)
+      .setMemberId("member-id")
+      .setMemberEpoch(0)
+      .setRebalanceTimeoutMs(1000)
+      .setActiveTasks(List.empty.asJava)
+      .setStandbyTasks(List.empty.asJava)
+      .setWarmupTasks(List.empty.asJava)
+      .setTopology(new 
StreamsGroupHeartbeatRequestData.Topology().setSubtopologies(
+        List(new StreamsGroupHeartbeatRequestData.Subtopology()
+          .setSourceTopics(
+            (if (topicAsSourceTopic) List(sourceTopic, topic) else 
List(sourceTopic)).asJava)
+          .setRepartitionSinkTopics(
+            (if (topicAsRepartitionSinkTopic) List(topic) else 
List.empty).asJava)
+          .setRepartitionSourceTopics(
+            (if (topicAsRepartitionSourceTopic) List(new 
StreamsGroupHeartbeatRequestData.TopicInfo().setName(topic).setPartitions(3)) 
else List.empty).asJava)
+          .setStateChangelogTopics(
+            (if (topicAsStateChangelogTopics) List(new 
StreamsGroupHeartbeatRequestData.TopicInfo().setName(topic)) else 
List.empty).asJava)
+        ).asJava
+      ))).build(ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion)
+
   private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, 
AbstractRequest], topicExists: Boolean = true,
                            topicNames: Map[Uuid, String] = getTopicNames()) = {
     for ((key, request) <- requestKeyToRequest) {
@@ -908,6 +953,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
       ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest,
       ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> deleteShareGroupOffsetsRequest,
       ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> alterShareGroupOffsetsRequest,
+      ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest,
 
       // Delete the topic last
       ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
@@ -940,7 +986,8 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
       ApiKeys.ELECT_LEADERS -> electLeadersRequest,
       ApiKeys.SHARE_FETCH -> createShareFetchRequest,
       ApiKeys.SHARE_ACKNOWLEDGE -> shareAcknowledgeRequest,
-      ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest
+      ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest,
+      ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest
     )
 
     sendRequests(requestKeyToRequest, topicExists = false, topicNames)
@@ -3612,6 +3659,200 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, 
response.data.responses.stream().findFirst().get().partitions.get(0).errorCode, 
s"Unexpected response $response")
   }
 
+  @ParameterizedTest
+  @CsvSource(Array(
+    "true,  false, false, false",
+    "false, true,  false, false",
+    "false, false, true,  false",
+    "false, false, false, true"
+  ))
+  def testStreamsGroupHeartbeatWithGroupReadAndTopicDescribeAcl(
+                                                                 
topicAsSourceTopic: Boolean,
+                                                                 
topicAsRepartitionSinkTopic: Boolean,
+                                                                 
topicAsRepartitionSourceTopic: Boolean,
+                                                                 
topicAsStateChangelogTopics: Boolean
+                                                               ): Unit = {
+    addAndVerifyAcls(streamsGroupReadAcl(streamsGroupResource), 
streamsGroupResource)
+    addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), 
sourceTopicResource) // Always added, since we need a source topic
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+    val request = streamsGroupHeartbeatRequest(
+      topicAsSourceTopic,
+      topicAsRepartitionSinkTopic,
+      topicAsRepartitionSourceTopic,
+      topicAsStateChangelogTopics
+    )
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "true,  false, false, false",
+    "false, true,  false, false",
+    "false, false, true,  false",
+    "false, false, false, true"
+  ))
+  def testStreamsGroupHeartbeatWithOperationAll(
+                                                 topicAsSourceTopic: Boolean,
+                                                 topicAsRepartitionSinkTopic: 
Boolean,
+                                                 
topicAsRepartitionSourceTopic: Boolean,
+                                                 topicAsStateChangelogTopics: 
Boolean
+                                               ): Unit = {
+    val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
+    addAndVerifyAcls(Set(allowAllOpsAcl), streamsGroupResource)
+    addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
+    addAndVerifyAcls(Set(allowAllOpsAcl), sourceTopicResource)
+
+    val request = streamsGroupHeartbeatRequest(
+      topicAsSourceTopic,
+      topicAsRepartitionSinkTopic,
+      topicAsRepartitionSourceTopic,
+      topicAsStateChangelogTopics
+    )
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "true,  false, false, false",
+    "false, true,  false, false",
+    "false, false, true,  false",
+    "false, false, false, true"
+  ))
+  def testStreamsGroupHeartbeatWithoutGroupReadOrTopicDescribeAcl(
+                                                                   
topicAsSourceTopic: Boolean,
+                                                                   
topicAsRepartitionSinkTopic: Boolean,
+                                                                   
topicAsRepartitionSourceTopic: Boolean,
+                                                                   
topicAsStateChangelogTopics: Boolean
+                                                                 ): Unit = {
+    removeAllClientAcls()
+
+    val request = streamsGroupHeartbeatRequest(
+      topicAsSourceTopic,
+      topicAsRepartitionSinkTopic,
+      topicAsRepartitionSourceTopic,
+      topicAsStateChangelogTopics
+    )
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "true,  false, false, false",
+    "false, true,  false, false",
+    "false, false, true,  false",
+    "false, false, false, true"
+  ))
+  def testStreamsGroupHeartbeatWithoutGroupReadAcl(
+                                                    topicAsSourceTopic: 
Boolean,
+                                                    
topicAsRepartitionSinkTopic: Boolean,
+                                                    
topicAsRepartitionSourceTopic: Boolean,
+                                                    
topicAsStateChangelogTopics: Boolean
+                                                  ): Unit = {
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+    addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), 
sourceTopicResource) // Always added, since we need a source topic
+
+    val request = streamsGroupHeartbeatRequest(
+      topicAsSourceTopic,
+      topicAsRepartitionSinkTopic,
+      topicAsRepartitionSourceTopic,
+      topicAsStateChangelogTopics
+    )
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "true,  false, false, false",
+    "false, true,  false, false",
+    "false, false, true,  false",
+    "false, false, false, true"
+  ))
+  def testStreamsGroupHeartbeatWithoutTopicDescribeAcl(
+                                                        topicAsSourceTopic: 
Boolean,
+                                                        
topicAsRepartitionSinkTopic: Boolean,
+                                                        
topicAsRepartitionSourceTopic: Boolean,
+                                                        
topicAsStateChangelogTopics: Boolean
+                                                      ): Unit = {
+    addAndVerifyAcls(streamsGroupReadAcl(streamsGroupResource), 
streamsGroupResource)
+    addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), 
sourceTopicResource) // Always added, since we need a source topic
+
+    val request = streamsGroupHeartbeatRequest(
+      topicAsSourceTopic,
+      topicAsRepartitionSinkTopic,
+      topicAsRepartitionSourceTopic,
+      topicAsStateChangelogTopics
+    )
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "true,  false",
+    "false, true"
+  ))
+  def testStreamsGroupHeartbeatWithoutInternalTopicCreateAcl(
+                                                              
topicAsRepartitionSourceTopic: Boolean,
+                                                              
topicAsStateChangelogTopics: Boolean
+                                                            ): Unit = {
+    createTopicWithBrokerPrincipal(sourceTopic)
+    addAndVerifyAcls(streamsGroupReadAcl(streamsGroupResource), 
streamsGroupResource)
+    addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), 
sourceTopicResource) // Always added, since we need a source topic
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+    val request = streamsGroupHeartbeatRequest(
+      topicAsSourceTopic = false,
+      topicAsRepartitionSinkTopic = false,
+      topicAsRepartitionSourceTopic = topicAsRepartitionSourceTopic,
+      topicAsStateChangelogTopics = topicAsStateChangelogTopics
+    )
+    val resource = Set[ResourceType](GROUP, TOPIC)
+
+    // Request successful, but internal topic not created.
+    val response = sendRequestAndVerifyResponseError(request, resource, 
isAuthorized = true).asInstanceOf[StreamsGroupHeartbeatResponse]
+    assertEquals(
+      util.List.of(new StreamsGroupHeartbeatResponseData.Status()
+        
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+        .setStatusDetail("Internal topics are missing: [topic]; Unauthorized 
to CREATE on topics topic.")),
+    response.data().status())
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "true,  false",
+    "false, true"
+  ))
+  def testStreamsGroupHeartbeatWithInternalTopicCreateAcl(
+                                                           
topicAsRepartitionSourceTopic: Boolean,
+                                                           
topicAsStateChangelogTopics: Boolean
+                                                         ): Unit = {
+    createTopicWithBrokerPrincipal(sourceTopic)
+    addAndVerifyAcls(streamsGroupReadAcl(streamsGroupResource), 
streamsGroupResource)
+    addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), 
sourceTopicResource) // Always added, since we need a source topic
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+    addAndVerifyAcls(topicCreateAcl(topicResource), topicResource)
+
+    val request = streamsGroupHeartbeatRequest(
+      topicAsSourceTopic = false,
+      topicAsRepartitionSinkTopic = false,
+      topicAsRepartitionSourceTopic = topicAsRepartitionSourceTopic,
+      topicAsStateChangelogTopics = topicAsStateChangelogTopics
+    )
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    val response = sendRequestAndVerifyResponseError(request, resource, 
isAuthorized = true).asInstanceOf[StreamsGroupHeartbeatResponse]
+    // Request successful, and no internal topic creation error.
+    assertEquals(
+      util.List.of(new StreamsGroupHeartbeatResponseData.Status()
+        
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+        .setStatusDetail("Internal topics are missing: [topic]")),
+      response.data().status())
+  }
+
   private def sendAndReceiveFirstRegexHeartbeat(memberId: String,
                                                 listenerName: ListenerName): 
ConsumerGroupHeartbeatResponseData = {
     val request = new ConsumerGroupHeartbeatRequest.Builder(

Reply via email to