This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 36f19057e1d KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe (#18989)
36f19057e1d is described below

commit 36f19057e1d57a8548a4548c304799fd176c359f
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Wed Feb 26 13:05:36 2025 -0500

    KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe (#18989)
    
    This patch filters out topics that the client is not authorized to
    describe from the ConsumerGroupHeartbeat and ConsumerGroupDescribe
    responses.
    
    In ConsumerGroupHeartbeat,
    - if the request has `subscribedTopicNames` set, we check the
    authorization directly in `KafkaApis` and return a topic authorization
    failure in the response if any of the topics is denied.
    - Otherwise, we check the authorization only when a regex refresh is
    triggered, and we base it on the ACLs of the consumer that triggered the
    refresh. If any of the topics is denied, it is filtered out of the
    resolved subscription. (The ACLs a consumer now needs are sketched
    below.)
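
    For illustration only, a minimal Java sketch of granting the ACLs that
    this check expects for a heartbeat with `subscribedTopicNames`: READ on
    the group and DESCRIBE on each subscribed topic, mirroring the updated
    AuthorizerIntegrationTest below. The principal, topic, group and
    bootstrap address are placeholders, not part of this patch.

        import java.util.List;
        import java.util.Properties;
        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.common.acl.AccessControlEntry;
        import org.apache.kafka.common.acl.AclBinding;
        import org.apache.kafka.common.acl.AclOperation;
        import org.apache.kafka.common.acl.AclPermissionType;
        import org.apache.kafka.common.resource.PatternType;
        import org.apache.kafka.common.resource.ResourcePattern;
        import org.apache.kafka.common.resource.ResourceType;

        public class GrantConsumerAcls {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                String principal = "User:consumer-app"; // placeholder principal
                try (Admin admin = Admin.create(props)) {
                    // DESCRIBE on the subscribed topic: required by the new heartbeat check.
                    AclBinding topicDescribe = new AclBinding(
                        new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL),
                        new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
                    // READ on the group: required to join the consumer group, as before.
                    AclBinding groupRead = new AclBinding(
                        new ResourcePattern(ResourceType.GROUP, "my-group", PatternType.LITERAL),
                        new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW));
                    admin.createAcls(List.of(topicDescribe, groupRead)).all().get();
                }
            }
        }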
    
    In ConsumerGroupDescribe, we check the authorization of the topics in
    the coordinator response. If any topic in the group is denied, we remove
    the described info and set a topic authorization failure on the
    described group (similar to the group authorization failure).
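
    For readers skimming the patch, the describe-side filtering added to
    KafkaApis.scala below, transcribed as a standalone Java sketch;
    `authorizedTopics` stands in for the result of the broker's DESCRIBE
    authorization check, and the class and method names here are
    illustrative only.

        import java.util.List;
        import java.util.Set;
        import java.util.stream.Collectors;
        import java.util.stream.Stream;
        import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup;
        import org.apache.kafka.common.protocol.Errors;

        public class DescribeAuthFilterSketch {
            // Replace any group that references a topic the caller cannot describe
            // with an error-only entry, keeping fully authorized groups unchanged.
            static List<DescribedGroup> filterUnauthorized(List<DescribedGroup> groups, Set<String> authorizedTopics) {
                return groups.stream().map(group -> {
                    boolean hasUnauthorizedTopic = group.members().stream()
                        .flatMap(member -> Stream.of(member.assignment(), member.targetAssignment()))
                        .flatMap(assignment -> assignment.topicPartitions().stream())
                        .anyMatch(tp -> !authorizedTopics.contains(tp.topicName()));
                    if (hasUnauthorizedTopic) {
                        return new DescribedGroup()
                            .setGroupId(group.groupId())
                            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code())
                            .setErrorMessage("The group has described topic(s) that the client is not authorized to describe.")
                            .setMembers(List.of());
                    }
                    return group;
                }).collect(Collectors.toList());
            }
        }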
    
    Reviewers: David Jacot <[email protected]>, Lianet Magrans
    <[email protected]>, Rajini Sivaram <[email protected]>,
    Chia-Ping Tsai <[email protected]>, TaiJuWu <[email protected]>,
    TengYao Chi <[email protected]>
---
 checkstyle/import-control-coordinator-common.xml   |   1 +
 checkstyle/import-control-group-coordinator.xml    |   3 +
 checkstyle/import-control-share-coordinator.xml    |   1 +
 .../message/ConsumerGroupDescribeResponse.json     |   1 +
 .../message/ConsumerGroupHeartbeatResponse.json    |   1 +
 .../src/main/scala/kafka/server/BrokerServer.scala |   1 +
 core/src/main/scala/kafka/server/KafkaApis.scala   |  48 ++++-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  91 ++++++--
 .../kafka/api/EndToEndAuthorizationTest.scala      |  15 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 200 +++++++++++++++++-
 docs/ops.html                                      |   2 +
 .../coordinator/group/GroupCoordinatorService.java |  11 +-
 .../coordinator/group/GroupCoordinatorShard.java   |  13 ++
 .../coordinator/group/GroupMetadataManager.java    | 114 ++++++++--
 .../group/GroupMetadataManagerTest.java            | 229 +++++++++++++++++++++
 .../group/GroupMetadataManagerTestContext.java     |   9 +
 .../jmh/coordinator/RegexResolutionBenchmark.java  |   3 +
 17 files changed, 700 insertions(+), 43 deletions(-)

diff --git a/checkstyle/import-control-coordinator-common.xml 
b/checkstyle/import-control-coordinator-common.xml
index c08955fd422..bafffe80697 100644
--- a/checkstyle/import-control-coordinator-common.xml
+++ b/checkstyle/import-control-coordinator-common.xml
@@ -58,6 +58,7 @@
                 <allow pkg="org.apache.kafka.coordinator.common" />
                 <allow pkg="org.apache.kafka.deferred" />
                 <allow pkg="org.apache.kafka.image" />
+                <allow pkg="org.apache.kafka.server.authorizer" />
                 <allow pkg="org.apache.kafka.server.common" />
                 <allow pkg="org.apache.kafka.server.metrics" />
                 <allow pkg="org.apache.kafka.server.util" />
diff --git a/checkstyle/import-control-group-coordinator.xml 
b/checkstyle/import-control-group-coordinator.xml
index c8a0f49d593..8b6a8d99f5e 100644
--- a/checkstyle/import-control-group-coordinator.xml
+++ b/checkstyle/import-control-group-coordinator.xml
@@ -40,6 +40,8 @@
 
     <!-- anyone can use public classes -->
     <allow pkg="org.apache.kafka.common" exact-match="true" />
+    <allow pkg="org.apache.kafka.common.acl" />
+    <allow pkg="org.apache.kafka.common.resource" />
     <allow pkg="org.apache.kafka.common.security" />
     <allow pkg="org.apache.kafka.common.serialization" />
     <allow pkg="org.apache.kafka.common.utils" />
@@ -63,6 +65,7 @@
             <allow pkg="org.apache.kafka.coordinator.group" />
             <allow pkg="org.apache.kafka.deferred" />
             <allow pkg="org.apache.kafka.image"/>
+            <allow pkg="org.apache.kafka.server.authorizer"/>
             <allow pkg="org.apache.kafka.server.common"/>
             <allow pkg="org.apache.kafka.server.record"/>
             <allow pkg="org.apache.kafka.server.share.persister"/>
diff --git a/checkstyle/import-control-share-coordinator.xml 
b/checkstyle/import-control-share-coordinator.xml
index aaea93d32e6..f430e283225 100644
--- a/checkstyle/import-control-share-coordinator.xml
+++ b/checkstyle/import-control-share-coordinator.xml
@@ -55,6 +55,7 @@
             <allow pkg="org.apache.kafka.coordinator.share.metrics" />
             <allow pkg="org.apache.kafka.image" />
             <allow pkg="org.apache.kafka.metadata" />
+            <allow pkg="org.apache.kafka.server.authorizer" />
             <allow pkg="org.apache.kafka.server.common" />
             <allow pkg="org.apache.kafka.server.config" />
             <allow pkg="org.apache.kafka.server.share" />
diff --git 
a/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json 
b/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json
index 14d80e20ce2..95588551922 100644
--- 
a/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json
+++ 
b/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json
@@ -28,6 +28,7 @@
   // - INVALID_REQUEST (version 0+)
   // - INVALID_GROUP_ID (version 0+)
   // - GROUP_ID_NOT_FOUND (version 0+)
+  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
diff --git 
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json 
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
index 956cfab8262..010fc2cfe93 100644
--- 
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
+++ 
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
@@ -30,6 +30,7 @@
   // - UNSUPPORTED_ASSIGNOR (version 0+)
   // - UNRELEASED_INSTANCE_ID (version 0+)
   // - GROUP_MAX_SIZE_REACHED (version 0+)
+  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
   // - INVALID_REGULAR_EXPRESSION (version 1+)
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 5f5befb31dc..7ff50d175b5 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -645,6 +645,7 @@ class BrokerServer(
         .withGroupCoordinatorMetrics(new 
GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
         .withGroupConfigManager(groupConfigManager)
         .withPersister(persister)
+        .withAuthorizer(authorizer.toJava)
         .build()
     } else {
       GroupCoordinatorAdapter(
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index f3421a21bc6..1e3b35ff03c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -72,6 +72,7 @@ import java.time.Duration
 import java.util
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
+import java.util.stream.Collectors
 import java.util.{Collections, Optional}
 import scala.annotation.nowarn
 import scala.collection.mutable.ArrayBuffer
@@ -2531,9 +2532,24 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
+      if (consumerGroupHeartbeatRequest.data.subscribedTopicNames != null &&
+        !consumerGroupHeartbeatRequest.data.subscribedTopicNames.isEmpty) {
+        // Check the authorization if the subscribed topic names are provided.
+        // Clients are not allowed to see topics that are not authorized for 
Describe.
+        val subscribedTopicSet = 
consumerGroupHeartbeatRequest.data.subscribedTopicNames.asScala.toSet
+        val authorizedTopics = authHelper.filterByAuthorized(request.context, 
DESCRIBE, TOPIC,
+          subscribedTopicSet)(identity)
+        if (authorizedTopics.size < subscribedTopicSet.size) {
+          val responseData = new ConsumerGroupHeartbeatResponseData()
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+          requestHelper.sendMaybeThrottle(request, new 
ConsumerGroupHeartbeatResponse(responseData))
+          return CompletableFuture.completedFuture[Unit](())
+        }
+      }
+
       groupCoordinator.consumerGroupHeartbeat(
         request.context,
-        consumerGroupHeartbeatRequest.data,
+        consumerGroupHeartbeatRequest.data
       ).handle[Unit] { (response, exception) =>
         if (exception != null) {
           requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(exception))
@@ -2594,6 +2610,36 @@ class KafkaApis(val requestChannel: RequestChannel,
             response.groups.addAll(results)
           }
 
+          // Clients are not allowed to see topics that are not authorized for 
Describe.
+          if (!authorizer.isEmpty) {
+            val topicsToCheck = response.groups.stream()
+              .flatMap(group => group.members.stream)
+              .flatMap(member => util.stream.Stream.of(member.assignment, 
member.targetAssignment))
+              .flatMap(assignment => assignment.topicPartitions.stream)
+              .map(topicPartition => topicPartition.topicName)
+              .collect(Collectors.toSet[String])
+              .asScala
+            val authorizedTopics = 
authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
+              topicsToCheck)(identity)
+            val updatedGroups = response.groups.stream().map { group =>
+              val hasUnauthorizedTopic = group.members.stream()
+                .flatMap(member => util.stream.Stream.of(member.assignment, 
member.targetAssignment))
+                .flatMap(assignment => assignment.topicPartitions.stream())
+                .anyMatch(tp => !authorizedTopics.contains(tp.topicName))
+
+              if (hasUnauthorizedTopic) {
+                new ConsumerGroupDescribeResponseData.DescribedGroup()
+                  .setGroupId(group.groupId)
+                  .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+                  .setErrorMessage("The group has described topic(s) that the 
client is not authorized to describe.")
+                  .setMembers(List.empty.asJava)
+              } else {
+                group
+              }
+            
}.collect(Collectors.toList[ConsumerGroupDescribeResponseData.DescribedGroup])
+            response.setGroups(updatedGroups)
+          }
+
           requestHelper.sendMaybeThrottle(request, new 
ConsumerGroupDescribeResponse(response))
         }
       }
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 3aec9be1f49..d0ee3315b22 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -601,7 +601,8 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   private def consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
     new ConsumerGroupHeartbeatRequestData()
       .setGroupId(group)
-      .setMemberEpoch(0)).build()
+      .setMemberEpoch(0)
+      .setSubscribedTopicNames(List(topic).asJava)).build()
 
   private def consumerGroupDescribeRequest = new 
ConsumerGroupDescribeRequest.Builder(
     new ConsumerGroupDescribeRequestData()
@@ -2492,11 +2493,12 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
 
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
-  def testConsumerGroupHeartbeatWithReadAcl(quorum: String): Unit = {
+  def testConsumerGroupHeartbeatWithGroupReadAndTopicDescribeAcl(quorum: 
String): Unit = {
     addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
 
     val request = consumerGroupHeartbeatRequest
-    val resource = Set[ResourceType](GROUP)
+    val resource = Set[ResourceType](GROUP, TOPIC)
     sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
   }
 
@@ -2505,50 +2507,115 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   def testConsumerGroupHeartbeatWithOperationAll(quorum: String): Unit = {
     val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
     addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
+    addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
 
     val request = consumerGroupHeartbeatRequest
-    val resource = Set[ResourceType](GROUP)
+    val resource = Set[ResourceType](GROUP, TOPIC)
     sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
   }
 
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
-  def testConsumerGroupHeartbeatWithoutReadAcl(quorum: String): Unit = {
+  def testConsumerGroupHeartbeatWithoutGroupReadOrTopicDescribeAcl(quorum: 
String): Unit = {
     removeAllClientAcls()
 
     val request = consumerGroupHeartbeatRequest
-    val resource = Set[ResourceType](GROUP)
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testConsumerGroupHeartbeatWithoutGroupReadAcl(quorum: String): Unit = {
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+    val request = consumerGroupHeartbeatRequest
+
+    val resource = Set[ResourceType](GROUP, TOPIC)
     sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
   }
 
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
-  def testConsumerGroupDescribeWithDescribeAcl(quorum: String): Unit = {
+  def testConsumerGroupHeartbeatWithoutTopicDescribeAcl(quorum: String): Unit 
= {
+    addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
+
+    val request = consumerGroupHeartbeatRequest
+
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  private def createConsumerGroupToDescribe(): Unit = {
+    createTopicWithBrokerPrincipal(topic)
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, READ, ALLOW)), groupResource)
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, READ, ALLOW)), topicResource)
+    consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer")
+    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, group)
+    val consumer = createConsumer()
+    consumer.subscribe(Collections.singleton(topic))
+    consumer.poll(Duration.ofMillis(500L))
+    removeAllClientAcls()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testConsumerGroupDescribeWithGroupDescribeAndTopicDescribeAcl(quorum: 
String): Unit = {
+    createConsumerGroupToDescribe()
+
     addAndVerifyAcls(groupDescribeAcl(groupResource), groupResource)
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
 
     val request = consumerGroupDescribeRequest
-    val resource = Set[ResourceType](GROUP)
+    val resource = Set[ResourceType](GROUP, TOPIC)
     sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
   }
 
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testConsumerGroupDescribeWithOperationAll(quorum: String): Unit = {
+    createConsumerGroupToDescribe()
+
     val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
     addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
+    addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
 
     val request = consumerGroupDescribeRequest
-    val resource = Set[ResourceType](GROUP)
+    val resource = Set[ResourceType](GROUP, TOPIC)
     sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
   }
 
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
-  def testConsumerGroupDescribeWithoutDescribeAcl(quorum: String): Unit = {
-    removeAllClientAcls()
+  def testConsumerGroupDescribeWithoutGroupDescribeAcl(quorum: String): Unit = 
{
+    createConsumerGroupToDescribe()
+
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
 
     val request = consumerGroupDescribeRequest
-    val resource = Set[ResourceType](GROUP)
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testConsumerGroupDescribeWithoutTopicDescribeAcl(quorum: String): Unit = 
{
+    createConsumerGroupToDescribe()
+
+    addAndVerifyAcls(groupDescribeAcl(groupResource), groupResource)
+
+    val request = consumerGroupDescribeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testConsumerGroupDescribeWithoutGroupDescribeOrTopicDescribeAcl(quorum: 
String): Unit = {
+    createConsumerGroupToDescribe()
+
+    val request = consumerGroupDescribeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
     sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index fe9336a04f4..aa6d208ef8b 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.ExecutionException
 import org.apache.kafka.metadata.authorizer.StandardAuthorizer
 import kafka.utils._
 import org.apache.kafka.clients.admin.Admin
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
ConsumerRecords}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
ConsumerRecords, GroupProtocol}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.acl.AclOperation._
@@ -430,7 +430,18 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
 
     // Verify that records are consumed if all topics are authorized
     consumer.subscribe(List(topic).asJava)
-    consumeRecordsIgnoreOneAuthorizationException(consumer)
+    if (groupProtocol.equals(GroupProtocol.CLASSIC)) {
+      consumeRecordsIgnoreOneAuthorizationException(consumer)
+    } else {
+      TestUtils.waitUntilTrue(() => {
+        try {
+          consumeRecords(consumer, numRecords, 0, topic)
+          true
+        } catch {
+          case _: TopicAuthorizationException => false
+        }
+      }, "Consumer didn't manage to consume the records within timeout.")
+    }
   }
 
   private def noConsumeWithoutDescribeAclSetup(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index c31a75ad8da..85cb53db00f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -39,7 +39,7 @@ import 
org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartiti
 import 
org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource 
=> LAlterConfigsResource, AlterConfigsResourceCollection => 
LAlterConfigsResourceCollection, AlterableConfig => LAlterableConfig, 
AlterableConfigCollection => LAlterableConfigCollection}
 import 
org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse
 => LAlterConfigsResourceResponse}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
-import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup
+import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{DescribedGroup,
 TopicPartitions}
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
 import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import 
org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData.{DescribeShareGroupOffsetsRequestGroup,
 DescribeShareGroupOffsetsRequestTopic}
@@ -9852,7 +9852,7 @@ class KafkaApisTest extends Logging {
   }
 
   @Test
-  def testConsumerGroupHeartbeatRequestAuthorizationFailed(): Unit = {
+  def testConsumerGroupHeartbeatRequestGroupAuthorizationFailed(): Unit = {
     metadataCache = mock(classOf[KRaftMetadataCache])
 
     val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequestData().setGroupId("group")
@@ -9872,6 +9872,46 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
 
+  @Test
+  def testConsumerGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+    val groupId = "group"
+    val fooTopicName = "foo"
+    val barTopicName = "bar"
+    val zarTopicName = "zar"
+
+    val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData()
+      .setGroupId(groupId)
+      .setSubscribedTopicNames(List(fooTopicName, barTopicName, 
zarTopicName).asJava)
+
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val acls = Map(
+      groupId -> AuthorizationResult.ALLOWED,
+      fooTopicName -> AuthorizationResult.ALLOWED,
+      barTopicName -> AuthorizationResult.DENIED,
+    )
+    when(authorizer.authorize(
+      any[RequestContext],
+      any[util.List[Action]]
+    )).thenAnswer { invocation =>
+      val actions = invocation.getArgument(1, classOf[util.List[Action]])
+      actions.asScala.map { action =>
+        acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+      }.asJava
+    }
+
+    kafkaApis = createKafkaApis(
+      authorizer = Some(authorizer),
+      featureVersions = Seq(GroupVersion.GV_1)
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val response = 
verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
+  }
+
   @Test
   def testStreamsGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
@@ -10043,6 +10083,8 @@ class KafkaApisTest extends Logging {
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
+    val fooTopicName = "foo"
+    val barTopicName = "bar"
     metadataCache = mock(classOf[KRaftMetadataCache])
 
     val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
@@ -10061,10 +10103,44 @@ class KafkaApisTest extends Logging {
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
+    val member0 = new ConsumerGroupDescribeResponseData.Member()
+      .setMemberId("member0")
+      .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(
+          new TopicPartitions().setTopicName(fooTopicName)).asJava))
+      .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(
+          new TopicPartitions().setTopicName(fooTopicName)).asJava))
+
+    val member1 = new ConsumerGroupDescribeResponseData.Member()
+      .setMemberId("member1")
+      .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(
+          new TopicPartitions().setTopicName(fooTopicName)).asJava))
+      .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(
+          new TopicPartitions().setTopicName(fooTopicName),
+          new TopicPartitions().setTopicName(barTopicName)).asJava))
+
+    val member2 = new ConsumerGroupDescribeResponseData.Member()
+      .setMemberId("member2")
+      .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(
+          new TopicPartitions().setTopicName(barTopicName)).asJava))
+      .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(
+          new TopicPartitions().setTopicName(fooTopicName)).asJava))
+
     future.complete(List(
-      new DescribedGroup().setGroupId(groupIds.get(0)),
-      new DescribedGroup().setGroupId(groupIds.get(1)),
-      new DescribedGroup().setGroupId(groupIds.get(2))
+      new DescribedGroup()
+        .setGroupId(groupIds.get(0))
+        .setMembers(List(member0).asJava),
+      new DescribedGroup()
+        .setGroupId(groupIds.get(1))
+        .setMembers(List(member0, member1).asJava),
+      new DescribedGroup()
+        .setGroupId(groupIds.get(2))
+        .setMembers(List(member2).asJava)
     ).asJava)
 
     var authorizedOperationsInt = Int.MinValue
@@ -10076,9 +10152,15 @@ class KafkaApisTest extends Logging {
 
     // Can't reuse the above list here because we would not test the 
implementation in KafkaApis then
     val describedGroups = List(
-      new DescribedGroup().setGroupId(groupIds.get(0)),
-      new DescribedGroup().setGroupId(groupIds.get(1)),
-      new DescribedGroup().setGroupId(groupIds.get(2))
+      new DescribedGroup()
+        .setGroupId(groupIds.get(0))
+        .setMembers(List(member0).asJava),
+      new DescribedGroup()
+        .setGroupId(groupIds.get(1))
+        .setMembers(List(member0, member1).asJava),
+      new DescribedGroup()
+        .setGroupId(groupIds.get(2))
+        .setMembers(List(member2).asJava)
     ).map(group => group.setAuthorizedOperations(authorizedOperationsInt))
     val expectedConsumerGroupDescribeResponseData = new 
ConsumerGroupDescribeResponseData()
       .setGroups(describedGroups.asJava)
@@ -10294,6 +10376,108 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.FENCED_MEMBER_EPOCH.code, 
response.data.groups.get(0).errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = {
+    val fooTopicName = "foo"
+    val barTopicName = "bar"
+    val errorMessage = "The group has described topic(s) that the client is 
not authorized to describe."
+
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
+    val consumerGroupDescribeRequestData = new 
ConsumerGroupDescribeRequestData()
+      .setGroupIds(groupIds)
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, 
true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val acls = Map(
+      groupIds.get(0) -> AuthorizationResult.ALLOWED,
+      groupIds.get(1) -> AuthorizationResult.ALLOWED,
+      groupIds.get(2) -> AuthorizationResult.ALLOWED,
+      fooTopicName    -> AuthorizationResult.ALLOWED,
+      barTopicName    -> AuthorizationResult.DENIED,
+    )
+    when(authorizer.authorize(
+      any[RequestContext],
+      any[util.List[Action]]
+    )).thenAnswer { invocation =>
+      val actions = invocation.getArgument(1, classOf[util.List[Action]])
+      actions.asScala.map { action =>
+        acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+      }.asJava
+    }
+
+    val future = new 
CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.consumerGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(
+      authorizer = Some(authorizer),
+      featureVersions = Seq(GroupVersion.GV_1)
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val member0 = new ConsumerGroupDescribeResponseData.Member()
+      .setMemberId("member0")
+      .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(
+          new TopicPartitions().setTopicName(fooTopicName)).asJava))
+      .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(
+          new TopicPartitions().setTopicName(fooTopicName)).asJava))
+
+    val member1 = new ConsumerGroupDescribeResponseData.Member()
+      .setMemberId("member1")
+      .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(
+          new TopicPartitions().setTopicName(fooTopicName)).asJava))
+      .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(
+          new TopicPartitions().setTopicName(fooTopicName),
+          new TopicPartitions().setTopicName(barTopicName)).asJava))
+
+    val member2 = new ConsumerGroupDescribeResponseData.Member()
+      .setMemberId("member2")
+      .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(
+          new TopicPartitions().setTopicName(barTopicName)).asJava))
+      .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(
+          new TopicPartitions().setTopicName(fooTopicName)).asJava))
+
+    future.complete(List(
+      new DescribedGroup()
+        .setGroupId(groupIds.get(0))
+        .setMembers(List(member0).asJava),
+      new DescribedGroup()
+        .setGroupId(groupIds.get(1))
+        .setMembers(List(member0, member1).asJava),
+      new DescribedGroup()
+        .setGroupId(groupIds.get(2))
+        .setMembers(List(member2).asJava)
+    ).asJava)
+
+    val expectedConsumerGroupDescribeResponseData = new 
ConsumerGroupDescribeResponseData()
+      .setGroups(List(
+        new DescribedGroup()
+          .setGroupId(groupIds.get(0))
+          .setMembers(List(member0).asJava),
+        new DescribedGroup()
+          .setGroupId(groupIds.get(1))
+          .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+          .setErrorMessage(errorMessage),
+        new DescribedGroup()
+          .setGroupId(groupIds.get(2))
+          .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+          .setErrorMessage(errorMessage)
+      ).asJava)
+
+    val response = 
verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
+
+    assertEquals(expectedConsumerGroupDescribeResponseData, response.data)
+  }
+
   @Test
   def testGetTelemetrySubscriptions(): Unit = {
     val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(
diff --git a/docs/ops.html b/docs/ops.html
index 3177b49d487..be116d9f292 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -127,6 +127,8 @@ topic1          0          854144          855809          
1665            consu
 topic2          0          460537          803290          342753          
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
 topic3          2          243655          398812          155157          
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      
consumer4</code></pre>
 
+  Note that if the consumer group uses the consumer protocol, the admin client needs DESCRIBE access to all the topics used in the group (the topics the members are subscribed to). In contrast, the classic protocol does not require DESCRIBE authorization on all topics.
+
   There are a number of additional "describe" options that can be used to 
provide more detailed information about a consumer group:
   <ul>
     <li>--members: This option provides the list of all active members in the 
consumer group.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 0e7004ba40c..21b9cddfea6 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -84,6 +84,7 @@ import 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.record.BrokerCompressionType;
 import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
 import org.apache.kafka.server.share.persister.DeleteShareGroupStateResult;
@@ -135,6 +136,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         private GroupCoordinatorMetrics groupCoordinatorMetrics;
         private GroupConfigManager groupConfigManager;
         private Persister persister;
+        private Optional<Authorizer> authorizer;
 
         public Builder(
             int nodeId,
@@ -184,6 +186,11 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             return this;
         }
 
+        public Builder withAuthorizer(Optional<Authorizer> authorizer) {
+            this.authorizer = authorizer;
+            return this;
+        }
+
         public GroupCoordinatorService build() {
             requireNonNull(config, new IllegalArgumentException("Config must 
be set."));
             requireNonNull(writer, new IllegalArgumentException("Writer must 
be set."));
@@ -194,12 +201,14 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             requireNonNull(groupCoordinatorMetrics, new 
IllegalArgumentException("GroupCoordinatorMetrics must be set."));
             requireNonNull(groupConfigManager, new 
IllegalArgumentException("GroupConfigManager must be set."));
             requireNonNull(persister, new IllegalArgumentException("Persister 
must be set."));
+            requireNonNull(authorizer, new 
IllegalArgumentException("Authorizer must be set."));
 
             String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
             LogContext logContext = new LogContext(String.format("[%s] ", 
logPrefix));
 
             CoordinatorShardBuilderSupplier<GroupCoordinatorShard, 
CoordinatorRecord> supplier = () ->
-                new GroupCoordinatorShard.Builder(config, groupConfigManager);
+                new GroupCoordinatorShard.Builder(config, groupConfigManager)
+                    .withAuthorizer(authorizer);
 
             CoordinatorEventProcessor processor = new 
MultiThreadedEventProcessor(
                 logContext,
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 1cea887a000..bbe2f1431ee 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -115,6 +115,7 @@ import 
org.apache.kafka.coordinator.group.modern.share.ShareGroup;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -126,6 +127,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -153,6 +155,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         private CoordinatorExecutor<CoordinatorRecord> executor;
         private CoordinatorMetrics coordinatorMetrics;
         private TopicPartition topicPartition;
+        private Optional<Authorizer> authorizer;
 
         public Builder(
             GroupCoordinatorConfig config,
@@ -216,6 +219,13 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
             return this;
         }
 
+        public CoordinatorShardBuilder<GroupCoordinatorShard, 
CoordinatorRecord> withAuthorizer(
+            Optional<Authorizer> authorizer
+        ) {
+            this.authorizer = authorizer;
+            return this;
+        }
+
         @SuppressWarnings("NPathComplexity")
         @Override
         public GroupCoordinatorShard build() {
@@ -236,6 +246,8 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                 throw new IllegalArgumentException("TopicPartition must be 
set.");
             if (groupConfigManager == null)
                 throw new IllegalArgumentException("GroupConfigManager must be 
set.");
+            if (authorizer == null)
+                throw new IllegalArgumentException("Authorizer must be set.");
 
             GroupCoordinatorMetricsShard metricsShard = 
((GroupCoordinatorMetrics) coordinatorMetrics)
                 .newMetricsShard(snapshotRegistry, topicPartition);
@@ -249,6 +261,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                 .withConfig(config)
                 .withGroupConfigManager(groupConfigManager)
                 .withGroupCoordinatorMetricsShard(metricsShard)
+                .withAuthorizer(authorizer)
                 .build();
 
             OffsetMetadataManager offsetMetadataManager = new 
OffsetMetadataManager.Builder()
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index be07edf0501..e864def4f37 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -63,6 +63,7 @@ import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
+import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorExecutor;
@@ -144,6 +145,9 @@ import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.TopicImage;
 import org.apache.kafka.image.TopicsDelta;
 import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
 import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
 import org.apache.kafka.server.share.persister.PartitionFactory;
@@ -174,8 +178,10 @@ import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
 import static 
org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
 import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
 import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
@@ -184,6 +190,8 @@ import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.CON
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
 import static 
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
 import static org.apache.kafka.coordinator.group.Group.GroupType.CLASSIC;
 import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
 import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE;
@@ -261,6 +269,7 @@ public class GroupMetadataManager {
         private MetadataImage metadataImage = null;
         private ShareGroupPartitionAssignor shareGroupAssignor = null;
         private GroupCoordinatorMetricsShard metrics;
+        private Optional<Authorizer> authorizer = null;
 
         Builder withLogContext(LogContext logContext) {
             this.logContext = logContext;
@@ -312,11 +321,17 @@ public class GroupMetadataManager {
             return this;
         }
 
+        Builder withAuthorizer(Optional<Authorizer> authorizer) {
+            this.authorizer = authorizer;
+            return this;
+        }
+
         GroupMetadataManager build() {
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
             if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
             if (time == null) time = Time.SYSTEM;
+            if (authorizer == null) authorizer = Optional.empty();
 
             if (timer == null)
                 throw new IllegalArgumentException("Timer must be set.");
@@ -341,7 +356,8 @@ public class GroupMetadataManager {
                 metadataImage,
                 config,
                 groupConfigManager,
-                shareGroupAssignor
+                shareGroupAssignor,
+                authorizer
             );
         }
     }
@@ -447,6 +463,11 @@ public class GroupMetadataManager {
      */
     private final ShareGroupPartitionAssignor shareGroupAssignor;
 
+    /**
+     * The authorizer to validate the regex subscription topics.
+     */
+    private final Optional<Authorizer> authorizer;
+
     private GroupMetadataManager(
         SnapshotRegistry snapshotRegistry,
         LogContext logContext,
@@ -457,7 +478,8 @@ public class GroupMetadataManager {
         MetadataImage metadataImage,
         GroupCoordinatorConfig config,
         GroupConfigManager groupConfigManager,
-        ShareGroupPartitionAssignor shareGroupAssignor
+        ShareGroupPartitionAssignor shareGroupAssignor,
+        Optional<Authorizer> authorizer
     ) {
         this.logContext = logContext;
         this.log = logContext.logger(GroupMetadataManager.class);
@@ -478,6 +500,7 @@ public class GroupMetadataManager {
         this.groupConfigManager = groupConfigManager;
         this.shareGroupAssignor = shareGroupAssignor;
         this.streamsGroupSessionTimeoutMs = 45000;
+        this.authorizer = authorizer;
     }
 
     /**
@@ -1791,14 +1814,13 @@ public class GroupMetadataManager {
      *    is larger than the current target assignment epoch.
      * 3) The member's assignment is reconciled with the target assignment.
      *
+     * @param context               The request context.
      * @param groupId               The group id from the request.
      * @param memberId              The member id from the request.
      * @param memberEpoch           The member epoch from the request.
      * @param instanceId            The instance id from the request or null.
      * @param rackId                The rack id from the request or null.
      * @param rebalanceTimeoutMs    The rebalance timeout from the request or 
-1.
-     * @param clientId              The client id.
-     * @param clientHost            The client host.
      * @param subscribedTopicNames  The list of subscribed topic names from 
the request
      *                              or null.
      * @param subscribedTopicRegex  The regular expression based subscription 
from the request
@@ -1810,14 +1832,13 @@ public class GroupMetadataManager {
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> consumerGroupHeartbeat(
+        RequestContext context,
         String groupId,
         String memberId,
         int memberEpoch,
         String instanceId,
         String rackId,
         int rebalanceTimeoutMs,
-        String clientId,
-        String clientHost,
         List<String> subscribedTopicNames,
         String subscribedTopicRegex,
         String assignorName,
@@ -1868,8 +1889,8 @@ public class GroupMetadataManager {
             .maybeUpdateServerAssignorName(Optional.ofNullable(assignorName))
             
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
             
.maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex))
-            .setClientId(clientId)
-            .setClientHost(clientHost)
+            .setClientId(context.clientId())
+            .setClientHost(context.clientAddress.toString())
             .setClassicMemberMetadata(null)
             .build();
 
@@ -1885,6 +1906,7 @@ public class GroupMetadataManager {
         );
 
         bumpGroupEpoch |= maybeUpdateRegularExpressions(
+            context,
             group,
             member,
             updatedMember,
@@ -2472,6 +2494,7 @@ public class GroupMetadataManager {
      * group. We align the refreshment of the regular expression in order to 
have
      * them trigger only one rebalance per update.
      *
+     * @param context       The request context.
      * @param group         The consumer group.
      * @param member        The old member.
      * @param updatedMember The new member.
@@ -2479,6 +2502,7 @@ public class GroupMetadataManager {
      * @return Whether a rebalance must be triggered.
      */
     private boolean maybeUpdateRegularExpressions(
+        RequestContext context,
         ConsumerGroup group,
         ConsumerGroupMember member,
         ConsumerGroupMember updatedMember,
@@ -2564,8 +2588,8 @@ public class GroupMetadataManager {
             Set<String> regexes = 
Collections.unmodifiableSet(subscribedRegularExpressions.keySet());
             executor.schedule(
                 key,
-                () -> refreshRegularExpressions(groupId, log, time, 
metadataImage, regexes),
-                (result, exception) -> handleRegularExpressionsResult(groupId, 
result, exception)
+                () -> refreshRegularExpressions(context, groupId, log, time, 
metadataImage, authorizer, regexes),
+                (result, exception) -> handleRegularExpressionsResult(groupId, 
memberId, result, exception)
             );
         }
 
@@ -2577,20 +2601,24 @@ public class GroupMetadataManager {
      * as an asynchronous task in the executor. Hence, it should not access 
any state from
      * the manager.
      *
-     * @param groupId   The group id.
-     * @param log       The log instance.
-     * @param time      The time instance.
-     * @param image     The metadata image to use for listing the topics.
-     * @param regexes   The list of regular expressions that must be resolved.
+     * @param context       The request context.
+     * @param groupId       The group id.
+     * @param log           The log instance.
+     * @param time          The time instance.
+     * @param image         The metadata image to use for listing the topics.
+     * @param authorizer    The authorizer.
+     * @param regexes       The list of regular expressions that must be 
resolved.
      * @return The list of resolved regular expressions.
      *
      * public for benchmarks.
      */
     public static Map<String, ResolvedRegularExpression> 
refreshRegularExpressions(
+        RequestContext context,
         String groupId,
         Logger log,
         Time time,
         MetadataImage image,
+        Optional<Authorizer> authorizer,
         Set<String> regexes
     ) {
         long startTimeMs = time.milliseconds();
@@ -2619,6 +2647,12 @@ public class GroupMetadataManager {
             }
         }
 
+        filterTopicDescribeAuthorizedTopics(
+            context,
+            authorizer,
+            resolvedRegexes
+        );
+
         long version = image.provenance().lastContainedOffset();
         Map<String, ResolvedRegularExpression> result = new 
HashMap<>(resolvedRegexes.size());
         for (Map.Entry<String, Set<String>> resolvedRegex : 
resolvedRegexes.entrySet()) {
@@ -2635,15 +2669,58 @@ public class GroupMetadataManager {
         return result;
     }
 
+    /**
+     * This method filters the topics in the resolved regexes
+     * that the member is authorized to describe.
+     *
+     * @param context           The request context.
+     * @param authorizer        The authorizer.
+     * @param resolvedRegexes   The map of the regex pattern and its set of 
matched topics.
+     */
+    private static void filterTopicDescribeAuthorizedTopics(
+        RequestContext context,
+        Optional<Authorizer> authorizer,
+        Map<String, Set<String>> resolvedRegexes
+    ) {
+        if (authorizer.isEmpty()) return;
+
+        Map<String, Integer> topicNameCount = new HashMap<>();
+        resolvedRegexes.values().forEach(topicNames ->
+            topicNames.forEach(topicName ->
+                topicNameCount.compute(topicName, Utils::incValue)
+            )
+        );
+
+        List<Action> actions = topicNameCount.entrySet().stream().map(entry -> 
{
+            ResourcePattern resource = new ResourcePattern(TOPIC, 
entry.getKey(), LITERAL);
+            return new Action(DESCRIBE, resource, entry.getValue(), true, 
false);
+        }).collect(Collectors.toList());
+
+        List<AuthorizationResult> authorizationResults = 
authorizer.get().authorize(context, actions);
+        Set<String> deniedTopics = new HashSet<>();
+        IntStream.range(0, actions.size()).forEach(i -> {
+            if (authorizationResults.get(i) == AuthorizationResult.DENIED) {
+                String deniedTopic = actions.get(i).resourcePattern().name();
+                deniedTopics.add(deniedTopic);
+            }
+        });
+
+        resolvedRegexes.forEach((__, topicNames) -> 
topicNames.removeAll(deniedTopics));
+    }
+
+
     /**
      * Handle the result of the asynchronous tasks which resolves the regular 
expressions.
      *
+     * @param groupId                       The group id.
+     * @param memberId                      The member id.
      * @param resolvedRegularExpressions    The resolved regular expressions.
      * @param exception                     The exception if the resolution 
failed.
      * @return A CoordinatorResult containing the records to mutate the group 
state.
      */
     private CoordinatorResult<Void, CoordinatorRecord> 
handleRegularExpressionsResult(
         String groupId,
+        String memberId,
         Map<String, ResolvedRegularExpression> resolvedRegularExpressions,
         Throwable exception
     ) {
@@ -2654,8 +2731,8 @@ public class GroupMetadataManager {
         }
 
         if (log.isDebugEnabled()) {
-            log.debug("[GroupId {}] Received updated regular expressions: {}.",
-                groupId, resolvedRegularExpressions);
+            log.debug("[GroupId {}] Received updated regular expressions based 
on the context of member {}: {}.",
+                groupId, memberId, resolvedRegularExpressions);
         }
 
         List<CoordinatorRecord> records = new ArrayList<>();
@@ -3803,14 +3880,13 @@ public class GroupMetadataManager {
         } else {
             // Otherwise, it is a regular heartbeat.
             return consumerGroupHeartbeat(
+                context,
                 request.groupId(),
                 request.memberId(),
                 request.memberEpoch(),
                 request.instanceId(),
                 request.rackId(),
                 request.rebalanceTimeoutMs(),
-                context.clientId(),
-                context.clientAddress.toString(),
                 request.subscribedTopicNames(),
                 request.subscribedTopicRegex(),
                 request.serverAssignor(),
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 7d69b6e221c..8ec5ec2eb4e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -107,6 +107,9 @@ import 
org.apache.kafka.coordinator.group.streams.TasksTuple;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
 import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
 import org.apache.kafka.server.share.persister.PartitionFactory;
@@ -16712,6 +16715,232 @@ public class GroupMetadataManagerTest {
         );
     }
 
+    @Test
+    public void 
testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        Authorizer authorizer = mock(Authorizer.class);
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build(12345L))
+            .withAuthorizer(authorizer)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(List.of("foo"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("foo*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                    .build())
+                .withSubscriptionMetadata(Map.of(
+                    fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)))
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                .withResolvedRegularExpression("foo*", new ResolvedRegularExpression(
+                    Set.of(fooTopicName), 0L, 0L))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
+        context.time.sleep(10001L);
+
+        Map<String, AuthorizationResult> acls = new HashMap<>();
+        acls.put(fooTopicName, AuthorizationResult.ALLOWED);
+        acls.put(barTopicName, AuthorizationResult.DENIED);
+        when(authorizer.authorize(any(), any())).thenAnswer(invocation -> {
+            List<Action> actions = invocation.getArgument(1, List.class);
+            return actions.stream()
+                .map(action -> acls.getOrDefault(action.resourcePattern().name(), AuthorizationResult.DENIED))
+                .collect(Collectors.toList());
+        });
+
+        // Member 2 heartbeats with a different regular expression.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result1 = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicRegex("foo*|bar*")
+                .setServerAssignor("range")
+                .setTopicPartitions(Collections.emptyList()),
+            ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
+        );
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(Collections.singletonList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(3, 4, 5))))),
+            result1.response()
+        );
+
+        ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicRegex("foo*|bar*")
+            .setServerAssignorName("range")
+            .build();
+
+        assertRecordsEquals(
+            List.of(
+                GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
+                GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")
+            ),
+            result1.records()
+        );
+
+        // Execute pending tasks.
+        assertEquals(
+            List.of(
+                new MockCoordinatorExecutor.ExecutorResult<>(
+                    groupId + "-regex",
+                    new CoordinatorResult<>(List.of(
+                        // The resolution of the new regex is persisted.
+                        GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
+                            groupId,
+                            "foo*|bar*",
+                            new ResolvedRegularExpression(
+                                Set.of("foo"),
+                                12345L,
+                                context.time.milliseconds()
+                            )
+                        ),
+                        GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
+                    ))
+                )
+            ),
+            context.processTasks()
+        );
+
+        // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
+        context.time.sleep(10001L);
+
+        // Access to the bar topic is granted.
+        acls.put(barTopicName, AuthorizationResult.ALLOWED);
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+            memberId1, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2)
+            )),
+            memberId2, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5)
+            ))
+        )));
+
+        // Member 2 heartbeats again with a new regex.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result2 = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicRegex("foo|bar*")
+                .setServerAssignor("range")
+                .setTopicPartitions(Collections.emptyList()),
+            ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
+        );
+
+        expectedMember2 = new ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicRegex("foo|bar*")
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5)))
+            .build();
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(3, 4, 5))))),
+            result2.response()
+        );
+
+        assertRecordsEquals(
+            List.of(
+                GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
+                GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*|bar*"),
+                GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11),
+                GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)
+            ),
+            result2.records()
+        );
+
+        // A regex refresh is triggered and the bar topic is included.
+        assertRecordsEquals(
+            List.of(
+                // The resolution of the new regex is persisted.
+                GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
+                    groupId,
+                    "foo|bar*",
+                    new ResolvedRegularExpression(
+                        Set.of("foo", "bar"),
+                        12345L,
+                        context.time.milliseconds()
+                    )
+                ),
+                GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(
+                    groupId,
+                    Map.of(
+                        fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
+                        barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
+                    )
+                ),
+                GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12)
+            ),
+            context.processTasks().get(0).result.records()
+        );
+    }
+
     @Test
     public void testResolvedRegularExpressionsRemovedWhenMembersLeaveOrFenced() {
         String groupId = "fooup";
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 1b12af3f7b9..4f245296d47 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -112,6 +112,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
 import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
 
@@ -123,6 +124,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -468,6 +470,7 @@ public class GroupMetadataManagerTestContext {
         private ShareGroupPartitionAssignor shareGroupAssignor = new MockPartitionAssignor("share");
         private final List<ShareGroupBuilder> shareGroupBuilders = new ArrayList<>();
         private final Map<String, Object> config = new HashMap<>();
+        private Optional<Authorizer> authorizer = Optional.empty();
 
         public Builder withConfig(String key, Object value) {
             config.put(key, value);
@@ -499,6 +502,11 @@ public class GroupMetadataManagerTestContext {
             return this;
         }
 
+        public Builder withAuthorizer(Authorizer authorizer) {
+            this.authorizer = Optional.of(authorizer);
+            return this;
+        }
+
         public GroupMetadataManagerTestContext build() {
             if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
             if (groupConfigManager == null) groupConfigManager = createConfigManager();
@@ -528,6 +536,7 @@ public class GroupMetadataManagerTestContext {
                     .withGroupCoordinatorMetricsShard(metrics)
                     .withShareGroupAssignor(shareGroupAssignor)
                     .withGroupConfigManager(groupConfigManager)
+                    .withAuthorizer(authorizer)
                     .build(),
                 groupConfigManager
             );
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
index 08db52e4e60..35ee42836c8 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
 
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -118,10 +119,12 @@ public class RegexResolutionBenchmark {
     @OutputTimeUnit(TimeUnit.MILLISECONDS)
     public void run() {
         GroupMetadataManager.refreshRegularExpressions(
+            null,
             GROUP_ID,
             LOG,
             TIME,
             image,
+            Optional.empty(),
             regexes
         );
     }
