This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 8e44ddccb5b KAFKA-18813: ConsumerGroupHeartbeat API and
ConsumerGroupDescribe API… (#19042)
8e44ddccb5b is described below
commit 8e44ddccb5b3335e3450cb46e6be58130f8851a1
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Wed Feb 26 16:53:46 2025 -0500
KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API…
(#19042)
… must check topic describe (#18989)
This patch filters out topics that are unauthorized for topic describe
from the ConsumerGroupHeartbeat and ConsumerGroupDescribe responses.
In ConsumerGroupHeartbeat,
- if the request has `subscribedTopicNames` set, we directly check the
authz in `KafkaApis` and return a topic auth failure in the response if
any of the topics is denied.
- Otherwise, we check the authz only if a regex refresh is triggered and
we do it based on the ACL of the consumer that triggered the refresh. If
any of the topics is denied, we filter it out from the resolved
subscription.
In ConsumerGroupDescribe, we check the authz of the coordinator
response. If any of the topics in the group is denied, we remove the
described info and add a topic auth failure to the described group
(similar to the group auth failure).
(cherry picked from commit 36f19057e1d57a8548a4548c304799fd176c359f)
Reviewers: David Jacot <[email protected]>, Lianet Magrans
<[email protected]>, Rajini Sivaram <[email protected]>,
Chia-Ping Tsai <[email protected]>, TaiJuWu <[email protected]>,
TengYao Chi <[email protected]>
---
checkstyle/import-control-coordinator-common.xml | 1 +
checkstyle/import-control-group-coordinator.xml | 3 +
checkstyle/import-control-share-coordinator.xml | 1 +
.../message/ConsumerGroupDescribeResponse.json | 1 +
.../message/ConsumerGroupHeartbeatResponse.json | 1 +
.../src/main/scala/kafka/server/BrokerServer.scala | 1 +
core/src/main/scala/kafka/server/KafkaApis.scala | 48 ++++-
.../kafka/api/AuthorizerIntegrationTest.scala | 91 ++++++--
.../kafka/api/EndToEndAuthorizationTest.scala | 15 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 200 +++++++++++++++++-
docs/ops.html | 2 +
.../coordinator/group/GroupCoordinatorService.java | 41 ++--
.../coordinator/group/GroupCoordinatorShard.java | 13 ++
.../coordinator/group/GroupMetadataManager.java | 114 ++++++++--
.../group/GroupMetadataManagerTest.java | 230 +++++++++++++++++++++
.../group/GroupMetadataManagerTestContext.java | 9 +
.../jmh/coordinator/RegexResolutionBenchmark.java | 3 +
17 files changed, 715 insertions(+), 59 deletions(-)
diff --git a/checkstyle/import-control-coordinator-common.xml
b/checkstyle/import-control-coordinator-common.xml
index c08955fd422..bafffe80697 100644
--- a/checkstyle/import-control-coordinator-common.xml
+++ b/checkstyle/import-control-coordinator-common.xml
@@ -58,6 +58,7 @@
<allow pkg="org.apache.kafka.coordinator.common" />
<allow pkg="org.apache.kafka.deferred" />
<allow pkg="org.apache.kafka.image" />
+ <allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.util" />
diff --git a/checkstyle/import-control-group-coordinator.xml
b/checkstyle/import-control-group-coordinator.xml
index 19d9d561613..4d1f0ed53ca 100644
--- a/checkstyle/import-control-group-coordinator.xml
+++ b/checkstyle/import-control-group-coordinator.xml
@@ -40,6 +40,8 @@
<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
+ <allow pkg="org.apache.kafka.common.acl" />
+ <allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.utils" />
@@ -63,6 +65,7 @@
<allow pkg="org.apache.kafka.coordinator.group" />
<allow pkg="org.apache.kafka.deferred" />
<allow pkg="org.apache.kafka.image"/>
+ <allow pkg="org.apache.kafka.server.authorizer"/>
<allow pkg="org.apache.kafka.server.common"/>
<allow pkg="org.apache.kafka.server.record"/>
<allow pkg="org.apache.kafka.server.util"/>
diff --git a/checkstyle/import-control-share-coordinator.xml
b/checkstyle/import-control-share-coordinator.xml
index aaea93d32e6..f430e283225 100644
--- a/checkstyle/import-control-share-coordinator.xml
+++ b/checkstyle/import-control-share-coordinator.xml
@@ -55,6 +55,7 @@
<allow pkg="org.apache.kafka.coordinator.share.metrics" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.metadata" />
+ <allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.share" />
diff --git
a/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json
b/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json
index 14d80e20ce2..95588551922 100644
---
a/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json
+++
b/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json
@@ -28,6 +28,7 @@
// - INVALID_REQUEST (version 0+)
// - INVALID_GROUP_ID (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
+ // - TOPIC_AUTHORIZATION_FAILED (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
diff --git
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
index 956cfab8262..010fc2cfe93 100644
---
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
+++
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
@@ -30,6 +30,7 @@
// - UNSUPPORTED_ASSIGNOR (version 0+)
// - UNRELEASED_INSTANCE_ID (version 0+)
// - GROUP_MAX_SIZE_REACHED (version 0+)
+ // - TOPIC_AUTHORIZATION_FAILED (version 0+)
// - INVALID_REGULAR_EXPRESSION (version 1+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 13721e728f6..6d2d1089088 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -647,6 +647,7 @@ class BrokerServer(
.withCoordinatorRuntimeMetrics(new
GroupCoordinatorRuntimeMetrics(metrics))
.withGroupCoordinatorMetrics(new
GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
.withGroupConfigManager(groupConfigManager)
+ .withAuthorizer(authorizer.toJava)
.build()
} else {
GroupCoordinatorAdapter(
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 3f13f18dbca..195a42c7192 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -73,6 +73,7 @@ import java.time.Duration
import java.util
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
+import java.util.stream.Collectors
import java.util.{Collections, Optional}
import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
@@ -2549,9 +2550,24 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request,
consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
+ if (consumerGroupHeartbeatRequest.data.subscribedTopicNames != null &&
+ !consumerGroupHeartbeatRequest.data.subscribedTopicNames.isEmpty) {
+ // Check the authorization if the subscribed topic names are provided.
+ // Clients are not allowed to see topics that are not authorized for
Describe.
+ val subscribedTopicSet =
consumerGroupHeartbeatRequest.data.subscribedTopicNames.asScala.toSet
+ val authorizedTopics = authHelper.filterByAuthorized(request.context,
DESCRIBE, TOPIC,
+ subscribedTopicSet)(identity)
+ if (authorizedTopics.size < subscribedTopicSet.size) {
+ val responseData = new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ requestHelper.sendMaybeThrottle(request, new
ConsumerGroupHeartbeatResponse(responseData))
+ return CompletableFuture.completedFuture[Unit](())
+ }
+ }
+
groupCoordinator.consumerGroupHeartbeat(
request.context,
- consumerGroupHeartbeatRequest.data,
+ consumerGroupHeartbeatRequest.data
).handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request,
consumerGroupHeartbeatRequest.getErrorResponse(exception))
@@ -2612,6 +2628,36 @@ class KafkaApis(val requestChannel: RequestChannel,
response.groups.addAll(results)
}
+ // Clients are not allowed to see topics that are not authorized for
Describe.
+ if (!authorizer.isEmpty) {
+ val topicsToCheck = response.groups.stream()
+ .flatMap(group => group.members.stream)
+ .flatMap(member => util.stream.Stream.of(member.assignment,
member.targetAssignment))
+ .flatMap(assignment => assignment.topicPartitions.stream)
+ .map(topicPartition => topicPartition.topicName)
+ .collect(Collectors.toSet[String])
+ .asScala
+ val authorizedTopics =
authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
+ topicsToCheck)(identity)
+ val updatedGroups = response.groups.stream().map { group =>
+ val hasUnauthorizedTopic = group.members.stream()
+ .flatMap(member => util.stream.Stream.of(member.assignment,
member.targetAssignment))
+ .flatMap(assignment => assignment.topicPartitions.stream())
+ .anyMatch(tp => !authorizedTopics.contains(tp.topicName))
+
+ if (hasUnauthorizedTopic) {
+ new ConsumerGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(group.groupId)
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ .setErrorMessage("The group has described topic(s) that the
client is not authorized to describe.")
+ .setMembers(List.empty.asJava)
+ } else {
+ group
+ }
+
}.collect(Collectors.toList[ConsumerGroupDescribeResponseData.DescribedGroup])
+ response.setGroups(updatedGroups)
+ }
+
requestHelper.sendMaybeThrottle(request, new
ConsumerGroupDescribeResponse(response))
}
}
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index f8e3e1fc54a..fc74344c863 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -601,7 +601,8 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
private def consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(group)
- .setMemberEpoch(0)).build()
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List(topic).asJava)).build()
private def consumerGroupDescribeRequest = new
ConsumerGroupDescribeRequest.Builder(
new ConsumerGroupDescribeRequestData()
@@ -2492,11 +2493,12 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
- def testConsumerGroupHeartbeatWithReadAcl(quorum: String): Unit = {
+ def testConsumerGroupHeartbeatWithGroupReadAndTopicDescribeAcl(quorum:
String): Unit = {
addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
+ addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
val request = consumerGroupHeartbeatRequest
- val resource = Set[ResourceType](GROUP)
+ val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@@ -2505,50 +2507,115 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
def testConsumerGroupHeartbeatWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
+ addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
val request = consumerGroupHeartbeatRequest
- val resource = Set[ResourceType](GROUP)
+ val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
- def testConsumerGroupHeartbeatWithoutReadAcl(quorum: String): Unit = {
+ def testConsumerGroupHeartbeatWithoutGroupReadOrTopicDescribeAcl(quorum:
String): Unit = {
removeAllClientAcls()
val request = consumerGroupHeartbeatRequest
- val resource = Set[ResourceType](GROUP)
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testConsumerGroupHeartbeatWithoutGroupReadAcl(quorum: String): Unit = {
+ addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+ val request = consumerGroupHeartbeatRequest
+
+ val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
- def testConsumerGroupDescribeWithDescribeAcl(quorum: String): Unit = {
+ def testConsumerGroupHeartbeatWithoutTopicDescribeAcl(quorum: String): Unit
= {
+ addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
+
+ val request = consumerGroupHeartbeatRequest
+
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ private def createConsumerGroupToDescribe(): Unit = {
+ createTopicWithBrokerPrincipal(topic)
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, READ, ALLOW)), groupResource)
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, READ, ALLOW)), topicResource)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer")
+ consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, group)
+ val consumer = createConsumer()
+ consumer.subscribe(Collections.singleton(topic))
+ consumer.poll(Duration.ofMillis(500L))
+ removeAllClientAcls()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testConsumerGroupDescribeWithGroupDescribeAndTopicDescribeAcl(quorum:
String): Unit = {
+ createConsumerGroupToDescribe()
+
addAndVerifyAcls(groupDescribeAcl(groupResource), groupResource)
+ addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
val request = consumerGroupDescribeRequest
- val resource = Set[ResourceType](GROUP)
+ val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupDescribeWithOperationAll(quorum: String): Unit = {
+ createConsumerGroupToDescribe()
+
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
+ addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
val request = consumerGroupDescribeRequest
- val resource = Set[ResourceType](GROUP)
+ val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
- def testConsumerGroupDescribeWithoutDescribeAcl(quorum: String): Unit = {
- removeAllClientAcls()
+ def testConsumerGroupDescribeWithoutGroupDescribeAcl(quorum: String): Unit =
{
+ createConsumerGroupToDescribe()
+
+ addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
val request = consumerGroupDescribeRequest
- val resource = Set[ResourceType](GROUP)
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testConsumerGroupDescribeWithoutTopicDescribeAcl(quorum: String): Unit =
{
+ createConsumerGroupToDescribe()
+
+ addAndVerifyAcls(groupDescribeAcl(groupResource), groupResource)
+
+ val request = consumerGroupDescribeRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testConsumerGroupDescribeWithoutGroupDescribeOrTopicDescribeAcl(quorum:
String): Unit = {
+ createConsumerGroupToDescribe()
+
+ val request = consumerGroupDescribeRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
diff --git
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index fe9336a04f4..aa6d208ef8b 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.ExecutionException
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import kafka.utils._
import org.apache.kafka.clients.admin.Admin
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig,
ConsumerRecords}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig,
ConsumerRecords, GroupProtocol}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.acl.AclOperation._
@@ -430,7 +430,18 @@ abstract class EndToEndAuthorizationTest extends
IntegrationTestHarness with Sas
// Verify that records are consumed if all topics are authorized
consumer.subscribe(List(topic).asJava)
- consumeRecordsIgnoreOneAuthorizationException(consumer)
+ if (groupProtocol.equals(GroupProtocol.CLASSIC)) {
+ consumeRecordsIgnoreOneAuthorizationException(consumer)
+ } else {
+ TestUtils.waitUntilTrue(() => {
+ try {
+ consumeRecords(consumer, numRecords, 0, topic)
+ true
+ } catch {
+ case _: TopicAuthorizationException => false
+ }
+ }, "Consumer didn't manage to consume the records within timeout.")
+ }
}
private def noConsumeWithoutDescribeAclSetup(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 1e167fb91c9..20a030714f7 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -40,7 +40,7 @@ import
org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartiti
import
org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource
=> LAlterConfigsResource, AlterConfigsResourceCollection =>
LAlterConfigsResourceCollection, AlterableConfig => LAlterableConfig,
AlterableConfigCollection => LAlterableConfigCollection}
import
org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse
=> LAlterConfigsResourceResponse}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
-import
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup
+import
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{DescribedGroup,
TopicPartitions}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource
=> IAlterConfigsResource, AlterConfigsResourceCollection =>
IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig,
AlterableConfigCollection => IAlterableConfigCollection}
@@ -9820,7 +9820,7 @@ class KafkaApisTest extends Logging {
}
@Test
- def testConsumerGroupHeartbeatRequestAuthorizationFailed(): Unit = {
+ def testConsumerGroupHeartbeatRequestGroupAuthorizationFailed(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequestData().setGroupId("group")
@@ -9840,9 +9840,51 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code,
response.data.errorCode)
}
+ @Test
+ def testConsumerGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = {
+ metadataCache = mock(classOf[KRaftMetadataCache])
+ val groupId = "group"
+ val fooTopicName = "foo"
+ val barTopicName = "bar"
+ val zarTopicName = "zar"
+
+ val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setSubscribedTopicNames(List(fooTopicName, barTopicName,
zarTopicName).asJava)
+
+ val requestChannelRequest = buildRequest(new
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ val acls = Map(
+ groupId -> AuthorizationResult.ALLOWED,
+ fooTopicName -> AuthorizationResult.ALLOWED,
+ barTopicName -> AuthorizationResult.DENIED,
+ )
+ when(authorizer.authorize(
+ any[RequestContext],
+ any[util.List[Action]]
+ )).thenAnswer { invocation =>
+ val actions = invocation.getArgument(1, classOf[util.List[Action]])
+ actions.asScala.map { action =>
+ acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+ }.asJava
+ }
+
+ kafkaApis = createKafkaApis(
+ authorizer = Some(authorizer),
+ featureVersions = Seq(GroupVersion.GV_1)
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val response =
verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code,
response.data.errorCode)
+ }
+
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
+ val fooTopicName = "foo"
+ val barTopicName = "bar"
metadataCache = mock(classOf[KRaftMetadataCache])
val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
@@ -9861,10 +9903,44 @@ class KafkaApisTest extends Logging {
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+ val member0 = new ConsumerGroupDescribeResponseData.Member()
+ .setMemberId("member0")
+ .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions().setTopicName(fooTopicName)).asJava))
+ .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions().setTopicName(fooTopicName)).asJava))
+
+ val member1 = new ConsumerGroupDescribeResponseData.Member()
+ .setMemberId("member1")
+ .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions().setTopicName(fooTopicName)).asJava))
+ .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions().setTopicName(fooTopicName),
+ new TopicPartitions().setTopicName(barTopicName)).asJava))
+
+ val member2 = new ConsumerGroupDescribeResponseData.Member()
+ .setMemberId("member2")
+ .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions().setTopicName(barTopicName)).asJava))
+ .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions().setTopicName(fooTopicName)).asJava))
+
future.complete(List(
- new DescribedGroup().setGroupId(groupIds.get(0)),
- new DescribedGroup().setGroupId(groupIds.get(1)),
- new DescribedGroup().setGroupId(groupIds.get(2))
+ new DescribedGroup()
+ .setGroupId(groupIds.get(0))
+ .setMembers(List(member0).asJava),
+ new DescribedGroup()
+ .setGroupId(groupIds.get(1))
+ .setMembers(List(member0, member1).asJava),
+ new DescribedGroup()
+ .setGroupId(groupIds.get(2))
+ .setMembers(List(member2).asJava)
).asJava)
var authorizedOperationsInt = Int.MinValue
@@ -9876,9 +9952,15 @@ class KafkaApisTest extends Logging {
// Can't reuse the above list here because we would not test the
implementation in KafkaApis then
val describedGroups = List(
- new DescribedGroup().setGroupId(groupIds.get(0)),
- new DescribedGroup().setGroupId(groupIds.get(1)),
- new DescribedGroup().setGroupId(groupIds.get(2))
+ new DescribedGroup()
+ .setGroupId(groupIds.get(0))
+ .setMembers(List(member0).asJava),
+ new DescribedGroup()
+ .setGroupId(groupIds.get(1))
+ .setMembers(List(member0, member1).asJava),
+ new DescribedGroup()
+ .setGroupId(groupIds.get(2))
+ .setMembers(List(member2).asJava)
).map(group => group.setAuthorizedOperations(authorizedOperationsInt))
val expectedConsumerGroupDescribeResponseData = new
ConsumerGroupDescribeResponseData()
.setGroups(describedGroups.asJava)
@@ -9967,6 +10049,108 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.FENCED_MEMBER_EPOCH.code,
response.data.groups.get(0).errorCode)
}
+ @Test
+ def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = {
+ val fooTopicName = "foo"
+ val barTopicName = "bar"
+ val errorMessage = "The group has described topic(s) that the client is
not authorized to describe."
+
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
+ val consumerGroupDescribeRequestData = new
ConsumerGroupDescribeRequestData()
+ .setGroupIds(groupIds)
+ val requestChannelRequest = buildRequest(new
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData,
true).build())
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ val acls = Map(
+ groupIds.get(0) -> AuthorizationResult.ALLOWED,
+ groupIds.get(1) -> AuthorizationResult.ALLOWED,
+ groupIds.get(2) -> AuthorizationResult.ALLOWED,
+ fooTopicName -> AuthorizationResult.ALLOWED,
+ barTopicName -> AuthorizationResult.DENIED,
+ )
+ when(authorizer.authorize(
+ any[RequestContext],
+ any[util.List[Action]]
+ )).thenAnswer { invocation =>
+ val actions = invocation.getArgument(1, classOf[util.List[Action]])
+ actions.asScala.map { action =>
+ acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+ }.asJava
+ }
+
+ val future = new
CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+ when(groupCoordinator.consumerGroupDescribe(
+ any[RequestContext],
+ any[util.List[String]]
+ )).thenReturn(future)
+ kafkaApis = createKafkaApis(
+ authorizer = Some(authorizer),
+ featureVersions = Seq(GroupVersion.GV_1)
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val member0 = new ConsumerGroupDescribeResponseData.Member()
+ .setMemberId("member0")
+ .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions().setTopicName(fooTopicName)).asJava))
+ .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions().setTopicName(fooTopicName)).asJava))
+
+ val member1 = new ConsumerGroupDescribeResponseData.Member()
+ .setMemberId("member1")
+ .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions().setTopicName(fooTopicName)).asJava))
+ .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions().setTopicName(fooTopicName),
+ new TopicPartitions().setTopicName(barTopicName)).asJava))
+
+ val member2 = new ConsumerGroupDescribeResponseData.Member()
+ .setMemberId("member2")
+ .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions().setTopicName(barTopicName)).asJava))
+ .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions().setTopicName(fooTopicName)).asJava))
+
+ future.complete(List(
+ new DescribedGroup()
+ .setGroupId(groupIds.get(0))
+ .setMembers(List(member0).asJava),
+ new DescribedGroup()
+ .setGroupId(groupIds.get(1))
+ .setMembers(List(member0, member1).asJava),
+ new DescribedGroup()
+ .setGroupId(groupIds.get(2))
+ .setMembers(List(member2).asJava)
+ ).asJava)
+
+ val expectedConsumerGroupDescribeResponseData = new
ConsumerGroupDescribeResponseData()
+ .setGroups(List(
+ new DescribedGroup()
+ .setGroupId(groupIds.get(0))
+ .setMembers(List(member0).asJava),
+ new DescribedGroup()
+ .setGroupId(groupIds.get(1))
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ .setErrorMessage(errorMessage),
+ new DescribedGroup()
+ .setGroupId(groupIds.get(2))
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ .setErrorMessage(errorMessage)
+ ).asJava)
+
+ val response =
verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
+
+ assertEquals(expectedConsumerGroupDescribeResponseData, response.data)
+ }
+
@Test
def testGetTelemetrySubscriptions(): Unit = {
val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(
diff --git a/docs/ops.html b/docs/ops.html
index 48b4124374c..8dbcc7acab1 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -127,6 +127,8 @@ topic1 0 854144 855809
1665 consu
topic2 0 460537 803290 342753
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
topic3 2 243655 398812 155157
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1
consumer4</code></pre>
+ Note that if the consumer group uses the consumer protocol, the admin client
needs DESCRIBE access to all the topics used in the group (topics the members
are subscribed to). In contrast, the classic protocol does not require all
topics DESCRIBE authorization.
+
There are a number of additional "describe" options that can be used to
provide more detailed information about a consumer group:
<ul>
<li>--members: This option provides the list of all active members in the
consumer group.
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 8b8c4bb0f99..de830076bfb 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -74,6 +74,7 @@ import
org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.Timer;
@@ -114,6 +115,7 @@ public class GroupCoordinatorService implements
GroupCoordinator {
private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics;
private GroupCoordinatorMetrics groupCoordinatorMetrics;
private GroupConfigManager groupConfigManager;
+ private Optional<Authorizer> authorizer;
public Builder(
int nodeId,
@@ -158,29 +160,28 @@ public class GroupCoordinatorService implements
GroupCoordinator {
return this;
}
+ public Builder withAuthorizer(Optional<Authorizer> authorizer) {
+ this.authorizer = authorizer;
+ return this;
+ }
+
public GroupCoordinatorService build() {
- if (config == null)
- throw new IllegalArgumentException("Config must be set.");
- if (writer == null)
- throw new IllegalArgumentException("Writer must be set.");
- if (loader == null)
- throw new IllegalArgumentException("Loader must be set.");
- if (time == null)
- throw new IllegalArgumentException("Time must be set.");
- if (timer == null)
- throw new IllegalArgumentException("Timer must be set.");
- if (coordinatorRuntimeMetrics == null)
- throw new IllegalArgumentException("CoordinatorRuntimeMetrics
must be set.");
- if (groupCoordinatorMetrics == null)
- throw new IllegalArgumentException("GroupCoordinatorMetrics
must be set.");
- if (groupConfigManager == null)
- throw new IllegalArgumentException("GroupConfigManager must be
set.");
+ requireNonNull(config, new IllegalArgumentException("Config must
be set."));
+ requireNonNull(writer, new IllegalArgumentException("Writer must
be set."));
+ requireNonNull(loader, new IllegalArgumentException("Loader must
be set."));
+ requireNonNull(time, new IllegalArgumentException("Time must be
set."));
+ requireNonNull(timer, new IllegalArgumentException("Timer must be
set."));
+ requireNonNull(coordinatorRuntimeMetrics, new
IllegalArgumentException("CoordinatorRuntimeMetrics must be set."));
+ requireNonNull(groupCoordinatorMetrics, new
IllegalArgumentException("GroupCoordinatorMetrics must be set."));
+ requireNonNull(groupConfigManager, new
IllegalArgumentException("GroupConfigManager must be set."));
+ requireNonNull(authorizer, new
IllegalArgumentException("Authorizer must be set."));
String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
LogContext logContext = new LogContext(String.format("[%s] ",
logPrefix));
CoordinatorShardBuilderSupplier<GroupCoordinatorShard,
CoordinatorRecord> supplier = () ->
- new GroupCoordinatorShard.Builder(config, groupConfigManager);
+ new GroupCoordinatorShard.Builder(config, groupConfigManager)
+ .withAuthorizer(authorizer);
CoordinatorEventProcessor processor = new
MultiThreadedEventProcessor(
logContext,
@@ -1277,4 +1278,10 @@ public class GroupCoordinatorService implements
GroupCoordinator {
);
}
}
+
+ private static void requireNonNull(Object obj, RuntimeException throwable)
{
+ if (obj == null) {
+ throw throwable;
+ }
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 12c5ef4b4b7..b8e7ce26211 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -90,6 +90,7 @@ import
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -98,6 +99,7 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -125,6 +127,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
private CoordinatorExecutor<CoordinatorRecord> executor;
private CoordinatorMetrics coordinatorMetrics;
private TopicPartition topicPartition;
+ private Optional<Authorizer> authorizer;
public Builder(
GroupCoordinatorConfig config,
@@ -188,6 +191,13 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
return this;
}
+ public CoordinatorShardBuilder<GroupCoordinatorShard,
CoordinatorRecord> withAuthorizer(
+ Optional<Authorizer> authorizer
+ ) {
+ this.authorizer = authorizer;
+ return this;
+ }
+
@SuppressWarnings("NPathComplexity")
@Override
public GroupCoordinatorShard build() {
@@ -208,6 +218,8 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
throw new IllegalArgumentException("TopicPartition must be
set.");
if (groupConfigManager == null)
throw new IllegalArgumentException("GroupConfigManager must be
set.");
+ if (authorizer == null)
+ throw new IllegalArgumentException("Authorizer must be set.");
GroupCoordinatorMetricsShard metricsShard =
((GroupCoordinatorMetrics) coordinatorMetrics)
.newMetricsShard(snapshotRegistry, topicPartition);
@@ -221,6 +233,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
.withConfig(config)
.withGroupConfigManager(groupConfigManager)
.withGroupCoordinatorMetricsShard(metricsShard)
+ .withAuthorizer(authorizer)
.build();
OffsetMetadataManager offsetMetadataManager = new
OffsetMetadataManager.Builder()
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 6d0030e0d74..8c3f2dd4447 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -61,6 +61,7 @@ import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
+import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorExecutor;
@@ -122,6 +123,9 @@ import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
@@ -147,8 +151,10 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
import static
org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
@@ -157,6 +163,8 @@ import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.CON
import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
import static
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
import static org.apache.kafka.coordinator.group.Group.GroupType.CLASSIC;
import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE;
@@ -228,6 +236,7 @@ public class GroupMetadataManager {
private MetadataImage metadataImage = null;
private ShareGroupPartitionAssignor shareGroupAssignor = null;
private GroupCoordinatorMetricsShard metrics;
+ private Optional<Authorizer> authorizer = null;
Builder withLogContext(LogContext logContext) {
this.logContext = logContext;
@@ -279,11 +288,17 @@ public class GroupMetadataManager {
return this;
}
+ Builder withAuthorizer(Optional<Authorizer> authorizer) {
+ this.authorizer = authorizer;
+ return this;
+ }
+
GroupMetadataManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new
SnapshotRegistry(logContext);
if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
if (time == null) time = Time.SYSTEM;
+ if (authorizer == null) authorizer = Optional.empty();
if (timer == null)
throw new IllegalArgumentException("Timer must be set.");
@@ -308,7 +323,8 @@ public class GroupMetadataManager {
metadataImage,
config,
groupConfigManager,
- shareGroupAssignor
+ shareGroupAssignor,
+ authorizer
);
}
}
@@ -409,6 +425,11 @@ public class GroupMetadataManager {
*/
private final ShareGroupPartitionAssignor shareGroupAssignor;
+ /**
+ * The authorizer to validate the regex subscription topics.
+ */
+ private final Optional<Authorizer> authorizer;
+
private GroupMetadataManager(
SnapshotRegistry snapshotRegistry,
LogContext logContext,
@@ -419,7 +440,8 @@ public class GroupMetadataManager {
MetadataImage metadataImage,
GroupCoordinatorConfig config,
GroupConfigManager groupConfigManager,
- ShareGroupPartitionAssignor shareGroupAssignor
+ ShareGroupPartitionAssignor shareGroupAssignor,
+ Optional<Authorizer> authorizer
) {
this.logContext = logContext;
this.log = logContext.logger(GroupMetadataManager.class);
@@ -439,6 +461,7 @@ public class GroupMetadataManager {
this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
this.groupConfigManager = groupConfigManager;
this.shareGroupAssignor = shareGroupAssignor;
+ this.authorizer = authorizer;
}
/**
@@ -1618,14 +1641,13 @@ public class GroupMetadataManager {
* is larger than the current target assignment epoch.
* 3) The member's assignment is reconciled with the target assignment.
*
+ * @param context The request context.
* @param groupId The group id from the request.
* @param memberId The member id from the request.
* @param memberEpoch The member epoch from the request.
* @param instanceId The instance id from the request or null.
* @param rackId The rack id from the request or null.
* @param rebalanceTimeoutMs The rebalance timeout from the request or
-1.
- * @param clientId The client id.
- * @param clientHost The client host.
* @param subscribedTopicNames The list of subscribed topic names from
the request
* or null.
* @param subscribedTopicRegex The regular expression based subscription
from the request
@@ -1637,14 +1659,13 @@ public class GroupMetadataManager {
* a list of records to update the state machine.
*/
private CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> consumerGroupHeartbeat(
+ RequestContext context,
String groupId,
String memberId,
int memberEpoch,
String instanceId,
String rackId,
int rebalanceTimeoutMs,
- String clientId,
- String clientHost,
List<String> subscribedTopicNames,
String subscribedTopicRegex,
String assignorName,
@@ -1695,8 +1716,8 @@ public class GroupMetadataManager {
.maybeUpdateServerAssignorName(Optional.ofNullable(assignorName))
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
.maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex))
- .setClientId(clientId)
- .setClientHost(clientHost)
+ .setClientId(context.clientId())
+ .setClientHost(context.clientAddress.toString())
.setClassicMemberMetadata(null)
.build();
@@ -1712,6 +1733,7 @@ public class GroupMetadataManager {
);
bumpGroupEpoch |= maybeUpdateRegularExpressions(
+ context,
group,
member,
updatedMember,
@@ -2297,6 +2319,7 @@ public class GroupMetadataManager {
* group. We align the refreshment of the regular expression in order to
have
* them trigger only one rebalance per update.
*
+ * @param context The request context.
* @param group The consumer group.
* @param member The old member.
* @param updatedMember The new member.
@@ -2304,6 +2327,7 @@ public class GroupMetadataManager {
* @return Whether a rebalance must be triggered.
*/
private boolean maybeUpdateRegularExpressions(
+ RequestContext context,
ConsumerGroup group,
ConsumerGroupMember member,
ConsumerGroupMember updatedMember,
@@ -2389,8 +2413,8 @@ public class GroupMetadataManager {
Set<String> regexes =
Collections.unmodifiableSet(subscribedRegularExpressions.keySet());
executor.schedule(
key,
- () -> refreshRegularExpressions(groupId, log, time,
metadataImage, regexes),
- (result, exception) -> handleRegularExpressionsResult(groupId,
result, exception)
+ () -> refreshRegularExpressions(context, groupId, log, time,
metadataImage, authorizer, regexes),
+ (result, exception) -> handleRegularExpressionsResult(groupId,
memberId, result, exception)
);
}
@@ -2402,20 +2426,24 @@ public class GroupMetadataManager {
* as an asynchronous task in the executor. Hence, it should not access
any state from
* the manager.
*
- * @param groupId The group id.
- * @param log The log instance.
- * @param time The time instance.
- * @param image The metadata image to use for listing the topics.
- * @param regexes The list of regular expressions that must be resolved.
+ * @param context The request context.
+ * @param groupId The group id.
+ * @param log The log instance.
+ * @param time The time instance.
+ * @param image The metadata image to use for listing the topics.
+ * @param authorizer The authorizer.
+ * @param regexes The list of regular expressions that must be
resolved.
* @return The list of resolved regular expressions.
*
* public for benchmarks.
*/
public static Map<String, ResolvedRegularExpression>
refreshRegularExpressions(
+ RequestContext context,
String groupId,
Logger log,
Time time,
MetadataImage image,
+ Optional<Authorizer> authorizer,
Set<String> regexes
) {
long startTimeMs = time.milliseconds();
@@ -2444,6 +2472,12 @@ public class GroupMetadataManager {
}
}
+ filterTopicDescribeAuthorizedTopics(
+ context,
+ authorizer,
+ resolvedRegexes
+ );
+
long version = image.provenance().lastContainedOffset();
Map<String, ResolvedRegularExpression> result = new
HashMap<>(resolvedRegexes.size());
for (Map.Entry<String, Set<String>> resolvedRegex :
resolvedRegexes.entrySet()) {
@@ -2460,15 +2494,58 @@ public class GroupMetadataManager {
return result;
}
+ /**
+ * This method filters the topics in the resolved regexes
+ * that the member is authorized to describe.
+ *
+ * @param context The request context.
+ * @param authorizer The authorizer.
+ * @param resolvedRegexes The map of the regex pattern and its set of
matched topics.
+ */
+ private static void filterTopicDescribeAuthorizedTopics(
+ RequestContext context,
+ Optional<Authorizer> authorizer,
+ Map<String, Set<String>> resolvedRegexes
+ ) {
+ if (authorizer.isEmpty()) return;
+
+ Map<String, Integer> topicNameCount = new HashMap<>();
+ resolvedRegexes.values().forEach(topicNames ->
+ topicNames.forEach(topicName ->
+ topicNameCount.compute(topicName, Utils::incValue)
+ )
+ );
+
+ List<Action> actions = topicNameCount.entrySet().stream().map(entry ->
{
+ ResourcePattern resource = new ResourcePattern(TOPIC,
entry.getKey(), LITERAL);
+ return new Action(DESCRIBE, resource, entry.getValue(), true,
false);
+ }).collect(Collectors.toList());
+
+ List<AuthorizationResult> authorizationResults =
authorizer.get().authorize(context, actions);
+ Set<String> deniedTopics = new HashSet<>();
+ IntStream.range(0, actions.size()).forEach(i -> {
+ if (authorizationResults.get(i) == AuthorizationResult.DENIED) {
+ String deniedTopic = actions.get(i).resourcePattern().name();
+ deniedTopics.add(deniedTopic);
+ }
+ });
+
+ resolvedRegexes.forEach((__, topicNames) ->
topicNames.removeAll(deniedTopics));
+ }
+
+
/**
* Handle the result of the asynchronous tasks which resolves the regular
expressions.
*
+ * @param groupId The group id.
+ * @param memberId The member id.
* @param resolvedRegularExpressions The resolved regular expressions.
* @param exception The exception if the resolution
failed.
* @return A CoordinatorResult containing the records to mutate the group
state.
*/
private CoordinatorResult<Void, CoordinatorRecord>
handleRegularExpressionsResult(
String groupId,
+ String memberId,
Map<String, ResolvedRegularExpression> resolvedRegularExpressions,
Throwable exception
) {
@@ -2479,8 +2556,8 @@ public class GroupMetadataManager {
}
if (log.isDebugEnabled()) {
- log.debug("[GroupId {}] Received updated regular expressions: {}.",
- groupId, resolvedRegularExpressions);
+ log.debug("[GroupId {}] Received updated regular expressions based
on the context of member {}: {}.",
+ groupId, memberId, resolvedRegularExpressions);
}
List<CoordinatorRecord> records = new ArrayList<>();
@@ -3492,14 +3569,13 @@ public class GroupMetadataManager {
} else {
// Otherwise, it is a regular heartbeat.
return consumerGroupHeartbeat(
+ context,
request.groupId(),
request.memberId(),
request.memberEpoch(),
request.instanceId(),
request.rackId(),
request.rebalanceTimeoutMs(),
- context.clientId(),
- context.clientAddress.toString(),
request.subscribedTopicNames(),
request.subscribedTopicRegex(),
request.serverAssignor(),
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index edca53eedf9..f40f51f7739 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -94,6 +94,9 @@ import
org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -104,6 +107,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -16030,6 +16034,232 @@ public class GroupMetadataManagerTest {
);
}
+ @Test
+ public void
testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ Uuid barTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ Authorizer authorizer = mock(Authorizer.class);
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .build(12345L))
+ .withAuthorizer(authorizer)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .build())
+ .withSubscriptionMetadata(Map.of(
+ fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
6)))
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .withResolvedRegularExpression("foo*", new
ResolvedRegularExpression(
+ Set.of(fooTopicName), 0L, 0L))
+ .withAssignmentEpoch(10))
+ .build();
+
+ // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
+ context.time.sleep(10001L);
+
+ Map<String, AuthorizationResult> acls = new HashMap<>();
+ acls.put(fooTopicName, AuthorizationResult.ALLOWED);
+ acls.put(barTopicName, AuthorizationResult.DENIED);
+ when(authorizer.authorize(any(), any())).thenAnswer(invocation -> {
+ List<Action> actions = invocation.getArgument(1, List.class);
+ return actions.stream()
+ .map(action ->
acls.getOrDefault(action.resourcePattern().name(), AuthorizationResult.DENIED))
+ .collect(Collectors.toList());
+ });
+
+ // Member 2 heartbeats with a different regular expression.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(10)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*|bar*")
+ .setServerAssignor("range")
+ .setTopicPartitions(Collections.emptyList()),
+ ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
+ );
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(10)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(Collections.singletonList(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(3, 4, 5))))),
+ result1.response()
+ );
+
+ ConsumerGroupMember expectedMember2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*|bar*")
+ .setServerAssignorName("range")
+ .build();
+
+ assertRecordsEquals(
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*")
+ ),
+ result1.records()
+ );
+
+ // Execute pending tasks.
+ assertEquals(
+ List.of(
+ new MockCoordinatorExecutor.ExecutorResult<>(
+ groupId + "-regex",
+ new CoordinatorResult<>(List.of(
+ // The resolution of the new regex is persisted.
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
+ groupId,
+ "foo*|bar*",
+ new ResolvedRegularExpression(
+ Set.of("foo"),
+ 12345L,
+ context.time.milliseconds()
+ )
+ ),
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
+ ))
+ )
+ ),
+ context.processTasks()
+ );
+
+ // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
+ context.time.sleep(10001L);
+
+ // Access to the bar topic is granted.
+ acls.put(barTopicName, AuthorizationResult.ALLOWED);
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+ memberId1, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)
+ )),
+ memberId2, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)
+ ))
+ )));
+
+ // Member 2 heartbeats again with a new regex.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(10)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo|bar*")
+ .setServerAssignor("range")
+ .setTopicPartitions(Collections.emptyList()),
+ ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
+ );
+
+ expectedMember2 = new ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo|bar*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .build();
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(3, 4, 5))))),
+ result2.response()
+ );
+
+ assertRecordsEquals(
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*|bar*"),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember2)
+ ),
+ result2.records()
+ );
+
+ // A regex refresh is triggered and the bar topic is included.
+ assertRecordsEquals(
+ List.of(
+ // The resolution of the new regex is persisted.
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
+ groupId,
+ "foo|bar*",
+ new ResolvedRegularExpression(
+ Set.of("foo", "bar"),
+ 12345L,
+ context.time.milliseconds()
+ )
+ ),
+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(
+ groupId,
+ Map.of(
+ fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6),
+ barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3)
+ )
+ ),
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12)
+ ),
+ context.processTasks().get(0).result.records()
+ );
+ }
+
@Test
public void
testResolvedRegularExpressionsRemovedWhenMembersLeaveOrFenced() {
String groupId = "fooup";
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 42cdd9a5061..e3bed05f51e 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -93,6 +93,7 @@ import
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -104,6 +105,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -451,6 +453,7 @@ public class GroupMetadataManagerTestContext {
private ShareGroupPartitionAssignor shareGroupAssignor = new
MockPartitionAssignor("share");
private final List<ShareGroupBuilder> shareGroupBuilders = new
ArrayList<>();
private final Map<String, Object> config = new HashMap<>();
+ private Optional<Authorizer> authorizer = Optional.empty();
public Builder withConfig(String key, Object value) {
config.put(key, value);
@@ -477,6 +480,11 @@ public class GroupMetadataManagerTestContext {
return this;
}
+ public Builder withAuthorizer(Authorizer authorizer) {
+ this.authorizer = Optional.of(authorizer);
+ return this;
+ }
+
public GroupMetadataManagerTestContext build() {
if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
if (groupConfigManager == null) groupConfigManager =
createConfigManager();
@@ -506,6 +514,7 @@ public class GroupMetadataManagerTestContext {
.withGroupCoordinatorMetricsShard(metrics)
.withShareGroupAssignor(shareGroupAssignor)
.withGroupConfigManager(groupConfigManager)
+ .withAuthorizer(authorizer)
.build(),
groupConfigManager
);
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
index 08db52e4e60..35ee42836c8 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -118,10 +119,12 @@ public class RegexResolutionBenchmark {
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void run() {
GroupMetadataManager.refreshRegularExpressions(
+ null,
GROUP_ID,
LOG,
TIME,
image,
+ Optional.empty(),
regexes
);
}