This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch 4.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit e246f46b75b50a73da4765627ce8ba3edd5c88c2 Author: Lucas Brutschy <[email protected]> AuthorDate: Tue Jun 17 16:41:49 2025 +0200 KAFKA-19412: Extended AuthorizerIntegrationTest to cover StreamsGroupHeartbeat (#19978) Extending test coverage of authorization for streams group RPC StreamsGroupHeartbeat. The RPC requires READ GROUP and DESCRIBE TOPIC permissions for all topics. For creating internal topics, we require CREATE TOPIC permission. If internal topic creation fails, the request does not fail, but the status reflects this problem. Reviewers: Bill Bejeck <[email protected]> --- core/src/main/scala/kafka/server/KafkaApis.scala | 18 +- .../api/AbstractAuthorizerIntegrationTest.scala | 4 + .../kafka/api/AuthorizerIntegrationTest.scala | 253 ++++++++++++++++++++- 3 files changed, 264 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 3ca43aa145b..f69cbb0eb66 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2792,11 +2792,19 @@ class KafkaApis(val requestChannel: RequestChannel, if (responseData.status() == null) { responseData.setStatus(new util.ArrayList()); } - responseData.status().add( - new StreamsGroupHeartbeatResponseData.Status() - .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()) - .setStatusDetail("Unauthorized to CREATE on topics " + createTopicUnauthorized.mkString(",") + ".") - ) + val missingInternalTopicStatus = + responseData.status().stream().filter(x => x.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()).findFirst() + if (missingInternalTopicStatus.isPresent) { + missingInternalTopicStatus.get().setStatusDetail( + missingInternalTopicStatus.get().statusDetail() + "; Unauthorized to CREATE on topics " + createTopicUnauthorized.mkString(", ") + "." + ) + } else { + responseData.status().add( + new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()) + .setStatusDetail("Unauthorized to CREATE on topics " + createTopicUnauthorized.mkString(", ") + ".") + ) + } } else { autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext); } diff --git a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala index 0281d43a947..fafce17382c 100644 --- a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala @@ -71,6 +71,7 @@ class AbstractAuthorizerIntegrationTest extends BaseRequestTest { val brokerId: Integer = 0 val topic = "topic" + val sourceTopic = "source-topic" val topicPattern = "topic.*" val transactionalId = "transactional.id" val producerId = 83392L @@ -81,12 +82,15 @@ class AbstractAuthorizerIntegrationTest extends BaseRequestTest { val logDir = "logDir" val group = "my-group" val shareGroup = "share-group" + val streamsGroup = "streams-group" val protocolType = "consumer" val protocolName = "consumer-range" val clusterResource = new ResourcePattern(CLUSTER, Resource.CLUSTER_NAME, LITERAL) val topicResource = new ResourcePattern(TOPIC, topic, LITERAL) + val sourceTopicResource = new ResourcePattern(TOPIC, sourceTopic, LITERAL) val groupResource = new ResourcePattern(GROUP, group, LITERAL) val shareGroupResource = new ResourcePattern(GROUP, shareGroup, LITERAL) + val streamsGroupResource = new ResourcePattern(GROUP, streamsGroup, LITERAL) val transactionalIdResource = new ResourcePattern(TRANSACTIONAL_ID, transactionalId, LITERAL) producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1") diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 85f89c1b647..a869e2eb4ab 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -37,7 +37,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection} -import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, AlterShareGroupOffsetsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequ [...] +import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, AlterShareGroupOffsetsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequ [...] import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord} @@ -53,8 +53,7 @@ import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{MethodSource, ValueSource} - +import org.junit.jupiter.params.provider.{CsvSource, MethodSource, ValueSource} import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic} import org.apache.kafka.coordinator.group.GroupConfig @@ -76,6 +75,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { val shareGroupDeleteAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DELETE, ALLOW))) val shareGroupDescribeConfigsAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS, ALLOW))) val shareGroupAlterConfigsAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW))) + val streamsGroupReadAcl = Map(streamsGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW))) val clusterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CLUSTER_ACTION, ALLOW))) val clusterCreateAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CREATE, ALLOW))) val clusterAlterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER, ALLOW))) @@ -92,6 +92,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { val topicAlterConfigsAcl = Map(topicResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW))) val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, WRITE, ALLOW))) val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW))) + val sourceTopicDescribeAcl = Map(sourceTopicResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW))) val numRecords = 1 @@ -223,7 +224,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> ((resp: DeleteShareGroupOffsetsResponse) => Errors.forCode( resp.data.errorCode)), ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> ((resp: AlterShareGroupOffsetsResponse) => Errors.forCode( - resp.data.errorCode)) + resp.data.errorCode)), + ApiKeys.STREAMS_GROUP_HEARTBEAT -> ((resp: StreamsGroupHeartbeatResponse) => Errors.forCode(resp.data.errorCode)) ) def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = { @@ -291,7 +293,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY -> clusterAcl, ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> (shareGroupDescribeAcl ++ topicDescribeAcl), ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> (shareGroupDeleteAcl ++ topicReadAcl), - ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> (shareGroupReadAcl ++ topicReadAcl) + ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> (shareGroupReadAcl ++ topicReadAcl), + ApiKeys.STREAMS_GROUP_HEARTBEAT -> (streamsGroupReadAcl ++ topicDescribeAcl) ) private def createMetadataRequest(allowAutoTopicCreation: Boolean) = { @@ -825,6 +828,48 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { new AlterShareGroupOffsetsRequest.Builder(data).build(ApiKeys.ALTER_SHARE_GROUP_OFFSETS.latestVersion) } + private def streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(streamsGroup) + .setMemberId("member-id") + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1000) + .setActiveTasks(List.empty.asJava) + .setStandbyTasks(List.empty.asJava) + .setWarmupTasks(List.empty.asJava) + .setTopology(new StreamsGroupHeartbeatRequestData.Topology().setSubtopologies( + List(new StreamsGroupHeartbeatRequestData.Subtopology() + .setSourceTopics(List(topic).asJava) + ).asJava + ))).build(ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion) + + private def streamsGroupHeartbeatRequest( + topicAsSourceTopic: Boolean, + topicAsRepartitionSinkTopic: Boolean, + topicAsRepartitionSourceTopic: Boolean, + topicAsStateChangelogTopics: Boolean + ) = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(streamsGroup) + .setMemberId("member-id") + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1000) + .setActiveTasks(List.empty.asJava) + .setStandbyTasks(List.empty.asJava) + .setWarmupTasks(List.empty.asJava) + .setTopology(new StreamsGroupHeartbeatRequestData.Topology().setSubtopologies( + List(new StreamsGroupHeartbeatRequestData.Subtopology() + .setSourceTopics( + (if (topicAsSourceTopic) List(sourceTopic, topic) else List(sourceTopic)).asJava) + .setRepartitionSinkTopics( + (if (topicAsRepartitionSinkTopic) List(topic) else List.empty).asJava) + .setRepartitionSourceTopics( + (if (topicAsRepartitionSourceTopic) List(new StreamsGroupHeartbeatRequestData.TopicInfo().setName(topic).setPartitions(3)) else List.empty).asJava) + .setStateChangelogTopics( + (if (topicAsStateChangelogTopics) List(new StreamsGroupHeartbeatRequestData.TopicInfo().setName(topic)) else List.empty).asJava) + ).asJava + ))).build(ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion) + private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, AbstractRequest], topicExists: Boolean = true, topicNames: Map[Uuid, String] = getTopicNames()) = { for ((key, request) <- requestKeyToRequest) { @@ -908,6 +953,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest, ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> deleteShareGroupOffsetsRequest, ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> alterShareGroupOffsetsRequest, + ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest, // Delete the topic last ApiKeys.DELETE_TOPICS -> deleteTopicsRequest @@ -940,7 +986,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { ApiKeys.ELECT_LEADERS -> electLeadersRequest, ApiKeys.SHARE_FETCH -> createShareFetchRequest, ApiKeys.SHARE_ACKNOWLEDGE -> shareAcknowledgeRequest, - ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest + ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest, + ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest ) sendRequests(requestKeyToRequest, topicExists = false, topicNames) @@ -3612,6 +3659,200 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, response.data.responses.stream().findFirst().get().partitions.get(0).errorCode, s"Unexpected response $response") } + @ParameterizedTest + @CsvSource(Array( + "true, false, false, false", + "false, true, false, false", + "false, false, true, false", + "false, false, false, true" + )) + def testStreamsGroupHeartbeatWithGroupReadAndTopicDescribeAcl( + topicAsSourceTopic: Boolean, + topicAsRepartitionSinkTopic: Boolean, + topicAsRepartitionSourceTopic: Boolean, + topicAsStateChangelogTopics: Boolean + ): Unit = { + addAndVerifyAcls(streamsGroupReadAcl(streamsGroupResource), streamsGroupResource) + addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = streamsGroupHeartbeatRequest( + topicAsSourceTopic, + topicAsRepartitionSinkTopic, + topicAsRepartitionSourceTopic, + topicAsStateChangelogTopics + ) + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @CsvSource(Array( + "true, false, false, false", + "false, true, false, false", + "false, false, true, false", + "false, false, false, true" + )) + def testStreamsGroupHeartbeatWithOperationAll( + topicAsSourceTopic: Boolean, + topicAsRepartitionSinkTopic: Boolean, + topicAsRepartitionSourceTopic: Boolean, + topicAsStateChangelogTopics: Boolean + ): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), streamsGroupResource) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + addAndVerifyAcls(Set(allowAllOpsAcl), sourceTopicResource) + + val request = streamsGroupHeartbeatRequest( + topicAsSourceTopic, + topicAsRepartitionSinkTopic, + topicAsRepartitionSourceTopic, + topicAsStateChangelogTopics + ) + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @CsvSource(Array( + "true, false, false, false", + "false, true, false, false", + "false, false, true, false", + "false, false, false, true" + )) + def testStreamsGroupHeartbeatWithoutGroupReadOrTopicDescribeAcl( + topicAsSourceTopic: Boolean, + topicAsRepartitionSinkTopic: Boolean, + topicAsRepartitionSourceTopic: Boolean, + topicAsStateChangelogTopics: Boolean + ): Unit = { + removeAllClientAcls() + + val request = streamsGroupHeartbeatRequest( + topicAsSourceTopic, + topicAsRepartitionSinkTopic, + topicAsRepartitionSourceTopic, + topicAsStateChangelogTopics + ) + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @CsvSource(Array( + "true, false, false, false", + "false, true, false, false", + "false, false, true, false", + "false, false, false, true" + )) + def testStreamsGroupHeartbeatWithoutGroupReadAcl( + topicAsSourceTopic: Boolean, + topicAsRepartitionSinkTopic: Boolean, + topicAsRepartitionSourceTopic: Boolean, + topicAsStateChangelogTopics: Boolean + ): Unit = { + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic + + val request = streamsGroupHeartbeatRequest( + topicAsSourceTopic, + topicAsRepartitionSinkTopic, + topicAsRepartitionSourceTopic, + topicAsStateChangelogTopics + ) + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @CsvSource(Array( + "true, false, false, false", + "false, true, false, false", + "false, false, true, false", + "false, false, false, true" + )) + def testStreamsGroupHeartbeatWithoutTopicDescribeAcl( + topicAsSourceTopic: Boolean, + topicAsRepartitionSinkTopic: Boolean, + topicAsRepartitionSourceTopic: Boolean, + topicAsStateChangelogTopics: Boolean + ): Unit = { + addAndVerifyAcls(streamsGroupReadAcl(streamsGroupResource), streamsGroupResource) + addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic + + val request = streamsGroupHeartbeatRequest( + topicAsSourceTopic, + topicAsRepartitionSinkTopic, + topicAsRepartitionSourceTopic, + topicAsStateChangelogTopics + ) + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @CsvSource(Array( + "true, false", + "false, true" + )) + def testStreamsGroupHeartbeatWithoutInternalTopicCreateAcl( + topicAsRepartitionSourceTopic: Boolean, + topicAsStateChangelogTopics: Boolean + ): Unit = { + createTopicWithBrokerPrincipal(sourceTopic) + addAndVerifyAcls(streamsGroupReadAcl(streamsGroupResource), streamsGroupResource) + addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = streamsGroupHeartbeatRequest( + topicAsSourceTopic = false, + topicAsRepartitionSinkTopic = false, + topicAsRepartitionSourceTopic = topicAsRepartitionSourceTopic, + topicAsStateChangelogTopics = topicAsStateChangelogTopics + ) + val resource = Set[ResourceType](GROUP, TOPIC) + + // Request successful, but internal topic not created. + val response = sendRequestAndVerifyResponseError(request, resource, isAuthorized = true).asInstanceOf[StreamsGroupHeartbeatResponse] + assertEquals( + util.List.of(new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()) + .setStatusDetail("Internal topics are missing: [topic]; Unauthorized to CREATE on topics topic.")), + response.data().status()) + } + + @ParameterizedTest + @CsvSource(Array( + "true, false", + "false, true" + )) + def testStreamsGroupHeartbeatWithInternalTopicCreateAcl( + topicAsRepartitionSourceTopic: Boolean, + topicAsStateChangelogTopics: Boolean + ): Unit = { + createTopicWithBrokerPrincipal(sourceTopic) + addAndVerifyAcls(streamsGroupReadAcl(streamsGroupResource), streamsGroupResource) + addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + addAndVerifyAcls(topicCreateAcl(topicResource), topicResource) + + val request = streamsGroupHeartbeatRequest( + topicAsSourceTopic = false, + topicAsRepartitionSinkTopic = false, + topicAsRepartitionSourceTopic = topicAsRepartitionSourceTopic, + topicAsStateChangelogTopics = topicAsStateChangelogTopics + ) + val resource = Set[ResourceType](GROUP, TOPIC) + val response = sendRequestAndVerifyResponseError(request, resource, isAuthorized = true).asInstanceOf[StreamsGroupHeartbeatResponse] + // Request successful, and no internal topic creation error. + assertEquals( + util.List.of(new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()) + .setStatusDetail("Internal topics are missing: [topic]")), + response.data().status()) + } + private def sendAndReceiveFirstRegexHeartbeat(memberId: String, listenerName: ListenerName): ConsumerGroupHeartbeatResponseData = { val request = new ConsumerGroupHeartbeatRequest.Builder(
