This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 6ef42d15241 MINOR: Deduplicate topics of a topology for authorization check (#19352) 6ef42d15241 is described below commit 6ef42d15241732cc2e315ffe55d39897da6c9b02 Author: Bruno Cadonna <cado...@apache.org> AuthorDate: Thu Apr 3 13:39:55 2025 +0200 MINOR: Deduplicate topics of a topology for authorization check (#19352) With the new Streams rebalance protocol, the Streams client sends the topology with the used topics to the broker for initialization. For the initialization the broker needs to describe the topics in the topology and consequently the Streams application needs to be authorized to describe the topics. The broker checks the authorization by filtering the topics in the topology by authorization. This filtering implicitly deduplicates the topics of the topology if they appear multiple times in the topology send to the brokers. After that the broker compares the size of the authorized topics with the topics in the topology. If the authorized topics are less than the topics in the topology a TOPIC_AUTHORIZATION_FAILED error is returned. In Streams a topology that is sent to the brokers likely has duplicate topics because a repartition topic appears as a sink for one subtopology and as a source for another subtopology. This commit deduplicates the topics of a topology before the verification of the authorization. Reviewers: Lucas Brutschy <lbruts...@confluent.io> --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 66 ++++++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6de51d07332..9f416aa0e88 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2649,7 +2649,7 @@ class KafkaApis(val requestChannel: RequestChannel, ++ (subtopology.repartitionSinkTopics().iterator().asScala:Iterator[String]) ++ (subtopology.repartitionSourceTopics().iterator().asScala.map(_.name()):Iterator[String]) ++ (subtopology.stateChangelogTopics().iterator().asScala.map(_.name()):Iterator[String]) - ).toSeq + ).distinct.toSeq // While correctness of the heartbeat request is checked inside the group coordinator, // we are checking early that topics in the topology have valid names and are not internal diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 463a46ac121..a98b67fd912 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -9692,6 +9692,72 @@ class KafkaApisTest extends Logging { assertEquals(streamsGroupHeartbeatResponse, response.data) } + @Test + def testStreamsGroupHeartbeatRequestWithAuthorizedTopology(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + val groupId = "group" + val fooTopicName = "foo" + val barTopicName = "bar" + val zarTopicName = "zar" + val tarTopicName = "tar" + val booTopicName = "boo" + + val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId(groupId).setTopology( + new StreamsGroupHeartbeatRequestData.Topology() + .setEpoch(3) + .setSubtopologies( + util.List.of( + new StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology1") + .setSourceTopics(Collections.singletonList(fooTopicName)) + .setRepartitionSinkTopics(Collections.singletonList(barTopicName)) + .setStateChangelogTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName(tarTopicName))), + new StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology2") + .setSourceTopics(Collections.singletonList(zarTopicName)) + .setRepartitionSourceTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName(barTopicName))) + ) + ) + ) + + val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build()) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + val acls = Map( + groupId -> AuthorizationResult.ALLOWED, + fooTopicName -> AuthorizationResult.ALLOWED, + barTopicName -> AuthorizationResult.ALLOWED, + zarTopicName -> AuthorizationResult.ALLOWED, + tarTopicName -> AuthorizationResult.ALLOWED, + booTopicName -> AuthorizationResult.ALLOWED + ) + when(authorizer.authorize( + any[RequestContext], + any[util.List[Action]] + )).thenAnswer { invocation => + val actions = invocation.getArgument(1, classOf[util.List[Action]]) + actions.asScala.map { action => + acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED) + }.asJava + } + + val future = new CompletableFuture[StreamsGroupHeartbeatResult]() + when(groupCoordinator.streamsGroupHeartbeat( + requestChannelRequest.context, + streamsGroupHeartbeatRequest + )).thenReturn(future) + kafkaApis = createKafkaApis( + authorizer = Some(authorizer), + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + + val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData() + .setMemberId("member") + + future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, Collections.emptyMap())) + val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest) + assertEquals(streamsGroupHeartbeatResponse, response.data) + } + @Test def testStreamsGroupHeartbeatRequestFutureFailed(): Unit = { metadataCache = mock(classOf[KRaftMetadataCache])