This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6ef42d15241 MINOR: Deduplicate topics of a topology for authorization check (#19352)
6ef42d15241 is described below

commit 6ef42d15241732cc2e315ffe55d39897da6c9b02
Author: Bruno Cadonna <cado...@apache.org>
AuthorDate: Thu Apr 3 13:39:55 2025 +0200

    MINOR: Deduplicate topics of a topology for authorization check (#19352)
    
    With the new Streams rebalance protocol, the Streams client sends the
    topology with the topics it uses to the broker for initialization. For
    the initialization, the broker needs to describe the topics in the
    topology, and consequently the Streams application needs to be
    authorized to describe those topics.
    
    The broker checks the authorization by filtering the topics in the
    topology by authorization. This filtering implicitly deduplicates the
    topics of the topology if they appear multiple times in the topology
    sent to the brokers. After that, the broker compares the number of
    authorized topics with the number of topics in the topology. If there
    are fewer authorized topics than topics in the topology, a
    TOPIC_AUTHORIZATION_FAILED error is returned.
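    
    To illustrate (a minimal sketch with made-up topic names and values,
    not the actual broker code), a duplicated topic makes the compared
    sizes differ even when every topic is authorized:
    
        // Minimal sketch, not the broker implementation; values are made up.
        object AuthCheckSketch {
          def main(args: Array[String]): Unit = {
            // Topics the Streams application is authorized to describe.
            val authorized = Set("foo", "repartition")
            // Topics collected from the topology; the repartition topic
            // is listed twice.
            val topologyTopics = Seq("foo", "repartition", "repartition")
            // Filtering by authorization implicitly deduplicates.
            val authorizedTopics =
              topologyTopics.filter(authorized.contains).toSet
            // 2 < 3, so the check reports TOPIC_AUTHORIZATION_FAILED
            // even though every topic is authorized.
            println(authorizedTopics.size < topologyTopics.size) // true
          }
        }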
    
    In Streams, a topology that is sent to the brokers likely contains
    duplicate topics because a repartition topic appears as a sink for one
    subtopology and as a source for another subtopology.
    
    This commit deduplicates the topics of a topology before the
    verification of the authorization.
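    
    Continuing the sketch above (still illustrative, not the actual code
    path), deduplicating before the comparison makes the sizes line up:
    
        // Minimal sketch of the effect of the deduplication.
        object DedupSketch {
          def main(args: Array[String]): Unit = {
            val authorized = Set("foo", "repartition")
            // Deduplicate the topology's topics before the check.
            val requiredTopics =
              Seq("foo", "repartition", "repartition").distinct
            val authorizedTopics =
              requiredTopics.filter(authorized.contains).toSet
            // Sizes match, so no spurious TOPIC_AUTHORIZATION_FAILED.
            println(authorizedTopics.size == requiredTopics.size) // true
          }
        }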
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 66 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6de51d07332..9f416aa0e88 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2649,7 +2649,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               ++ (subtopology.repartitionSinkTopics().iterator().asScala:Iterator[String])
               ++ (subtopology.repartitionSourceTopics().iterator().asScala.map(_.name()):Iterator[String])
               ++ (subtopology.stateChangelogTopics().iterator().asScala.map(_.name()):Iterator[String])
-          ).toSeq
+          ).distinct.toSeq
 
        // While correctness of the heartbeat request is checked inside the group coordinator,
        // we are checking early that topics in the topology have valid names and are not internal
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 463a46ac121..a98b67fd912 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -9692,6 +9692,72 @@ class KafkaApisTest extends Logging {
     assertEquals(streamsGroupHeartbeatResponse, response.data)
   }
 
+  @Test
+  def testStreamsGroupHeartbeatRequestWithAuthorizedTopology(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+    val groupId = "group"
+    val fooTopicName = "foo"
+    val barTopicName = "bar"
+    val zarTopicName = "zar"
+    val tarTopicName = "tar"
+    val booTopicName = "boo"
+
+    val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId(groupId).setTopology(
+      new StreamsGroupHeartbeatRequestData.Topology()
+        .setEpoch(3)
+        .setSubtopologies(
+          util.List.of(
+            new StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology1")
+              .setSourceTopics(Collections.singletonList(fooTopicName))
+              .setRepartitionSinkTopics(Collections.singletonList(barTopicName))
+              .setStateChangelogTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName(tarTopicName))),
+            new StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology2")
+              .setSourceTopics(Collections.singletonList(zarTopicName))
+              .setRepartitionSourceTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName(barTopicName)))
+          )
+        )
+    )
+
+    val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val acls = Map(
+      groupId -> AuthorizationResult.ALLOWED,
+      fooTopicName -> AuthorizationResult.ALLOWED,
+      barTopicName -> AuthorizationResult.ALLOWED,
+      zarTopicName -> AuthorizationResult.ALLOWED,
+      tarTopicName -> AuthorizationResult.ALLOWED,
+      booTopicName -> AuthorizationResult.ALLOWED
+    )
+    when(authorizer.authorize(
+      any[RequestContext],
+      any[util.List[Action]]
+    )).thenAnswer { invocation =>
+      val actions = invocation.getArgument(1, classOf[util.List[Action]])
+      actions.asScala.map { action =>
+        acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+      }.asJava
+    }
+
+    val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
+    when(groupCoordinator.streamsGroupHeartbeat(
+      requestChannelRequest.context,
+      streamsGroupHeartbeatRequest
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(
+      authorizer = Some(authorizer),
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
+      .setMemberId("member")
+
+    future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, Collections.emptyMap()))
+    val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(streamsGroupHeartbeatResponse, response.data)
+  }
+
   @Test
   def testStreamsGroupHeartbeatRequestFutureFailed(): Unit = {
     metadataCache = mock(classOf[KRaftMetadataCache])
