This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e73719d9624 KAFKA-18819 StreamsGroupHeartbeat API and 
StreamsGroupDescribe API  check topic describe (#19183)
e73719d9624 is described below

commit e73719d9624efa041498776a6bc0158cb82ac3d9
Author: Lan Ding <53332773+dl1...@users.noreply.github.com>
AuthorDate: Thu Mar 20 03:42:05 2025 +0800

    KAFKA-18819 StreamsGroupHeartbeat API and StreamsGroupDescribe API  check 
topic describe (#19183)
    
    This patch filters out the topic describe unauthorized topics from the
    StreamsGroupHeartbeat and StreamsGroupDescribe response.
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>
---
 .../internals/DescribeStreamsGroupsHandler.java    |   1 +
 .../requests/StreamsGroupDescribeResponse.java     |   1 +
 .../message/StreamsGroupDescribeResponse.json      |   1 +
 .../message/StreamsGroupHeartbeatResponse.json     |   2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  55 ++++++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 181 ++++++++++++++++++++-
 6 files changed, 232 insertions(+), 9 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
index 8bf793bd31c..8355a78b9d4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
@@ -238,6 +238,7 @@ public class DescribeStreamsGroupsHandler extends 
AdminApiHandler.Batched<Coordi
             Set<CoordinatorKey> groupsToUnmap) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
+            case TOPIC_AUTHORIZATION_FAILED:
                 log.debug("`DescribeStreamsGroups` request for group id {} 
failed due to error {}", groupId.idValue, error);
                 failed.put(groupId, error.exception(errorMsg));
                 break;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
index 83db6700a4a..0439b955325 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
@@ -35,6 +35,7 @@ import java.util.Map;
  * - {@link Errors#INVALID_REQUEST}
  * - {@link Errors#INVALID_GROUP_ID}
  * - {@link Errors#GROUP_ID_NOT_FOUND}
+ * - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
  */
 public class StreamsGroupDescribeResponse extends AbstractResponse {
 
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json 
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
index 9cf2954c17f..5dff3d7bf44 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
@@ -27,6 +27,7 @@
   // - INVALID_REQUEST (version 0+)
   // - INVALID_GROUP_ID (version 0+)
   // - GROUP_ID_NOT_FOUND (version 0+)
+  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
index 43b5268e205..a5f3a99f9de 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
@@ -30,7 +30,7 @@
   // - FENCED_MEMBER_EPOCH (version 0+)
   // - UNRELEASED_INSTANCE_ID (version 0+)
   // - GROUP_MAX_SIZE_REACHED (version 0+)
-  // - TOPIC_AUTHORIZATION_FAILED (version 0+) 
+  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
   // - CLUSTER_AUTHORIZATION_FAILED (version 0+)
   // - STREAMS_INVALID_TOPOLOGY (version 0+)
   // - STREAMS_INVALID_TOPOLOGY_EPOCH (version 0+)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index adf5c5a6e53..e2d8e17f950 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2707,11 +2707,20 @@ class KafkaApis(val requestChannel: RequestChannel,
           requestHelper.sendMaybeThrottle(request, new 
StreamsGroupHeartbeatResponse(errorResponse))
           return CompletableFuture.completedFuture[Unit](())
         }
+
+        if (requiredTopics.nonEmpty) {
+          val authorizedTopics = 
authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, 
requiredTopics)(identity)
+          if (authorizedTopics.size < requiredTopics.size) {
+            val responseData = new 
StreamsGroupHeartbeatResponseData().setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            requestHelper.sendMaybeThrottle(request, new 
StreamsGroupHeartbeatResponse(responseData))
+            return CompletableFuture.completedFuture[Unit](())
+          }
+        }
       }
 
       groupCoordinator.streamsGroupHeartbeat(
         request.context,
-        streamsGroupHeartbeatRequest.data,
+        streamsGroupHeartbeatRequest.data
       ).handle[Unit] { (response, exception) =>
         if (exception != null) {
           requestHelper.sendMaybeThrottle(request, 
streamsGroupHeartbeatRequest.getErrorResponse(exception))
@@ -2795,6 +2804,50 @@ class KafkaApis(val requestChannel: RequestChannel,
             response.groups.addAll(results)
           }
 
+          // Clients are not allowed to see topics that are not authorized for 
Describe.
+          if (authorizer.isDefined) {
+            val topicsToCheck = response.groups.stream()
+              .filter(group => group.topology != null)
+              .flatMap(group => group.topology.subtopologies.stream)
+              .flatMap(subtopology => java.util.stream.Stream.concat(
+                java.util.stream.Stream.concat(
+                  java.util.stream.Stream.concat(
+                    subtopology.sourceTopics.stream,
+                    subtopology.repartitionSinkTopics.stream),
+                  subtopology.repartitionSourceTopics.stream.map(_.name)),
+                subtopology.stateChangelogTopics.stream.map(_.name)))
+              .collect(Collectors.toSet[String])
+              .asScala
+
+            val authorizedTopics = 
authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
+              topicsToCheck)(identity)
+
+              val updatedGroups = response.groups.stream.map { group =>
+                val hasUnauthorizedTopic = if (group.topology == null) false 
else
+                  group.topology.subtopologies.stream()
+                    .flatMap(subtopology => java.util.stream.Stream.concat(
+                      java.util.stream.Stream.concat(
+                        java.util.stream.Stream.concat(
+                          subtopology.sourceTopics.stream,
+                          subtopology.repartitionSinkTopics.stream),
+                        
subtopology.repartitionSourceTopics.stream.map(_.name)),
+                      subtopology.stateChangelogTopics.stream.map(_.name)))
+                    .anyMatch(topic => !authorizedTopics.contains(topic))
+
+              if (hasUnauthorizedTopic) {
+                new StreamsGroupDescribeResponseData.DescribedGroup()
+                  .setGroupId(group.groupId)
+                  .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+                  .setErrorMessage("The described group uses topics that the 
client is not authorized to describe.")
+                  .setMembers(List.empty.asJava)
+                  .setTopology(null)
+              } else {
+                group
+              }
+            
}.collect(Collectors.toList[StreamsGroupDescribeResponseData.DescribedGroup])
+            response.setGroups(updatedGroups)
+          }
+
           requestHelper.sendMaybeThrottle(request, new 
StreamsGroupDescribeResponse(response))
         }
       }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 70a64f47160..9c382d709c4 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -9883,7 +9883,7 @@ class KafkaApisTest extends Logging {
   }
 
   @Test
-  def testStreamsGroupHeartbeatRequestAuthorizationFailed(): Unit = {
+  def testStreamsGroupHeartbeatRequestGroupAuthorizationFailed(): Unit = {
     metadataCache = mock(classOf[KRaftMetadataCache])
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
@@ -9903,6 +9903,58 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
 
+  @Test
+  def testStreamsGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+    val groupId = "group"
+    val fooTopicName = "foo"
+    val barTopicName = "bar"
+    val zarTopicName = "zar"
+    val tarTopicName = "tar"
+
+    val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId(groupId).setTopology(
+      new StreamsGroupHeartbeatRequestData.Topology()
+        .setEpoch(3)
+        .setSubtopologies(
+          util.List.of(new 
StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology")
+            .setSourceTopics(Collections.singletonList(fooTopicName))
+            .setRepartitionSinkTopics(Collections.singletonList(barTopicName))
+            .setRepartitionSourceTopics(Collections.singletonList(new 
StreamsGroupHeartbeatRequestData.TopicInfo().setName(zarTopicName)))
+            .setStateChangelogTopics(Collections.singletonList(new 
StreamsGroupHeartbeatRequestData.TopicInfo().setName(tarTopicName)))
+          )
+        )
+    )
+
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val acls = Map(
+      groupId -> AuthorizationResult.ALLOWED,
+      fooTopicName -> AuthorizationResult.ALLOWED,
+      barTopicName -> AuthorizationResult.DENIED,
+      zarTopicName -> AuthorizationResult.ALLOWED,
+      tarTopicName -> AuthorizationResult.ALLOWED
+    )
+    when(authorizer.authorize(
+      any[RequestContext],
+      any[util.List[Action]]
+    )).thenAnswer { invocation =>
+      val actions = invocation.getArgument(1, classOf[util.List[Action]])
+      actions.asScala.map { action =>
+        acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+      }.asJava
+    }
+
+    kafkaApis = createKafkaApis(
+      authorizer = Some(authorizer),
+      overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val response = 
verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
+  }
+
   @Test
   def testStreamsGroupHeartbeatRequestProtocolDisabled(): Unit = {
     metadataCache = mock(classOf[KRaftMetadataCache])
@@ -10230,6 +10282,8 @@ class KafkaApisTest extends Logging {
   @ValueSource(booleans = Array(true, false))
   def testStreamsGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
     metadataCache = mock(classOf[KRaftMetadataCache])
+    val fooTopicName = "foo"
+    val barTopicName = "bar"
 
     val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
     val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
@@ -10247,10 +10301,32 @@ class KafkaApisTest extends Logging {
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
+    val subtopology0 = new StreamsGroupDescribeResponseData.Subtopology()
+      .setSubtopologyId("subtopology0")
+      .setSourceTopics(Collections.singletonList(fooTopicName))
+
+    val subtopology1 = new StreamsGroupDescribeResponseData.Subtopology()
+      .setSubtopologyId("subtopology1")
+      .setRepartitionSinkTopics(Collections.singletonList(barTopicName))
+
+    val subtopology2 = new StreamsGroupDescribeResponseData.Subtopology()
+      .setSubtopologyId("subtopology2")
+      .setSourceTopics(Collections.singletonList(fooTopicName))
+      .setRepartitionSinkTopics(Collections.singletonList(barTopicName))
+
     future.complete(List(
-      new 
StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
-      new 
StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)),
-      new 
StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2))
+      new StreamsGroupDescribeResponseData.DescribedGroup()
+        .setGroupId(groupIds.get(0))
+        .setTopology(new StreamsGroupDescribeResponseData.Topology()
+          .setSubtopologies(Collections.singletonList(subtopology0))),
+      new StreamsGroupDescribeResponseData.DescribedGroup()
+        .setGroupId(groupIds.get(1))
+        .setTopology(new StreamsGroupDescribeResponseData.Topology()
+          .setSubtopologies(Collections.singletonList(subtopology1))),
+      new StreamsGroupDescribeResponseData.DescribedGroup()
+        .setGroupId(groupIds.get(2))
+        .setTopology(new StreamsGroupDescribeResponseData.Topology()
+          .setSubtopologies(Collections.singletonList(subtopology2)))
     ).asJava)
 
     var authorizedOperationsInt = Int.MinValue;
@@ -10262,9 +10338,18 @@ class KafkaApisTest extends Logging {
 
     // Can't reuse the above list here because we would not test the 
implementation in KafkaApis then
     val describedGroups = List(
-      new 
StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
-      new 
StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)),
-      new 
StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2))
+      new StreamsGroupDescribeResponseData.DescribedGroup()
+        .setGroupId(groupIds.get(0))
+        .setTopology(new StreamsGroupDescribeResponseData.Topology()
+          .setSubtopologies(Collections.singletonList(subtopology0))),
+      new StreamsGroupDescribeResponseData.DescribedGroup()
+        .setGroupId(groupIds.get(1))
+        .setTopology(new StreamsGroupDescribeResponseData.Topology()
+          .setSubtopologies(Collections.singletonList(subtopology1))),
+      new StreamsGroupDescribeResponseData.DescribedGroup()
+        .setGroupId(groupIds.get(2))
+        .setTopology(new StreamsGroupDescribeResponseData.Topology()
+          .setSubtopologies(Collections.singletonList(subtopology2)))
     ).map(group => group.setAuthorizedOperations(authorizedOperationsInt))
     val expectedStreamsGroupDescribeResponseData = new 
StreamsGroupDescribeResponseData()
       .setGroups(describedGroups.asJava)
@@ -10353,6 +10438,88 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.FENCED_MEMBER_EPOCH.code, 
response.data.groups.get(0).errorCode)
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testStreamsGroupDescribeFilterUnauthorizedTopics(includeAuthorizedOperations: 
Boolean): Unit = {
+    val fooTopicName = "foo"
+    val barTopicName = "bar"
+    val errorMessage = "The described group uses topics that the client is not 
authorized to describe."
+
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
+    val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
+      .setIncludeAuthorizedOperations(includeAuthorizedOperations)
+    streamsGroupDescribeRequestData.groupIds.addAll(groupIds)
+    val requestChannelRequest = buildRequest(new 
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, 
true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val acls = Map(
+      groupIds.get(0) -> AuthorizationResult.ALLOWED,
+      groupIds.get(1) -> AuthorizationResult.ALLOWED,
+      groupIds.get(2) -> AuthorizationResult.ALLOWED,
+      fooTopicName    -> AuthorizationResult.ALLOWED,
+      barTopicName    -> AuthorizationResult.DENIED,
+    )
+    when(authorizer.authorize(
+      any[RequestContext],
+      any[util.List[Action]]
+    )).thenAnswer { invocation =>
+      val actions = invocation.getArgument(1, classOf[util.List[Action]])
+      actions.asScala.map { action =>
+        acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+      }.asJava
+    }
+
+    val future = new 
CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.streamsGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(
+      authorizer = Some(authorizer),
+      overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val subtopology0 = new StreamsGroupDescribeResponseData.Subtopology()
+      .setSubtopologyId("subtopology0")
+      .setSourceTopics(Collections.singletonList(fooTopicName))
+
+    val subtopology1 = new StreamsGroupDescribeResponseData.Subtopology()
+      .setSubtopologyId("subtopology1")
+      .setRepartitionSinkTopics(Collections.singletonList(barTopicName))
+
+    val subtopology2 = new StreamsGroupDescribeResponseData.Subtopology()
+      .setSubtopologyId("subtopology2")
+      .setSourceTopics(Collections.singletonList(fooTopicName))
+      .setRepartitionSinkTopics(Collections.singletonList(barTopicName))
+
+    future.complete(List(
+      new StreamsGroupDescribeResponseData.DescribedGroup()
+        .setGroupId(groupIds.get(0))
+        .setTopology(new StreamsGroupDescribeResponseData.Topology()
+          .setSubtopologies(Collections.singletonList(subtopology0))),
+      new StreamsGroupDescribeResponseData.DescribedGroup()
+        .setGroupId(groupIds.get(1))
+        .setTopology(new StreamsGroupDescribeResponseData.Topology()
+          .setSubtopologies(Collections.singletonList(subtopology1))),
+      new StreamsGroupDescribeResponseData.DescribedGroup()
+        .setGroupId(groupIds.get(2))
+        .setTopology(new StreamsGroupDescribeResponseData.Topology()
+          .setSubtopologies(Collections.singletonList(subtopology2)))
+    ).asJava)
+
+    val response = 
verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
+    assertNotNull(response.data)
+    assertEquals(3, response.data.groups.size)
+    assertEquals(Errors.NONE.code(), response.data.groups.get(0).errorCode())
+    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), 
response.data.groups.get(1).errorCode())
+    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), 
response.data.groups.get(2).errorCode())
+    assertEquals(errorMessage, response.data.groups.get(1).errorMessage())
+    assertEquals(errorMessage, response.data.groups.get(2).errorMessage())
+  }
+
   @Test
   def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = {
     val fooTopicName = "foo"

Reply via email to